diff --git a/bouncer.js b/bouncer.js index 8bcba3b..44e72c4 100644 --- a/bouncer.js +++ b/bouncer.js @@ -3,11 +3,12 @@ const { verifySignature, validateEvent, nip19 } = require("nostr-tools"); const auth = require("./auth.js"); const nip42 = require("./nip42.js"); -let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays } = require("./config"); +let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays, max_orphan_sess } = require("./config"); const socks = new Set(); const csess = new Map(); +log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays; authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); approved_publishers = approved_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); @@ -20,8 +21,9 @@ cache_relays = cache_relays?.map(i => i.endsWith("/") ? i : i + "/"); module.exports = (ws, req) => { let authKey = null; let authorized = true; + let orphan = getOrphanSess(); // if available - ws.id = process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size; + ws.id = orphan || (process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size); ws.subs = new Map(); // contains filter submitted by clients. per subID ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over until all relays send EOSE. per subID ws.events = new Map(); // only to prevent the retransmit of the same event. per subID @@ -100,7 +102,9 @@ module.exports = (ws, req) => { ws.pubkey = data[1].pubkey; console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)"); if (authorized) return; - newsess(ws.id); + // if orphan, ignore + csess.set(ws.id, ws); + if (!orphan) newsess(ws.id); authorized = true; } break; @@ -113,23 +117,26 @@ module.exports = (ws, req) => { ws.on('error', console.error); ws.on('close', _ => { console.log(process.pid, "---", "Sock", ws.id, "has disconnected."); - csess.delete(ws.id); + csess.set(ws.id, null); // set as orphan. for (i of ws.EOSETimeout) { clearTimeout(i[1]); } if (!authorized) return; - terminate_subs(ws.id); for (i of ws.reconnectTimeout) { clearTimeout(i); // Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i); } + + onClientDisconnect(); }); - csess.set(ws.id, ws); - if (authorized) newsess(ws.id); + if (authorized) { + csess.set(ws.id, ws); + if (!orphan) newsess(ws.id); + } } // CL - Set up EOSE timeout @@ -162,8 +169,8 @@ function cancel_EOSETimeout(id, subid) { // WS - New session for client $id function newsess(id) { - relays.forEach(_ => newConn(_, id)); cache_relays?.forEach(_ => newConn(_, id)); + relays.forEach(_ => newConn(_, id)); } // WS - Broadcast message to every existing sockets @@ -191,7 +198,7 @@ function bc(msg, id) { } // WS - Terminate all existing sockets that were for -function terminate_subs(id) { +function terminate_sess(id) { for (sock of socks) { if (sock.id !== id) continue; sock.terminate(); @@ -199,6 +206,43 @@ function terminate_subs(id) { } } +function onClientDisconnect() { + const orphanSessNum = howManyOrphanSess(); + const max = max_orphan_sess || 0; + if (orphanSessNum > max) { + if (log_about_relays) console.log(process.pid, `There are ${orphanSessNum} of orphan session. I will clear ${orphanSessNum - max} of orphan sessions.`); + clearOrphanSess(orphanSessNum - max); + } +} + +function getOrphanSess() { + for (sess of csess) { + if (sess[1] !== null) continue; + return sess[0]; + break; + } +} + +function howManyOrphanSess() { + let howMany = 0; + for (sess of csess) { + if (sess[1] !== null) continue; + howMany++ + } + + return howMany; +} + +function clearOrphanSess(l) { + let cn = 0; + for (sess of csess) { + if (cn > l) break; + if (sess[1] !== null) continue; + terminate_sess(sess[0]); + cn++; + } +} + // WS - Sessions function newConn(addr, id) { if (!csess.has(id)) return; @@ -213,8 +257,9 @@ function newConn(addr, id) { relay.on('open', _ => { if (!csess.has(id)) return relay.terminate(); socks.add(relay); // Add this socket session to [socks] - if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}]`, relay.url, "is connected"); + if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`); + if (!client) return; // is orphan, do nothing. for (i of client.my_events) { relay.send(JSON.stringify(["EVENT", i])); } @@ -225,6 +270,7 @@ function newConn(addr, id) { }); relay.on('message', data => { + if (!client) return; try { data = JSON.parse(data); } catch (error) { @@ -275,7 +321,7 @@ function newConn(addr, id) { case "EOSE": if (!client.pendingEOSE.has(data[1])) return; client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1); - if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`); + if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`); if (!cache_relays?.includes(relay.url)) { if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return; @@ -305,18 +351,18 @@ function newConn(addr, id) { }); relay.on('error', _ => { - if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.error(process.pid, "-!-", `[${id}]`, relay.url, _.toString()) + if (log_about_relays) console.error(process.pid, "-!-", `[${id}]`, relay.url, _.toString()) }); relay.on('close', _ => { socks.delete(relay) // Remove this socket session from [socks] list - if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url); + if (log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url); if (!csess.has(id)) return; const reconnectTimeout = setTimeout(_ => { newConn(addr, id); - client.reconnectTimeout.delete(reconnectTimeout); + client?.reconnectTimeout.delete(reconnectTimeout); }, reconnect_time || 5000); // As a bouncer server, We need to reconnect. - client.reconnectTimeout.add(reconnectTimeout); + client?.reconnectTimeout.add(reconnectTimeout); }); } diff --git a/config.js.example b/config.js.example index 43e5da3..405413e 100644 --- a/config.js.example +++ b/config.js.example @@ -16,6 +16,11 @@ module.exports = { // Time before reconnect to relays in milliseconds. reconnect_time: 5000, + // Maximum amount of orphan sessions. + // Setting to 0 disables orphan session function. + // PRIVACY WARNING: Enabling this may could leak some events that people were not supposed to receive (Eg. PM) + max_orphan_sess: 0, + // Wait for every connected relays send EOSE. // Could improve accuracy on received events. //