diff --git a/worker_bouncer.js b/worker_bouncer.js index 261f0b4..42c2328 100644 --- a/worker_bouncer.js +++ b/worker_bouncer.js @@ -179,18 +179,24 @@ function newsess() { if (cache_relays) { for (const url of cache_relays) { - newConn(url, id); + if (!csess.hasOwnProperty(id)) break; + const relay = new newConn(url, id); + userRelays[id].add(relay); } } switch (shift) { case undefined: for (const url of relays) { - newConn(url, id); + if (!csess.hasOwnProperty(id)) break; + const relay = new newConn(url, id); + userRelays[id].add(relay); } break; default: - newConn(shift, id); + if (!csess.hasOwnProperty(id)) break; + const relay = new newConn(shift, id); + userRelays[id].add(relay); break; } } @@ -275,187 +281,188 @@ function relay_type(addr) { } } -function newConn(addr, id, reconn_t = 0) { - if (!csess.hasOwnProperty(id)) return; - if (!stats[addr]) stats[addr] = { raw_rx: 0, rx: 0, tx: 0, f: 0 }; - const relay = new WebSocket(addr, { - headers: { - "User-Agent": `Bostr ${version}; The nostr relay bouncer; https://github.com/Yonle/bostr; ConnID: ${id}; ${server_meta.canonical_url || "No canonical bouncer URL specified"}; Contact: ${server_meta.contact}`, - } - }); - - relay.isCache = relay_type(addr) === "cache_relay"; - relay.isLoadBalancer = relay_type(addr) === "loadbalancer"; - relay.ratelimit = 0; - relay.pendingNIP42 = new Set(); - relay.on('open', _ => { - if (!csess.hasOwnProperty(id)) return relay.terminate(); - const client = csess[id]; - reconn_t = 0; - if (log_about_relays) console.log(threadId, "---", id, "Connected to", addr, `(${relay_type(addr)})`); - - if (!client) return; - - for (const i in client.subs) { - relay.send(JSON.stringify(["REQ", client.fakesubalias[i], ...client.subs[i]])); - } - }); - - relay.on('message', data => { - try { - data = JSON.parse(data); - } catch (error) { - return; - } - const client = csess[id]; - if (!client) return; - - switch (data[0]) { - case "EVENT": { - stats._global.raw_rx++; - stats[addr].raw_rx++; - if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return; - if (!client.subalias.hasOwnProperty(data[1])) return; - data[1] = client.subalias[data[1]]; - - if (client.events[data[1]].has(data[2]?.id)) return; // No need to transmit once it has been transmitted before. - if (!relay.isCache) bc(["EVENT", data[2]], id, true); // store to cache relay - const filter = client.mergedFilters[data[1]]; - if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !relay.isCache) return; - - if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return; - - const filters = client.subs[data[1]]; - if (!_matchFilters(filters, data[2])) return; - - const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase()); - if (NotInSearchQuery) return; - - if (!relay.isLoadBalancer) client.events[data[1]].add(data[2]?.id); - parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) }); - - if (max_known_events && client.events[data[1]].size >= max_known_events) - client.events[data[1]].delete(client.events[data[1]].values().next().value); - - stats._global.rx++; - stats[addr].rx++; - - // Now count for REQ limit requested by client. - // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID - - // Skip if EOSE has been omitted - if (!client.pendingEOSE.hasOwnProperty(data[1]) || client.pause_subs.has(data[1]) || relay.isLoadBalancer) return; - const limit = getFilterLimit(filter); - if (limit === Infinity) return; - if (client.events[data[1]].size >= limit) { - // Once reached to , send EOSE to client. - parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(["EOSE", data[1]]) }); - - if (!client.accurateMode && (client.saveMode || pause_on_limit)) { - client.pause_subs.add(data[1]); - } else { - delete client.pendingEOSE[data[1]]; - } - } - break; +class newConn extends WebSocket { + constructor(addr, id, reconn_t = 0) { + if (!stats[addr]) stats[addr] = { raw_rx: 0, rx: 0, tx: 0, f: 0 }; + super(addr, { + headers: { + "User-Agent": `Bostr ${version}; The nostr relay bouncer; https://github.com/Yonle/bostr; ConnID: ${id}; ${server_meta.canonical_url || "No canonical bouncer URL specified"}; Contact: ${server_meta.contact}`, } - case "EOSE": - if (!client.subalias.hasOwnProperty(data[1])) return; - data[1] = client.subalias[data[1]]; - if (!client.pendingEOSE.hasOwnProperty(data[1]) && !relay.isLoadBalancer) return; - client.pendingEOSE[data[1]]++; + }); - if (log_about_relays) console.log(threadId, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE[data[1]]} EOSE received out of ${userRelays[id].size} connected relays.`); + this.isCache = relay_type(addr) === "cache_relay"; + this.isLoadBalancer = relay_type(addr) === "loadbalancer"; + this.ratelimit = 0; + this.pendingNIP42 = new Set(); + this.on('open', _ => { + if (!csess.hasOwnProperty(id)) return this.terminate(); + const client = csess[id]; + reconn_t = 0; + if (log_about_relays) console.log(threadId, "---", id, "Connected to", addr, `(${relay_type(addr)})`); - if (!relay.isCache && (wait_eose && ((client.pendingEOSE[data[1]] < max_eose_score) || (client.pendingEOSE[data[1]] < userRelays[id].size)))) return; - if (relay.isCache && !client.events[data[1]].size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet. - delete client.pendingEOSE[data[1]]; + if (!client) return; - if (client.pause_subs.has(data[1]) && !relay.isLoadBalancer) { - client.pause_subs.delete(data[1]); - } else { + for (const i in client.subs) { + this.send(JSON.stringify(["REQ", client.fakesubalias[i], ...client.subs[i]])); + } + }); + + this.on('message', data => { + try { + data = JSON.parse(data); + } catch (error) { + return; + } + const client = csess[id]; + if (!client) return; + + switch (data[0]) { + case "EVENT": { + stats._global.raw_rx++; + stats[addr].raw_rx++; + if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return; + if (!client.subalias.hasOwnProperty(data[1])) return; + data[1] = client.subalias[data[1]]; + + if (client.events[data[1]].has(data[2]?.id)) return; // No need to transmit once it has been transmitted before. + if (!this.isCache) bc(["EVENT", data[2]], id, true); // store to cache relay + const filter = client.mergedFilters[data[1]]; + if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !this.isCache) return; + + if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return; + + const filters = client.subs[data[1]]; + if (!_matchFilters(filters, data[2])) return; + + const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase()); + if (NotInSearchQuery) return; + + if (!this.isLoadBalancer) client.events[data[1]].add(data[2]?.id); parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) }); + + if (max_known_events && client.events[data[1]].size >= max_known_events) + client.events[data[1]].delete(client.events[data[1]].values().next().value); + + stats._global.rx++; + stats[addr].rx++; + + // Now count for REQ limit requested by client. + // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID + + // Skip if EOSE has been omitted + if (!client.pendingEOSE.hasOwnProperty(data[1]) || client.pause_subs.has(data[1]) || this.isLoadBalancer) return; + const limit = getFilterLimit(filter); + if (limit === Infinity) return; + if (client.events[data[1]].size >= limit) { + // Once reached to , send EOSE to client. + parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(["EOSE", data[1]]) }); + + if (!client.accurateMode && (client.saveMode || pause_on_limit)) { + client.pause_subs.add(data[1]); + } else { + delete client.pendingEOSE[data[1]]; + } + } + break; } - break; - case "AUTH": - if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return relay.pendingNIP42.add(data[1]); - nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]); - break; + case "EOSE": + if (!client.subalias.hasOwnProperty(data[1])) return; + data[1] = client.subalias[data[1]]; + if (!client.pendingEOSE.hasOwnProperty(data[1]) && !this.isLoadBalancer) return; + client.pendingEOSE[data[1]]++; - case "NOTICE": - if (typeof(data[1]) !== "string") return; - if (data[1].startsWith("rate-limited")) relay.ratelimit = Date.now(); + if (log_about_relays) console.log(threadId, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE[data[1]]} EOSE received out of ${userRelays[id].size} connected relays.`); - if (log_about_relays) console.log(threadId, id, addr, data[0], data[1]); + if (!this.isCache && (wait_eose && ((client.pendingEOSE[data[1]] < max_eose_score) || (client.pendingEOSE[data[1]] < userRelays[id].size)))) return; + if (this.isCache && !client.events[data[1]].size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet. + delete client.pendingEOSE[data[1]]; - stats._global.f++ - stats[addr].f++ + if (client.pause_subs.has(data[1]) && !this.isLoadBalancer) { + client.pause_subs.delete(data[1]); + } else { + parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) }); + } + break; + case "AUTH": + if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return this.pendingNIP42.add(data[1]); + nip42(this, client.pubkey, private_keys[client.pubkey], data[1]); + break; - break; + case "NOTICE": + if (typeof(data[1]) !== "string") return; + if (data[1].startsWith("rate-limited")) this.ratelimit = Date.now(); - case "CLOSED": - if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "string")) return; - if (data[2].startsWith("rate-limited")) relay.ratelimit = Date.now(); + if (log_about_relays) console.log(threadId, id, addr, data[0], data[1]); - if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2]); + stats._global.f++ + stats[addr].f++ - if (data[2].length) { - stats._global.f++; - stats[addr].f++; - } - if (client.pendingEOSE.hasOwnProperty(data[1])) client.pendingEOSE[data[1]]++; - break; + break; - case "OK": - if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "boolean") || (typeof(data[3]) !== "string")) return; - if (data[3].startsWith("rate-limited")) relay.ratelimit = Date.now(); + case "CLOSED": + if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "string")) return; + if (data[2].startsWith("rate-limited")) this.ratelimit = Date.now(); - if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2], data[3]); + if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2]); - switch (data[2]) { - case true: - stats._global.tx++; - stats[addr].tx++; - case false: - stats._global.f++ - stats[addr].f++ - } + if (data[2].length) { + stats._global.f++; + stats[addr].f++; + } + if (client.pendingEOSE.hasOwnProperty(data[1])) client.pendingEOSE[data[1]]++; + break; - break; - } - }); + case "OK": + if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "boolean") || (typeof(data[3]) !== "string")) return; + if (data[3].startsWith("rate-limited")) this.ratelimit = Date.now(); - relay.on('error', _ => { - if (log_about_relays) console.error(threadId, "-!-", id, addr, _.toString()) - }); + if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2], data[3]); - relay.on('close', _ => { - if (!userRelays.hasOwnProperty(id)) return; - userRelays[id].delete(relay); - if (log_about_relays) console.log(threadId, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`); - reconn_t += reconnect_time || 5000 - setTimeout(_ => { - newConn(addr, id, reconn_t); - }, reconn_t); + switch (data[2]) { + case true: + stats._global.tx++; + stats[addr].tx++; + case false: + stats._global.f++ + stats[addr].f++ + } - stats._global.f++ - stats[addr].f++ - }); + break; + } + }); - relay.on('unexpected-response', (req, res) => { - if (!userRelays.hasOwnProperty(id)) return; - userRelays[id].delete(relay); - if (res.statusCode >= 500) return relay.emit("close", null); - relays.splice(relays.indexOf(addr), 1); - cache_relays.splice(cache_relays.indexOf(addr), 1); - loadbalancer.splice(loadbalancer.indexOf(addr), 1); - console.log(threadId, "-!-", `${addr} give status code ${res.statusCode}. Not (re)connect with new session again.`); + this.on('error', _ => { + if (log_about_relays) console.error(threadId, "-!-", id, addr, _.toString()) + }); - stats._global.f++ - stats[addr].f++ - }); + this.on('close', _ => { + if (!userRelays.hasOwnProperty(id)) return; + userRelays[id].delete(this); + if (log_about_relays) console.log(threadId, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`); + reconn_t += reconnect_time || 5000 + setTimeout(_ => { + if (!csess.hasOwnProperty(id)) return; + const relay = new newConn(addr, id, reconn_t); + userRelays[id].add(relay); + }, reconn_t); - userRelays[id].add(relay); + stats._global.f++ + stats[addr].f++ + }); + + this.on('unexpected-response', (req, res) => { + if (!userRelays.hasOwnProperty(id)) return; + userRelays[id].delete(this); + if (res.statusCode >= 500) return this.emit("close", null); + relays.splice(relays.indexOf(addr), 1); + cache_relays.splice(cache_relays.indexOf(addr), 1); + loadbalancer.splice(loadbalancer.indexOf(addr), 1); + console.log(threadId, "-!-", `${addr} give status code ${res.statusCode}. Not (re)connect with new session again.`); + + stats._global.f++ + stats[addr].f++ + }); + } } for (let i = 1; i <= (idle_sessions || 1); i++) {