diff --git a/bouncer.js b/bouncer.js index 66079c2..2d2caa7 100644 --- a/bouncer.js +++ b/bouncer.js @@ -7,8 +7,6 @@ const nip42 = require("./nip42.js"); let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config"); -const csess = new Map(); - log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays; authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); approved_publishers = approved_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); @@ -21,9 +19,7 @@ module.exports = (ws, req, onClose) => { let authKey = null; let authorized = true; let lastEvent = Date.now(); - let ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address; - - ws.id = (process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size); + ws.ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address; ws.relays = new Set(); // Set() of connected relays. ws.subs = new Map(); // contains filter submitted by clients. per subID ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over until all relays send EOSE. per subID @@ -50,7 +46,7 @@ module.exports = (ws, req, onClose) => { ws.send(JSON.stringify(["AUTH", authKey])); } - console.log(process.pid, `->- ${ip} (${ws.id}) connected [${req.headers["user-agent"] || ""}]`); + console.log(process.pid, `->- ${ws.ip} connected [${req.headers["user-agent"] || ""}]`); ws.on("message", data => { try { data = JSON.parse(data); @@ -79,7 +75,7 @@ module.exports = (ws, req, onClose) => { lastEvent = Date.now(); ws.my_events.add(data[1]); - bc(data, ws.id); + bc(data, ws); ws.send(JSON.stringify(["OK", data[1]?.id, true, ""])); break; case "REQ": { @@ -98,7 +94,7 @@ module.exports = (ws, req, onClose) => { ws.fakesubalias.set(origID, faked); data[1] = faked; - bc(data, ws.id); + bc(data, ws); if (data[2]?.limit < 1) return ws.send(JSON.stringify(["EOSE", origID])); ws.pendingEOSE.set(origID, 0); break; @@ -117,16 +113,15 @@ module.exports = (ws, req, onClose) => { ws.subalias.delete(faked); data[1] = faked; - bc(data, ws.id); + bc(data, ws); ws.send(JSON.stringify(["CLOSED", origID, ""])); break; case "AUTH": if (auth(authKey, data[1], ws, req)) { ws.pubkey = data[1].pubkey; - console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)"); + console.log(process.pid, "---", ws.ip, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)"); if (authorized) return; - csess.set(ws.id, ws); - newsess(ws.id); + newsess(ws); authorized = true; lastEvent = Date.now(); } @@ -141,30 +136,29 @@ module.exports = (ws, req, onClose) => { ws.on('close', _ => { onClose(); - console.log(process.pid, "---", `${ip} (${ws.id}) disconnected`); + console.log(process.pid, "---", `${ws.ip} disconnected`); for (const i of ws.reconnectTimeout) { clearTimeout(i); // Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i); } - terminate_sess(ws.id); + for (const sock of ws.relays) { + sock.close(); + } }); - if (authorized) { - csess.set(ws.id, ws); - newsess(ws.id); - } + if (authorized) newsess(ws); } // WS - New session for client $id -function newsess(id) { - relays.forEach(_ => newConn(_, id)); +function newsess(ws) { + relays.forEach(_ => newConn(_, ws)); } // WS - Broadcast message to every existing sockets -function bc(msg, id) { - for (const sock of csess.get(id).relays) { +function bc(msg, ws) { + for (const sock of ws.relays) { if (sock.readyState !== 1) continue; // skip the ratelimit after @@ -173,18 +167,9 @@ function bc(msg, id) { } } -// WS - Terminate all existing sockets that were for -function terminate_sess(id) { - for (const sock of csess.get(id).relays) { - sock.close(); - } - - csess.delete(id); -} - // WS - Sessions -function newConn(addr, id, reconn_t = 0) { - if (!csess.has(id)) return; +function newConn(addr, client, reconn_t = 0) { + if (!client) return; const relay = new WebSocket(addr, { headers: { "User-Agent": `Bostr (v${version}); The nostr relay bouncer; https://github.com/Yonle/bostr` @@ -193,15 +178,12 @@ function newConn(addr, id, reconn_t = 0) { allowSynchronousEvents: true }); - relay.id = id; relay.ratelimit = 0; relay.on('open', _ => { - const client = csess.get(id); - if (!csess.has(id)) return relay.terminate(); + if (!client) return relay.close(); reconn_t = 0; - if (log_about_relays) console.log(process.pid, "---", `[${id}] ${relay.url} is connected`); + if (log_about_relays) console.log(process.pid, "---", client.ip, `${relay.url} is connected`); - if (!client) return; for (const i of client.my_events) { relay.send(JSON.stringify(["EVENT", i])); } @@ -212,8 +194,7 @@ function newConn(addr, id, reconn_t = 0) { }); relay.on('message', data => { - const client = csess.get(id); - if (!client) return; + if (!client) return relay.close(); try { data = JSON.parse(data); } catch (error) { @@ -266,7 +247,7 @@ function newConn(addr, id, reconn_t = 0) { data[1] = client.subalias.get(data[1]); if (!client.pendingEOSE.has(data[1])) return; client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1); - if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${client.relays.size} connected relays.`); + if (log_about_relays) console.log(process.pid, "---", client.ip, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${client.relays.size} connected relays.`); if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < client.relays.size))) return; client.pendingEOSE.delete(data[1]); @@ -296,24 +277,22 @@ function newConn(addr, id, reconn_t = 0) { }); relay.on('error', _ => { - if (log_about_relays) console.error(process.pid, "-!-", `[${id}]`, relay.url, _.toString()) + if (log_about_relays) console.error(process.pid, "-!-", client.ip, relay.url, _.toString()) }); relay.on('close', _ => { - const client = csess.get(id); if (!client) return; client.relays.delete(relay); // Remove this socket session from list - if (log_about_relays) console.log(process.pid, "-!-", `[${id}]`, "Disconnected from", relay.url); + if (log_about_relays) console.log(process.pid, "-!-", client.ip, "Disconnected from", relay.url); reconn_t += reconnect_time || 5000 const reconnectTimeout = setTimeout(_ => { - newConn(addr, id, reconn_t); + newConn(addr, client, reconn_t); client?.reconnectTimeout.delete(reconnectTimeout); }, reconn_t); // As a bouncer server, We need to reconnect. client?.reconnectTimeout.add(reconnectTimeout); }); relay.on('unexpected-response', (req, res) => { - const client = csess.get(id); if (!client) return; client.relays.delete(relay); if (res.statusCode >= 500) return relay.emit("close", null); @@ -321,5 +300,5 @@ function newConn(addr, id, reconn_t = 0) { console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`); }); - csess.get(id).relays.add(relay); // Add this socket session to + client.relays.add(relay); // Add this socket session to }