diff --git a/bouncer.js b/bouncer.js index 64f9e7d..d4dcaf1 100644 --- a/bouncer.js +++ b/bouncer.js @@ -6,13 +6,16 @@ const { validateEvent, nip19, matchFilters, mergeFilters, getFilterLimit } = req const auth = require("./auth.js"); const nip42 = require("./nip42.js"); -let { relays, allowed_publishers, approved_publishers, blocked_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, noscraper } = require(process.env.BOSTR_CONFIG_PATH || "./config"); +let { relays, allowed_publishers, approved_publishers, blocked_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, noscraper, loadbalancer } = require(process.env.BOSTR_CONFIG_PATH || "./config"); log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays; authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); allowed_publishers = allowed_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); blocked_publishers = blocked_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); +loadbalancer = loadbalancer || []; +if (relays.length) loadbalancer.unshift("_me"); + // CL MaxEoseScore: Set as 0 if configured relays is under of the expected number from if (relays.length < max_eose_score) max_eose_score = 0; @@ -234,6 +237,9 @@ function handleConnection(ws, req, onClose) { // mostly for new idle session. function newsess() { const id = Date.now() + "_" + process.pid + "_" + Math.random(); + const shift = loadbalancer.shift(); + loadbalancer.push(shift); + userRelays.set(id, new Set()); csess.set(id, null); idleSess.add(id); @@ -244,8 +250,15 @@ function newsess() { } } - for (const url of relays) { - newConn(url, id); + switch (shift) { + case "_me": + for (const url of relays) { + newConn(url, id); + } + break; + default: + newConn(shift, id); + break; } } @@ -281,6 +294,20 @@ function _matchFilters(filters, event) { } } +function relay_type(addr) { + switch (true) { + case relays.includes(addr): + return "relay"; + break; + case cache_relays.includes(addr): + return "cache_relay"; + break; + case loadbalancer.includes(addr): + return "loadbalancer"; + break; + } +} + // WS - Sessions function newConn(addr, id, reconn_t = 0) { if (!csess.has(id)) return; @@ -293,13 +320,14 @@ function newConn(addr, id, reconn_t = 0) { allowSynchronousEvents: true }); - relay.isCache = cache_relays?.includes(addr); + relay.isCache = relay_type(addr) === "cache_relay"; + relay.isLoadBalancer = relay_type(addr) === "loadbalancer"; relay.ratelimit = 0; relay.on('open', _ => { if (!csess.has(id)) return relay.terminate(); const client = csess.get(id); reconn_t = 0; - if (log_about_relays) console.log(process.pid, "---", id, `${addr} is connected`); + if (log_about_relays) console.log(process.pid, "---", id, "Connected to", addr, `(${relay_type(addr)})`); if (!client) return; for (const i of client.my_events) { @@ -342,7 +370,7 @@ function newConn(addr, id, reconn_t = 0) { const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase()); if (NotInSearchQuery) return; - client.events.get(data[1]).add(data[2]?.id); + if (!relay.isLoadBalancer) client.events.get(data[1]).add(data[2]?.id); client.send(JSON.stringify(data)); stats._global.rx++; @@ -352,7 +380,7 @@ function newConn(addr, id, reconn_t = 0) { // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID // Skip if EOSE has been omitted - if (!client.pendingEOSE.has(data[1]) || client.pause_subs.has(data[1])) return; + if (!client.pendingEOSE.has(data[1]) || client.pause_subs.has(data[1]) || relay.isLoadBalancer) return; const limit = getFilterLimit(filter); if (limit === Infinity) return; if (client.events.get(data[1]).size >= limit) { @@ -370,7 +398,7 @@ function newConn(addr, id, reconn_t = 0) { case "EOSE": if (!client.subalias.has(data[1])) return; data[1] = client.subalias.get(data[1]); - if (!client.pendingEOSE.has(data[1])) return; + if (!client.pendingEOSE.has(data[1]) && !relay.isLoadBalancer) return; client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1); if (log_about_relays) console.log(process.pid, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${userRelays.get(id).size} connected relays.`); @@ -379,7 +407,7 @@ function newConn(addr, id, reconn_t = 0) { if (relay.isCache && !client.events.get(data[1]).size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet. client.pendingEOSE.delete(data[1]); - if (client.pause_subs.has(data[1])) { + if (client.pause_subs.has(data[1]) && !relay.isLoadBalancer) { client.pause_subs.delete(data[1]); } else { client.send(JSON.stringify(data)); @@ -440,7 +468,7 @@ function newConn(addr, id, reconn_t = 0) { relay.on('close', _ => { if (!userRelays.has(id)) return; userRelays.get(id).delete(relay); // Remove this socket session from list - if (log_about_relays) console.log(process.pid, "-!-", id, "Disconnected from", addr); + if (log_about_relays) console.log(process.pid, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`); reconn_t += reconnect_time || 5000 setTimeout(_ => { newConn(addr, id, reconn_t); diff --git a/config.js.example b/config.js.example index 5359e9f..7e1bb48 100644 --- a/config.js.example +++ b/config.js.example @@ -159,6 +159,8 @@ module.exports = { // "wss://example3.com", // ...and so on ], + // Unless you use this bouncer only for load balancing, + // You could empty as long is not empty. // Cache relays - Store received events to cache relay(s) (Optional). // Could improve the speed of event deliveries. @@ -169,5 +171,16 @@ module.exports = { // - Things may not work properly if you configure more than just a single cache relays. cache_relays: [ // "ws://localhost:3000" + ], + + // Load balancer - Load balance this bouncer (Optional) + // + // You could make this bouncer to connect to other bouncer in order to save this server loads. + // It's suggested that the following bouncers does not have `noscraper` or `authorized_keys` being set. + loadbalancer: [ + // "wss://bostr1.example.com", + // "wss://bostr2.example.com", + // "wss://bostr3.example.com", + // ...and so on ] }