diff --git a/bouncer.js b/bouncer.js index d19b211..08d9e8f 100644 --- a/bouncer.js +++ b/bouncer.js @@ -16,6 +16,10 @@ blocked_publishers = blocked_publishers?.map(i => i.startsWith("npub") ? nip19.d // CL MaxEoseScore: Set as 0 if configured relays is under of the expected number from if (relays.length < max_eose_score) max_eose_score = 0; +const csess = new Map(); // this is used for relays. +const userRelays = new Map(); // per ID contains Set() of +const orphanSess = new Set(); + // CL - User socket module.exports = (ws, req, onClose) => { let query = querystring.parse(req.url.slice(2)); @@ -24,7 +28,6 @@ module.exports = (ws, req, onClose) => { let sessStarted = false; let lastEvent = Date.now(); ws.ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address; - ws.relays = new Set(); // Set() of connected relays. ws.subs = new Map(); // contains filter submitted by clients. per subID ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over until all relays send EOSE. per subID ws.events = new Map(); // only to prevent the retransmit of the same event. per subID @@ -86,13 +89,13 @@ module.exports = (ws, req, onClose) => { lastEvent = Date.now(); ws.my_events.add(data[1]); - if (!ws.relays.size && !sessStarted) { + if (!sessStarted) { console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`); - newsess(ws); + getOrphanSess(ws); sessStarted = true; } - bc(data, ws); + bc(data, ws.id); ws.send(JSON.stringify(["OK", data[1]?.id, true, ""])); break; case "REQ": { @@ -123,9 +126,9 @@ module.exports = (ws, req, onClose) => { filters[fn].limit = ws.forcedLimit; } - if (!ws.relays.size && !sessStarted) { + if (!sessStarted) { console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`); - newsess(ws); + getOrphanSess(ws); sessStarted = true; } @@ -137,7 +140,7 @@ module.exports = (ws, req, onClose) => { if (!filter.since) filter.since = Math.floor(Date.now() / 1000); // Will not impact everything. Only used for handling passing pause_on_limit (or save mode) ws.mergedFilters.set(origID, filter); data[1] = faked; - bc(data, ws); + bc(data, ws.id); if (filter.limit < 1) return ws.send(JSON.stringify(["EOSE", origID])); ws.pendingEOSE.set(origID, 0); break; @@ -157,7 +160,7 @@ module.exports = (ws, req, onClose) => { ws.mergedFilters.delete(origID); data[1] = faked; - bc(data, ws); + bc(data, ws.id); ws.send(JSON.stringify(["CLOSED", origID, ""])); break; case "AUTH": @@ -181,25 +184,28 @@ module.exports = (ws, req, onClose) => { console.log(process.pid, "---", `${ws.ip} disconnected`); - for (const i of ws.reconnectTimeout) { - clearTimeout(i); - // Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i); - } - - for (const sock of ws.relays) { + for (const sock of userRelays.get(ws.id)) { sock.terminate(); } + + userRelays.delete(ws.id); + csess.delete(ws.id); }); } // WS - New session for client $id -function newsess(ws) { - relays.forEach(_ => newConn(_, ws)); +// mostly for new idle session. +function newsess() { + const id = Date.now() + "_" + process.pid + "_" + Math.random(); + userRelays.set(id, new Set()); + csess.set(id, null); + orphanSess.add(id); + relays.forEach(_ => newConn(_, id)); } // WS - Broadcast message to every existing sockets -function bc(msg, ws) { - for (const sock of ws.relays) { +function bc(msg, id) { + for (const sock of userRelays.get(id)) { if (sock.readyState !== 1) continue; // skip the ratelimit after @@ -208,17 +214,20 @@ function bc(msg, ws) { } } -// WS - Sessions -function newConn(addr, client, reconn_t = 0) { - if (client.readyState !== 1) return; - let additionalReqHeaders = {}; - if (forward_ip_address_to_upstream) - additionalReqHeaders["x-forwarded-for"] = client.ip; +function getOrphanSess(ws) { + ws.id = orphanSess.values().next().value; + orphanSess.delete(ws.id); + csess.set(ws.id, ws); + newsess(); +} + +// WS - Sessions +function newConn(addr, id, reconn_t = 0) { + if (!csess.has(id)) return; const relay = new WebSocket(addr, { headers: { "User-Agent": `Bostr ${version}; The nostr relay bouncer; https://github.com/Yonle/bostr`, - ...additionalReqHeaders }, noDelay: true, allowSynchronousEvents: true @@ -226,10 +235,12 @@ function newConn(addr, client, reconn_t = 0) { relay.ratelimit = 0; relay.on('open', _ => { - if (client.readyState !== 1) return relay.terminate(); + if (!csess.has(id)) return relay.terminate(); + const client = csess.get(id); reconn_t = 0; - if (log_about_relays) console.log(process.pid, "---", client.ip, `${relay.url} is connected`); + if (log_about_relays) console.log(process.pid, "---", id, `${relay.url} is connected`); + if (!client) return; for (const i of client.my_events) { relay.send(JSON.stringify(["EVENT", i])); } @@ -240,12 +251,14 @@ function newConn(addr, client, reconn_t = 0) { }); relay.on('message', data => { - if (client.readyState !== 1) return relay.terminate(); + if (!csess.has(id)) return relay.terminate(); try { data = JSON.parse(data); } catch (error) { return; } + const client = csess.get(id); + if (!client) return; switch (data[0]) { case "EVENT": { @@ -290,9 +303,9 @@ function newConn(addr, client, reconn_t = 0) { data[1] = client.subalias.get(data[1]); if (!client.pendingEOSE.has(data[1])) return; client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1); - if (log_about_relays) console.log(process.pid, "---", client.ip, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${client.relays.size} connected relays.`); + if (log_about_relays) console.log(process.pid, "---", id, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${userRelays.get(id).size} connected relays.`); - if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < client.relays.size))) return; + if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < userRelays.get(id).size))) return; client.pendingEOSE.delete(data[1]); if (client.pause_subs.has(data[1])) { @@ -322,28 +335,28 @@ function newConn(addr, client, reconn_t = 0) { }); relay.on('error', _ => { - if (log_about_relays) console.error(process.pid, "-!-", client.ip, relay.url, _.toString()) + if (log_about_relays) console.error(process.pid, "-!-", id, relay.url, _.toString()) }); relay.on('close', _ => { - if (client.readyState !== 1) return; - client.relays.delete(relay); // Remove this socket session from list - if (log_about_relays) console.log(process.pid, "-!-", client.ip, "Disconnected from", relay.url); + if (!userRelays.has(id)) return; + userRelays.get(id).delete(relay); // Remove this socket session from list + if (log_about_relays) console.log(process.pid, "-!-", id, "Disconnected from", relay.url); reconn_t += reconnect_time || 5000 - const reconnectTimeout = setTimeout(_ => { - newConn(addr, client, reconn_t); - client?.reconnectTimeout.delete(reconnectTimeout); - }, reconn_t); // As a bouncer server, We need to reconnect. - client?.reconnectTimeout.add(reconnectTimeout); + setTimeout(_ => { + newConn(addr, id, reconn_t); + }, reconn_t); }); relay.on('unexpected-response', (req, res) => { - if (client.readyState !== 1) return; - client.relays.delete(relay); + if (!userRelays.has(id)) return; + userRelays.get(id).delete(relay); if (res.statusCode >= 500) return relay.emit("close", null); relays = relays.filter(_ => !relay.url.startsWith(_)); console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`); }); - client.relays.add(relay); // Add this socket session to + userRelays.get(id).add(relay); // Add this socket session to } + +newsess(); diff --git a/package.json b/package.json index c7e5c19..6326908 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bostr", - "version": "2.0.10", + "version": "2.0.11-dev", "description": "Nostr relay bouncer", "main": "index.js", "scripts": {