From 0f76628728d7581dc923ea4797d4d1581d2ba391 Mon Sep 17 00:00:00 2001 From: Yonle Date: Sat, 18 Nov 2023 16:49:20 +0700 Subject: [PATCH] remove mode 1, and sqlite3 no longer needed Signed-off-by: Yonle --- README.md | 1 - bouncer/bouncer2.js => bouncer.js | 124 +++++----------- bouncer/bouncer1.js | 235 ------------------------------ config.js.example | 12 +- http.js | 5 +- package.json | 6 +- 6 files changed, 48 insertions(+), 335 deletions(-) rename bouncer/bouncer2.js => bouncer.js (53%) delete mode 100644 bouncer/bouncer1.js diff --git a/README.md b/README.md index c73304f..8119651 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,6 @@ tmux new -d "node index.js" ## Environment Variable - `CLUSTERS` - Run Bostr with specified numbers of clusters. -- `IN_MEMORY` - Store temporary data in memory (RAM) instead of disk. - `LOG_ABOUT_RELAYS` - Whenever to log about relay connections ## Docker diff --git a/bouncer/bouncer2.js b/bouncer.js similarity index 53% rename from bouncer/bouncer2.js rename to bouncer.js index e861d04..d097e3b 100644 --- a/bouncer/bouncer2.js +++ b/bouncer.js @@ -1,27 +1,13 @@ -const SQLite = require("better-sqlite3"); const WebSocket = require("ws"); const { validateEvent, nip19 } = require("nostr-tools"); -const auth = require("../auth.js"); -const nip42 = require("../nip42.js"); +const auth = require("./auth.js"); +const nip42 = require("./nip42.js"); -let { relays, tmp_store, log_about_relays, authorized_keys, private_keys } = require("../config"); +let { relays, tmp_store, log_about_relays, authorized_keys, private_keys } = require("./config"); const socks = new Set(); -const sess = new SQLite((process.env.IN_MEMORY || tmp_store != "disk") ? null : (__dirname + "/../.temporary.db")); const csess = new Map(); -const pendingEOSE = new Map(); // per sessID -const reqLimit = new Map(); // per sessID -const searchQuery = new Map(); // per sessID - -// Handle database.... -sess.unsafeMode(true); - -// Temporary database. -sess.exec("CREATE TABLE IF NOT EXISTS sess (cID TEXT, subID TEXT, filter TEXT);"); -sess.exec("CREATE TABLE IF NOT EXISTS events (cID TEXT, subID TEXT, eID TEXT);"); // To prevent transmitting duplicates -sess.exec("CREATE TABLE IF NOT EXISTS recentEvents (cID TEXT, data TEXT);"); - authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); // CL - User socket @@ -30,6 +16,10 @@ module.exports = (ws, req) => { let authorized = true; ws.id = process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size; + ws.sess = new Map(); // contains filter submitted by clients. per subID + ws.events = new Map(); // only to prevent the retransmit of the same event. per subID + ws.my_events = new Set(); // for event retransmitting. + ws.pendingEOSE = new Map(); // each contain subID if (authorized_keys?.length) { authKey = Date.now() + Math.random().toString(36); @@ -52,31 +42,29 @@ module.exports = (ws, req) => { if (!authorized) return; if (!validateEvent(data[1])) return ws.send(JSON.stringify(["NOTICE", "error: invalid event"])); if (data[1].kind == 22242) return ws.send(JSON.stringify(["OK", data[1]?.id, false, "rejected: kind 22242"])); - sess.prepare("INSERT INTO recentEvents VALUES (?, ?);").run(ws.id, JSON.stringify(data)); + ws.my_events.add(data[1]); bc(data, ws.id); ws.send(JSON.stringify(["OK", data[1]?.id, true, ""])); break; case "REQ": if (!authorized) return; if (data.length < 3) return ws.send(JSON.stringify(["NOTICE", "error: bad request."])); + if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "expected subID a string. but got the otherwise."])); if (typeof(data[2]) !== "object") return ws.send(JSON.stringify(["NOTICE", "expected filter to be obj, instead gives the otherwise."])); - // eventname -> 1_eventname + if (ws.sess.has(data[1])) return ws.send(JSON.stringify(["NOTICE", "The same subscription ID is already available."])); + ws.sess.set(data[1], data[2]); + ws.events.set(data[1], new Set()); bc(data, ws.id); - sess.prepare("INSERT INTO sess VALUES (?, ?, ?);").run(ws.id, data[1], JSON.stringify(data[2])); - if (data[2]?.search) searchQuery.set(ws.id + ":" + data[1], data[2]?.search); if (data[2]?.limit < 1) return ws.send(JSON.stringify(["EOSE", data[1]])); - pendingEOSE.set(ws.id + ":" + data[1], 0); - reqLimit.set(ws.id + ":" + data[1], data[2]?.limit); + ws.pendingEOSE.set(data[1], 0); break; case "CLOSE": if (!authorized) return; if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: bad request."])); + ws.sess.delete(data[1]); + ws.events.delete(data[1]); + ws.pendingEOSE.delete(data[1]); bc(data, ws.id); - pendingEOSE.delete(ws.id + ":" + data[1]); - reqLimit.delete(ws.id + ":" + data[1]); - searchQuery.delete(ws.id + ":" + data[1]); - sess.prepare("DELETE FROM sess WHERE cID = ? AND subID = ?;").run(ws.id, data[1]); - sess.prepare("DELETE FROM events WHERE cID = ? AND subID = ?;").run(ws.id, data[1]); break; case "AUTH": if (auth(authKey, authorized, authorized_keys, data[1], ws, req)) { @@ -86,7 +74,6 @@ module.exports = (ws, req) => { } break; default: - console.warn(process.pid, "---", "Unknown command:", data.join(" ")); ws.send(JSON.stringify(["NOTICE", "error: unrecognized command."])); break; } @@ -98,9 +85,6 @@ module.exports = (ws, req) => { csess.delete(ws.id); if (!authorized) return; - sess.prepare("DELETE FROM sess WHERE cID = ?;").run(ws.id); - sess.prepare("DELETE FROM events WHERE cID = ?;").run(ws.id); - sess.prepare("DELETE FROM recentEvents WHERE cID = ?;").run(ws.id); terminate_sess(ws.id); }); @@ -124,26 +108,12 @@ function terminate_sess(id) { sock.terminate(); socks.delete(sock); } - - for (sub of pendingEOSE) { - if (!sub[0].startsWith(id)) continue; - pendingEOSE.delete(sub[0]); - } - - for (sub of reqLimit) { - if (!sub[0].startsWith(id)) continue; - reqLimit.delete(sub[0]); - } - - for (sub of searchQuery) { - if (!sub[0].startsWith(id)) continue; - searchQuery.delete(sub[0]); - } } // WS - Sessions function newConn(addr, id) { if (!csess.has(id)) return; + const client = csess.get(id); const relay = new WebSocket(addr, { headers: { "User-Agent": "Bostr; The nostr relay bouncer; https://github.com/Yonle/bostr" @@ -155,14 +125,12 @@ function newConn(addr, id) { socks.add(relay); // Add this socket session to [socks] if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}]`, relay.url, "is connected"); - for (i of sess.prepare("SELECT data FROM recentEvents WHERE cID = ?;").iterate(id)) { - if (relay.readyState >= 2) break; - relay.send(i.data); + for (i of client.my_events) { + relay.send(JSON.stringify(i)); } - for (i of sess.prepare("SELECT subID, filter FROM sess WHERE cID = ?;").iterate(id)) { - if (relay.readyState >= 2) break; - relay.send(JSON.stringify(["REQ", i.subID, JSON.parse(i.filter)])); + for (i of client.sess) { + relay.send(JSON.stringify(["REQ", i[0], i[1]])); } }); @@ -176,62 +144,50 @@ function newConn(addr, id) { switch (data[0]) { case "EVENT": { if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return; - const NotInSearchQuery = searchQuery.has(id + ":" + data[1]) && !data[2]?.content?.toLowerCase()?.includes(searchQuery.get(id + ":" + data[1]).toLowerCase()); + if (!client.sess.has(data[1])) return; + const NotInSearchQuery = client.sess.get(data[1]).search && !data[2].content?.toLowerCase().includes(client.sess.get(data[1]).search?.toLowerCase()); if (NotInSearchQuery) return; - if (!sess.prepare("SELECT * FROM sess WHERE cID = ? AND subID = ?;").get(id, data[1])) return; - if (sess.prepare("SELECT * FROM events WHERE cID = ? AND subID = ? AND eID = ?;").get(id, data[1], data[2]?.id)) return; // No need to transmit once it has been transmitted before. + if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before. - sess.prepare("INSERT INTO events VALUES (?, ?, ?);").run(id, data[1], data[2]?.id); - csess.get(id)?.send(JSON.stringify(data)); + client.events.get(data[1]).add(data[2]?.id); + client.send(JSON.stringify(data)); // Now count for REQ limit requested by client. // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID // Skip if EOSE has been omitted - const subID = id + ":" + data[1]; - if (!pendingEOSE.has(subID)) return; - - let remainingEvents = reqLimit.get(subID); - - if (remainingEvents) { - remainingEvents--; - reqLimit.set(subID, remainingEvents); - } - - if (remainingEvents < 1) { + if (!client.pendingEOSE.has(data[1]) || !client.sess.get(data[1])?.limit) return; + if (client.events.get(data[1]).size >= client.sess.get(data[1])?.limit) { // Once there are no remaining event, Do the instructed above. - csess.get(id)?.send(JSON.stringify(["EOSE", data[1]])); - pendingEOSE.delete(subID); - reqLimit.delete(subID); + client.send(JSON.stringify(["EOSE", data[1]])); + client.pendingEOSE.delete(data[1]); } break; } case "EOSE": - if (!pendingEOSE.has(id + ":" + data[1])) return; - pendingEOSE.set(id + ":" + data[1], pendingEOSE.get(id + ":" + data[1]) + 1); - if (pendingEOSE.get(id + ":" + data[1]) < Array.from(relays).filter(_ => _.id === id).length) return; - csess.get(id)?.send(JSON.stringify(data)); - pendingEOSE.delete(id + ":" + data[1]); - reqLimit.delete(id + ":" + data[1]); + if (!client.pendingEOSE.has(data[1])) return; + client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1); + if (client.pendingEOSE.get(data[1]) < Array.from(relays).filter(_ => _.id === id).length) return; + client.send(JSON.stringify(data)); + client.pendingEOSE.delete(data[1]); break; - case "AUTH": { - if (!private_keys || typeof(data[1]) !== "string") return; - const pubkey = csess.get(id)?.pubkey; - nip42(relay, pubkey, private_keys[pubkey], data[1]); + case "AUTH": + if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return; + nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]); break; - } } }); relay.on('error', _ => { if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.error(process.pid, "-!-", `[${id}]`, relay.url, _.toString()) }); + relay.on('close', _ => { socks.delete(relay) // Remove this socket session from [socks] list if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url); if (!csess.has(id)) return; - setTimeout(_ => newConn(addr, id), 5000); // As a bouncer server, We need to reconnect. + setTimeout(_ => newConn(addr, id), config.reconnect_time || 5000); // As a bouncer server, We need to reconnect. }); } diff --git a/bouncer/bouncer1.js b/bouncer/bouncer1.js deleted file mode 100644 index c950a3c..0000000 --- a/bouncer/bouncer1.js +++ /dev/null @@ -1,235 +0,0 @@ -const SQLite = require("better-sqlite3"); -const WebSocket = require("ws"); -const { validateEvent, nip19 } = require("nostr-tools"); -const auth = require("../auth.js"); -const nip42 = require("../nip42.js"); - -let { relays, tmp_store, log_about_relays, authorized_keys, private_keys } = require("../config"); - -const socks = new Set(); -const sess = new SQLite((process.env.IN_MEMORY || tmp_store != "disk") ? null : (__dirname + "/../.temporary.db")); -const csess = new Map(); - -const pendingEOSE = new Map(); // per sessID -const reqLimit = new Map(); // per sessID -const searchQuery = new Map(); // per sessID - -// Handle database.... -sess.unsafeMode(true); - -// Temporary database. -sess.exec("CREATE TABLE IF NOT EXISTS sess (cID TEXT, subID TEXT, filter TEXT);"); -sess.exec("CREATE TABLE IF NOT EXISTS events (cID TEXT, subID TEXT, eID TEXT);"); // To prevent transmitting duplicates -sess.exec("CREATE TABLE IF NOT EXISTS recentEvents (cID TEXT, data TEXT);"); - -authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); - -// CL - User socket -module.exports = (ws, req) => { - let authKey = null; - let authorized = true; - - ws.id = process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size; - - if (authorized_keys?.length) { - authKey = Date.now() + Math.random().toString(36); - authorized = false; - ws.send(JSON.stringify(["AUTH", authKey])); - } - - console.log(process.pid, `->- ${req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address} connected as ${ws.id}`); - ws.on("message", data => { - try { - data = JSON.parse(data); - } catch { - return ws.send( - JSON.stringify(["NOTICE", "error: bad JSON."]) - ) - } - - switch (data[0]) { - case "EVENT": - if (!authorized) return; - if (!validateEvent(data[1])) return ws.send(JSON.stringify(["NOTICE", "error: invalid event"])); - if (data[1].kind == 22242) return ws.send(JSON.stringify(["OK", data[1]?.id, false, "rejected: kind 22242"])); - sess.prepare("INSERT INTO recentEvents VALUES (?, ?);").run(ws.id, JSON.stringify(data)); - bc(data); - ws.send(JSON.stringify(["OK", data[1]?.id, true, ""])); - break; - case "REQ": - if (!authorized) return; - if (data.length < 3) return ws.send(JSON.stringify(["NOTICE", "error: bad request."])); - if (typeof(data[2]) !== "object") return ws.send(JSON.stringify(["NOTICE", "expected filter to be obj, instead gives the otherwise."])); - data[1] = ws.id + ":" + data[1]; - // eventname -> 1_eventname - bc(data); - sess.prepare("INSERT INTO sess VALUES (?, ?, ?);").run(ws.id, data[1], JSON.stringify(data[2])); - if (data[2]?.search) searchQuery.set(data[1], data[2]?.search); - if (data[2]?.limit < 1) return ws.send(JSON.stringify(["EOSE", data[1].split(":")[1]])); - pendingEOSE.set(data[1], 0); - reqLimit.set(data[1], data[2]?.limit); - break; - case "CLOSE": - if (!authorized) return; - if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: bad request."])); - data[1] = ws.id + ":" + data[1]; - bc(data); - pendingEOSE.delete(data[1]); - reqLimit.delete(data[1]); - searchQuery.delete(data[1]); - sess.prepare("DELETE FROM sess WHERE cID = ? AND subID = ?;").run(ws.id, data[1]); - sess.prepare("DELETE FROM events WHERE cID = ? AND subID = ?;").run(ws.id, data[1]); - break; - case "AUTH": - if (auth(authKey, authorized, authorized_keys, data[1], ws, req)) { - ws.pubkey = data[1].pubkey; - authorized = true; - } - break; - default: - console.warn(process.pid, "---", "Unknown command:", data.join(" ")); - ws.send(JSON.stringify(["NOTICE", "error: unrecognized command."])); - break; - } - }); - - ws.on('error', console.error); - ws.on('close', _ => { - console.log(process.pid, "---", "Sock", ws.id, "has disconnected."); - csess.delete(ws.id); - - if (!authorized) return; - for (i of sess.prepare("SELECT subID FROM sess WHERE cID = ?").iterate(ws.id)) { - bc(["CLOSE", i.subID]); - pendingEOSE.delete(i.subID); - reqLimit.delete(i.subID); - searchQuery.delete(i.subID); - } - - sess.prepare("DELETE FROM sess WHERE cID = ?;").run(ws.id); - sess.prepare("DELETE FROM events WHERE cID = ?;").run(ws.id); - sess.prepare("DELETE FROM recentEvents WHERE cID = ?;").run(ws.id); - }); - - csess.set(ws.id, ws); -} - -// WS - Broadcast message to every existing sockets -function bc(msg) { - for (sock of socks) { - if (sock.readyState >= 2) return socks.delete(sock); - sock.send(JSON.stringify(msg)); - } -} - -// WS - Sessions -function newConn(addr) { - const relay = new WebSocket(addr, { - headers: { - "User-Agent": "Bostr; The nostr relay bouncer; https://github.com/Yonle/bostr" - } - }); - - relay.on('open', _ => { - socks.add(relay); // Add this socket session to [socks] - if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${socks.size}/${relays.length}]`, relay.url, "is connected"); - - for (i of sess.prepare("SELECT data FROM recentEvents;").iterate()) { - if (relay.readyState >= 2) break; - relay.send(i.data); - } - - for (i of sess.prepare("SELECT subID, filter FROM sess").iterate()) { - if (relay.readyState >= 2) break; - relay.send(JSON.stringify(["REQ", i.subID, JSON.parse(i.filter)])); - } - }); - - relay.on('message', data => { - try { - data = JSON.parse(data); - } catch (error) { - return console.error(error); - } - - switch (data[0]) { - case "EVENT": { - const subID = data[1]; - const args = subID.split(":") - /* - args[0] -> Client socket ID (bouncer -> client) - args.slice(1).join(":") -> Actual subscription ID that socket client requested. - */ - const cID = args[0]; - const sID = args.slice(1).join(":"); - const NotInSearchQuery = searchQuery.has(subID) && !data[2]?.content?.toLowerCase()?.includes(searchQuery.get(subID).toLowerCase()); - - if (NotInSearchQuery) return; - if (!sess.prepare("SELECT * FROM sess WHERE cID = ? AND subID = ?;").get(cID, subID)) return relay.send(JSON.stringify(["CLOSE", subID])); - if (sess.prepare("SELECT * FROM events WHERE cID = ? AND subID = ? AND eID = ?;").get(cID, subID, data[2]?.id)) return; // No need to transmit once it has been transmitted before. - - sess.prepare("INSERT INTO events VALUES (?, ?, ?);").run(cID, subID, data[2]?.id); - data[1] = sID; - csess.get(cID)?.send(JSON.stringify(data)); - - // Now count for REQ limit requested by client. - // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID - - // Skip if EOSE has been omitted - if (!pendingEOSE.has(subID)) return; - - let remainingEvents = reqLimit.get(subID); - - if (remainingEvents) { - remainingEvents--; - reqLimit.set(subID, remainingEvents); - } - - if (remainingEvents < 1) { - // Once there are no remaining event, Do the instructed above. - csess.get(cID)?.send(JSON.stringify(["EOSE", sID])); - pendingEOSE.delete(subID); - reqLimit.delete(subID); - } - - break; - } - case "EOSE": { - const subID = data[1]; - if (!pendingEOSE.has(subID)) return; - pendingEOSE.set(subID, pendingEOSE.get(subID) + 1); - if (pendingEOSE.get(subID) < relays.length) return; - const args = subID.split(":") - /* - args[0] -> Client socket ID (bouncer -> client) - args.slice(1).join(":") -> Actual subscription ID that socket client requested. - */ - const cID = args[0]; - const sID = args.slice(1).join(":"); - - csess.get(cID)?.send(JSON.stringify(["EOSE", sID])); - pendingEOSE.delete(subID); - reqLimit.delete(subID); - break; - } - case "AUTH": { - if (!private_keys || typeof(data[1]) !== "string") return; - const pubkey = authorized_keys[0]; - nip42(relay, pubkey, private_keys[pubkey], data[1]); - break; - } - } - }); - - relay.on('error', _ => { - if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.error(process.pid, "-!-", relay.url, _.toString()); - }); - relay.on('close', _ => { - socks.delete(relay) // Remove this socket session from [socks] list - if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "-!-", `[${socks.size}/${relays.length}]`, "Disconnected from", relay.url); - - setTimeout(_ => newConn(addr), 5000); // As a bouncer server, We need to reconnect. - }); -} - -relays.forEach(newConn); diff --git a/config.js.example b/config.js.example index cb9239c..b430026 100644 --- a/config.js.example +++ b/config.js.example @@ -6,21 +6,15 @@ module.exports = { address: "0.0.0.0", port: "8080", - // Bouncing mode - // 1 -> Fast. Bouncer connects to at startup. Useful for bots - // 2 -> Accurate. Every clients has their own sessions. Useful for normal users (recommended) - mode: 2, - // Clusters. clusters: 4, - // Whenever store temporary data in memory (RAM) or in a disk - // Options: disk, memory - tmp_store: "memory", - // Log about bouncer connection with relays? log_about_relays: false, + // Time before reconnect to relays in miliseconds. + reconnect_time: 5000, + // For personal usage. Leaving this empty allows everyone to use this bostr instance. // NOTE: Requires NIP-42 on client. authorized_keys: [ diff --git a/http.js b/http.js index 21517ca..06d7a55 100644 --- a/http.js +++ b/http.js @@ -1,8 +1,7 @@ const WebSocket = require("ws"); const config = require("./config"); const http = require("http"); - -const handleBouncer = require(`./bouncer/bouncer${config?.mode || 1}.js`); +const bouncer = require(`./bouncer.js`); // For log const curD = _ => (new Date()).toLocaleString("ia"); @@ -40,7 +39,7 @@ server.on('request', (req, res) => { }); server.on('upgrade', (req, sock, head) => { - wss.handleUpgrade(req, sock, head, _ => handleBouncer(_, req)); + wss.handleUpgrade(req, sock, head, _ => bouncer(_, req)); }); const listened = server.listen(process.env.PORT || config.port, config.address || "0.0.0.0", _ => { diff --git a/package.json b/package.json index b377a05..478533a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bostr", - "version": "1.0.0", + "version": "2.0.0", "description": "Nostr Bouncer server", "main": "index.js", "scripts": { @@ -14,7 +14,8 @@ "keywords": [ "nostr", "bouncer", - "websocket" + "websocket", + "proxy" ], "author": "Yonle ", "license": "BSD-3-Clause", @@ -23,7 +24,6 @@ }, "homepage": "https://github.com/Yonle/bostr#readme", "dependencies": { - "better-sqlite3": "^9.1.1", "nostr-tools": "^1.17.0", "ws": "^8.14.2" },