diff --git a/bouncer.js b/bouncer.js index c2558ec..f657e27 100644 --- a/bouncer.js +++ b/bouncer.js @@ -3,16 +3,20 @@ const { validateEvent, nip19 } = require("nostr-tools"); const auth = require("./auth.js"); const nip42 = require("./nip42.js"); -let { relays, tmp_store, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score } = require("./config"); +let { relays, tmp_store, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays } = require("./config"); const socks = new Set(); const csess = new Map(); +let recentevent = {}; + authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); // CL MaxEoseScore: Set as 0 if configured relays is under of the expected number from if (relays.length < max_eose_score) max_eose_score = 0; +cache_relays = cache_relays?.map(i => i.endsWith("/") ? i : i + "/"); + // CL - User socket module.exports = (ws, req) => { let authKey = null; @@ -80,14 +84,17 @@ module.exports = (ws, req) => { ws.pendingEOSE.delete(data[1]); ws.pause_subs.delete(data[1]); cancel_EOSETimeout(ws.id, data[1]); - bc(data, ws.id); + + delete recentevent[ws.id + ":" + data[1]]; + cache_bc(data, ws.id); + direct_bc(data, ws.id); break; case "AUTH": if (auth(authKey, data[1], ws, req)) { ws.pubkey = data[1].pubkey; console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)"); if (authorized) return; - relays.forEach(_ => newConn(_, ws.id)); + newsess(ws.id); authorized = true; } break; @@ -111,7 +118,7 @@ module.exports = (ws, req) => { }); csess.set(ws.id, ws); - if (authorized) relays.forEach(_ => newConn(_, ws.id)); + if (authorized) newsess(ws.id); } // CL - Set up EOSE timeout @@ -142,15 +149,36 @@ function cancel_EOSETimeout(id, subid) { c.EOSETimeout.delete(subid); } +// WS - New session for client $id +function newsess(id) { + relays.forEach(_ => newConn(_, id)); + cache_relays?.forEach(_ => newConn(_, id)); +} + // WS - Broadcast message to every existing sockets -function bc(msg, id) { +function direct_bc(msg, id) { for (sock of socks) { + if (cache_relays?.includes(sock.url)) continue; if (sock.id !== id) continue; if (sock.readyState >= 2) return socks.delete(sock); sock.send(JSON.stringify(msg)); } } +function cache_bc(msg, id) { + for (sock of socks) { + if (!cache_relays?.includes(sock.url)) continue; + if (sock.id !== id) continue; + if (sock.readyState >= 2) return socks.delete(sock); + sock.send(JSON.stringify(msg)); + } +} + +function bc(msg, id) { + if (!cache_relays?.length) direct_bc(msg, id); + else cache_bc(msg, id); +} + // WS - Terminate all existing sockets that were for function terminate_subs(id) { for (sock of socks) { @@ -175,12 +203,14 @@ function newConn(addr, id) { socks.add(relay); // Add this socket session to [socks] if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}]`, relay.url, "is connected"); - for (i of client.my_events) { - relay.send(JSON.stringify(["EVENT", i])); - } + if (cache_relays?.includes(relay.url)) { + for (i of client.my_events) { + relay.send(JSON.stringify(["EVENT", i])); + } - for (i of client.subs) { - relay.send(JSON.stringify(["REQ", i[0], i[1]])); + for (i of client.subs) { + relay.send(JSON.stringify(["REQ", i[0], i[1]])); + } } }); @@ -196,25 +226,30 @@ function newConn(addr, id) { if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return; if (!client.subs.has(data[1])) return; timeoutEOSE(id, data[1]); - if (client.pause_subs.has(data[1])) return; + if (client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return; // if filter.since > receivedEvent.created_at, skip // if receivedEvent.created_at > filter.until, skip if (client.subs.get(data[1]).since > data[2].created_at) return; if (data[2].created_at > client.subs.get(data[1]).until) return; - const NotInSearchQuery = client.subs.get(data[1]).search && !data[2].content?.toLowerCase().includes(client.subs.get(data[1]).search?.toLowerCase()); + const NotInSearchQuery = client.subs.get(data[1]).search && !data[2].content?.toLowerCase().includess(client.subs.get(data[1]).search?.toLowerCase()); if (NotInSearchQuery) return; if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before. client.events.get(data[1]).add(data[2]?.id); - client.send(JSON.stringify(data)); + if (!client.pause_subs.has(data[1])) client.send(JSON.stringify(data)); + + // send into cache relays. + if (!cache_relays?.includes(relay.url)) cache_bc(["EVENT", data[2]], id); + if (data[2]?.created_at > recentevent[id + ":" + data[1]]) + recentevent[id + ":" + data[1]] = data[2]?.created_at // Now count for REQ limit requested by client. // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID // Skip if EOSE has been omitted - if (!client.pendingEOSE.has(data[1]) || !client.subs.get(data[1])?.limit) return; + if (!client.pendingEOSE.has(data[1]) || !client.subs.get(data[1])?.limit || client.pause_subs.has(data[1])) return; if (client.events.get(data[1]).size >= client.subs.get(data[1])?.limit) { // Once reached to , send EOSE to client. client.send(JSON.stringify(["EOSE", data[1]])); @@ -230,9 +265,21 @@ function newConn(addr, id) { if (!client.pendingEOSE.has(data[1])) return; client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1); if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`); - if (wait_eose && (client.pendingEOSE.get(data[1]) < (max_eose_score || Array.from(socks).filter(sock => sock.id === id).length))) return; + + if (!cache_relays?.includes(relay.url)) { + if (wait_eose && (client.pendingEOSE.get(data[1]) < (max_eose_score || Array.from(socks).filter(sock => sock.id === id).length))) return; + cancel_EOSETimeout(data[1]); + } else { + if (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays.includes(sock.url)).length) return; + // get the filter + const filter = client.subs.get(data[1]); + if (!filter.since && recentevent[id + ":" + data[1]]) filter.since = recentevent[id + ":" + data[1]]; + + // now req to the direct connection, with the recent one please. + return direct_bc(["REQ", data[1], filter], id); + } + client.pendingEOSE.delete(data[1]); - cancel_EOSETimeout(data[1]); if (client.pause_subs.has(data[1])) return client.pause_subs.delete(data[1]); client.send(JSON.stringify(data)); break; diff --git a/config.js.example b/config.js.example index e1ad562..8f56366 100644 --- a/config.js.example +++ b/config.js.example @@ -80,6 +80,17 @@ module.exports = { "version": require("./package.json").version }, + // Cache relays + // Used for caching received events from to reduce bandwidth waste. + // Keeping this empty will disable caching function. + // + // To make this working properly, Please enable . + cache_relays: [ + // "ws://localhost:8001", + // "ws://localhost:8002", + // ...and so on + ], + // Nostr relays to bounce [Required] relays: [ "wss://example1.com",