feat: bouncer load balancer

Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
Yonle 2024-04-28 21:01:58 +07:00
parent ec6beb21aa
commit 07bca8d9cf
2 changed files with 51 additions and 10 deletions

View File

@ -6,13 +6,16 @@ const { validateEvent, nip19, matchFilters, mergeFilters, getFilterLimit } = req
const auth = require("./auth.js");
const nip42 = require("./nip42.js");
let { relays, allowed_publishers, approved_publishers, blocked_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, noscraper } = require(process.env.BOSTR_CONFIG_PATH || "./config");
let { relays, allowed_publishers, approved_publishers, blocked_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, noscraper, loadbalancer } = require(process.env.BOSTR_CONFIG_PATH || "./config");
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
allowed_publishers = allowed_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
blocked_publishers = blocked_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
loadbalancer = loadbalancer || [];
if (relays.length) loadbalancer.unshift("_me");
// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
if (relays.length < max_eose_score) max_eose_score = 0;
@ -234,6 +237,9 @@ function handleConnection(ws, req, onClose) {
// mostly for new idle session.
function newsess() {
const id = Date.now() + "_" + process.pid + "_" + Math.random();
const shift = loadbalancer.shift();
loadbalancer.push(shift);
userRelays.set(id, new Set());
csess.set(id, null);
idleSess.add(id);
@ -244,8 +250,15 @@ function newsess() {
}
}
for (const url of relays) {
newConn(url, id);
switch (shift) {
case "_me":
for (const url of relays) {
newConn(url, id);
}
break;
default:
newConn(shift, id);
break;
}
}
@ -281,6 +294,20 @@ function _matchFilters(filters, event) {
}
}
function relay_type(addr) {
switch (true) {
case relays.includes(addr):
return "relay";
break;
case cache_relays.includes(addr):
return "cache_relay";
break;
case loadbalancer.includes(addr):
return "loadbalancer";
break;
}
}
// WS - Sessions
function newConn(addr, id, reconn_t = 0) {
if (!csess.has(id)) return;
@ -293,13 +320,14 @@ function newConn(addr, id, reconn_t = 0) {
allowSynchronousEvents: true
});
relay.isCache = cache_relays?.includes(addr);
relay.isCache = relay_type(addr) === "cache_relay";
relay.isLoadBalancer = relay_type(addr) === "loadbalancer";
relay.ratelimit = 0;
relay.on('open', _ => {
if (!csess.has(id)) return relay.terminate();
const client = csess.get(id);
reconn_t = 0;
if (log_about_relays) console.log(process.pid, "---", id, `${addr} is connected`);
if (log_about_relays) console.log(process.pid, "---", id, "Connected to", addr, `(${relay_type(addr)})`);
if (!client) return;
for (const i of client.my_events) {
@ -342,7 +370,7 @@ function newConn(addr, id, reconn_t = 0) {
const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase());
if (NotInSearchQuery) return;
client.events.get(data[1]).add(data[2]?.id);
if (!relay.isLoadBalancer) client.events.get(data[1]).add(data[2]?.id);
client.send(JSON.stringify(data));
stats._global.rx++;
@ -352,7 +380,7 @@ function newConn(addr, id, reconn_t = 0) {
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
// Skip if EOSE has been omitted
if (!client.pendingEOSE.has(data[1]) || client.pause_subs.has(data[1])) return;
if (!client.pendingEOSE.has(data[1]) || client.pause_subs.has(data[1]) || relay.isLoadBalancer) return;
const limit = getFilterLimit(filter);
if (limit === Infinity) return;
if (client.events.get(data[1]).size >= limit) {
@ -370,7 +398,7 @@ function newConn(addr, id, reconn_t = 0) {
case "EOSE":
if (!client.subalias.has(data[1])) return;
data[1] = client.subalias.get(data[1]);
if (!client.pendingEOSE.has(data[1])) return;
if (!client.pendingEOSE.has(data[1]) && !relay.isLoadBalancer) return;
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
if (log_about_relays) console.log(process.pid, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${userRelays.get(id).size} connected relays.`);
@ -379,7 +407,7 @@ function newConn(addr, id, reconn_t = 0) {
if (relay.isCache && !client.events.get(data[1]).size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet.
client.pendingEOSE.delete(data[1]);
if (client.pause_subs.has(data[1])) {
if (client.pause_subs.has(data[1]) && !relay.isLoadBalancer) {
client.pause_subs.delete(data[1]);
} else {
client.send(JSON.stringify(data));
@ -440,7 +468,7 @@ function newConn(addr, id, reconn_t = 0) {
relay.on('close', _ => {
if (!userRelays.has(id)) return;
userRelays.get(id).delete(relay); // Remove this socket session from <client.relays> list
if (log_about_relays) console.log(process.pid, "-!-", id, "Disconnected from", addr);
if (log_about_relays) console.log(process.pid, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`);
reconn_t += reconnect_time || 5000
setTimeout(_ => {
newConn(addr, id, reconn_t);

View File

@ -159,6 +159,8 @@ module.exports = {
// "wss://example3.com",
// ...and so on
],
// Unless you use this bouncer only for load balancing,
// You could empty <relays> as long <loadbalancer> is not empty.
// Cache relays - Store received events to cache relay(s) (Optional).
// Could improve the speed of event deliveries.
@ -169,5 +171,16 @@ module.exports = {
// - Things may not work properly if you configure more than just a single cache relays.
cache_relays: [
// "ws://localhost:3000"
],
// Load balancer - Load balance this bouncer (Optional)
//
// You could make this bouncer to connect to other bouncer in order to save this server loads.
// It's suggested that the following bouncers does not have `noscraper` or `authorized_keys` being set.
loadbalancer: [
// "wss://bostr1.example.com",
// "wss://bostr2.example.com",
// "wss://bostr3.example.com",
// ...and so on
]
}