diff --git a/bouncer.js b/bouncer.js index 690cc58..3bcfea6 100644 --- a/bouncer.js +++ b/bouncer.js @@ -106,7 +106,6 @@ module.exports = (ws, req) => { console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)"); if (authorized) return; csess.set(ws.id, ws); - updateSess(ws.id); if (!orphan) newsess(ws.id); authorized = true; } @@ -122,7 +121,6 @@ module.exports = (ws, req) => { console.log(process.pid, "---", "Sock", ws.id, "has disconnected.", `(${howManyOrphanSess()+1} orphans)`); if (csess.has(ws.id)) { csess.set(ws.id, null); // set as orphan. - updateSess(ws.id); // change relays relay.client object } for (i of ws.EOSETimeout) { @@ -149,7 +147,6 @@ module.exports = (ws, req) => { if (authorized) { csess.set(ws.id, ws); - updateSess(ws.id); if (!orphan) newsess(ws.id); } } @@ -259,14 +256,6 @@ function clearOrphanSess(l) { } } -function updateSess(id) { - for (sock of socks) { - if (sock.id !== id) continue; - if (sock.readyState >= 2) return socks.delete(sock); - sock.client = csess.get(id); - } -} - // WS - Sessions function newConn(addr, id) { if (!csess.has(id)) return; @@ -278,24 +267,25 @@ function newConn(addr, id) { }); relay.id = id; - relay.client = csess.get(id); relay.on('open', _ => { + const client = csess.get(id); if (!csess.has(id)) return relay.terminate(); socks.add(relay); // Add this socket session to [socks] - if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!relay.client ? "(orphan)" : ""}`); + if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`); - if (!relay.client) return; // is orphan, do nothing. - for (i of relay.client.my_events) { + if (!client) return; // is orphan, do nothing. + for (i of client.my_events) { relay.send(JSON.stringify(["EVENT", i])); } - for (i of relay.client.subs) { + for (i of client.subs) { relay.send(JSON.stringify(["REQ", i[0], ...i[1]])); } }); relay.on('message', data => { - if (!relay.client) return; + const client = csess.get(id); + if (!client) return; try { data = JSON.parse(data); } catch (error) { @@ -305,23 +295,23 @@ function newConn(addr, id) { switch (data[0]) { case "EVENT": { if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return; - if (!relay.client.subs.has(data[1])) return; + if (!client.subs.has(data[1])) return; timeoutEOSE(id, data[1]); - if (relay.client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return; + if (client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return; // if filter.since > receivedEvent.created_at, skip // if receivedEvent.created_at > filter.until, skip - const cFilter = relay.client.subs.get(data[1]) + const cFilter = client.subs.get(data[1]) if (cFilter?.since > data[2].created_at) return; if (data[2].created_at > cFilter?.until) return; const NotInSearchQuery = "search" in cFilter && !data[2]?.content?.toLowerCase().includes(cFilter.search.toLowerCase()); if (NotInSearchQuery) return; - if (relay.client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before. + if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before. - if (!relay.client.pause_subs.has(data[1])) { - relay.client.events.get(data[1]).add(data[2]?.id); - relay.client.send(JSON.stringify(data)); + if (!client.pause_subs.has(data[1])) { + client.events.get(data[1]).add(data[2]?.id); + client.send(JSON.stringify(data)); } // send into cache relays. @@ -331,46 +321,46 @@ function newConn(addr, id) { // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID // Skip if EOSE has been omitted - if (!relay.client.pendingEOSE.has(data[1]) || !relay.client.subs.get(data[1])?.limit || relay.client.pause_subs.has(data[1])) return; - if (relay.client.events.get(data[1]).size >= relay.client.subs.get(data[1])?.limit) { + if (!client.pendingEOSE.has(data[1]) || !client.subs.get(data[1])?[0]?.limit || client.pause_subs.has(data[1])) return; + if (client.events.get(data[1]).size >= client.subs.get(data[1])?[0]?.limit) { // Once reached to , send EOSE to client. - relay.client.send(JSON.stringify(["EOSE", data[1]])); + client.send(JSON.stringify(["EOSE", data[1]])); if (pause_on_limit || cache_relays?.includes(relay.url)) { - relay.client.pause_subs.add(data[1]); + client.pause_subs.add(data[1]); } else { - relay.client.pendingEOSE.delete(data[1]); + client.pendingEOSE.delete(data[1]); } } break; } case "EOSE": - if (!relay.client.pendingEOSE.has(data[1])) return; - relay.client.pendingEOSE.set(data[1], relay.client.pendingEOSE.get(data[1]) + 1); - if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${relay.client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`); + if (!client.pendingEOSE.has(data[1])) return; + client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1); + if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`); if (!cache_relays?.includes(relay.url)) { - if (wait_eose && ((relay.client.pendingEOSE.get(data[1]) < max_eose_score) || (relay.client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return; - if (relay.client.pause_subs.has(data[1])) return relay.client.pause_subs.delete(data[1]); + if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return; + if (client.pause_subs.has(data[1])) return client.pause_subs.delete(data[1]); cancel_EOSETimeout(data[1]); } else { - if (relay.client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays?.includes(sock.url)).length) return; + if (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays?.includes(sock.url)).length) return; // get the filter - const filter = relay.client.subs.get(data[1]); - if (relay.client.pause_subs.has(data[1])) { - relay.client.pause_subs.delete(data[1]); - relay.client.pendingEOSE.delete(data[1]); + const filter = client.subs.get(data[1]); + if (client.pause_subs.has(data[1])) { + client.pause_subs.delete(data[1]); + client.pendingEOSE.delete(data[1]); } // now req to the direct connection, with the recent one please. - return direct_bc(["REQ", data[1], filter], id); + return direct_bc(["REQ", data[1], ...filter], id); } - relay.client.send(JSON.stringify(data)); + client.send(JSON.stringify(data)); break; case "AUTH": - if (!private_keys || typeof(data[1]) !== "string" || !relay.client.pubkey) return; - nip42(relay, relay.client.pubkey, private_keys[relay.client.pubkey], data[1]); + if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return; + nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]); break; } }); @@ -380,14 +370,15 @@ function newConn(addr, id) { }); relay.on('close', _ => { + const client = csess.get(id); socks.delete(relay) // Remove this socket session from [socks] list if (log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url); if (!csess.has(id)) return; const reconnectTimeout = setTimeout(_ => { newConn(addr, id); - relay.client?.reconnectTimeout.delete(reconnectTimeout); + client?.reconnectTimeout.delete(reconnectTimeout); }, reconnect_time || 5000); // As a bouncer server, We need to reconnect. - relay.client?.reconnectTimeout.add(reconnectTimeout); + client?.reconnectTimeout.add(reconnectTimeout); }); }