mirror of
https://github.com/Yonle/bostr.git
synced 2025-06-05 21:19:11 +02:00
feat: Orphan Session
Add feature to keep and reuse an session to cut some loads off & somewhat improve bouncing performance. Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
parent
08684cfdbd
commit
da562a335d
76
bouncer.js
76
bouncer.js
@ -3,11 +3,12 @@ const { verifySignature, validateEvent, nip19 } = require("nostr-tools");
|
|||||||
const auth = require("./auth.js");
|
const auth = require("./auth.js");
|
||||||
const nip42 = require("./nip42.js");
|
const nip42 = require("./nip42.js");
|
||||||
|
|
||||||
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays } = require("./config");
|
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays, max_orphan_sess } = require("./config");
|
||||||
|
|
||||||
const socks = new Set();
|
const socks = new Set();
|
||||||
const csess = new Map();
|
const csess = new Map();
|
||||||
|
|
||||||
|
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
|
||||||
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
|
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
|
||||||
approved_publishers = approved_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
|
approved_publishers = approved_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
|
||||||
|
|
||||||
@ -20,8 +21,9 @@ cache_relays = cache_relays?.map(i => i.endsWith("/") ? i : i + "/");
|
|||||||
module.exports = (ws, req) => {
|
module.exports = (ws, req) => {
|
||||||
let authKey = null;
|
let authKey = null;
|
||||||
let authorized = true;
|
let authorized = true;
|
||||||
|
let orphan = getOrphanSess(); // if available
|
||||||
|
|
||||||
ws.id = process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size;
|
ws.id = orphan || (process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size);
|
||||||
ws.subs = new Map(); // contains filter submitted by clients. per subID
|
ws.subs = new Map(); // contains filter submitted by clients. per subID
|
||||||
ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
|
ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
|
||||||
ws.events = new Map(); // only to prevent the retransmit of the same event. per subID
|
ws.events = new Map(); // only to prevent the retransmit of the same event. per subID
|
||||||
@ -100,7 +102,9 @@ module.exports = (ws, req) => {
|
|||||||
ws.pubkey = data[1].pubkey;
|
ws.pubkey = data[1].pubkey;
|
||||||
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
|
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
|
||||||
if (authorized) return;
|
if (authorized) return;
|
||||||
newsess(ws.id);
|
// if orphan, ignore
|
||||||
|
csess.set(ws.id, ws);
|
||||||
|
if (!orphan) newsess(ws.id);
|
||||||
authorized = true;
|
authorized = true;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -113,23 +117,26 @@ module.exports = (ws, req) => {
|
|||||||
ws.on('error', console.error);
|
ws.on('error', console.error);
|
||||||
ws.on('close', _ => {
|
ws.on('close', _ => {
|
||||||
console.log(process.pid, "---", "Sock", ws.id, "has disconnected.");
|
console.log(process.pid, "---", "Sock", ws.id, "has disconnected.");
|
||||||
csess.delete(ws.id);
|
csess.set(ws.id, null); // set as orphan.
|
||||||
|
|
||||||
for (i of ws.EOSETimeout) {
|
for (i of ws.EOSETimeout) {
|
||||||
clearTimeout(i[1]);
|
clearTimeout(i[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!authorized) return;
|
if (!authorized) return;
|
||||||
terminate_subs(ws.id);
|
|
||||||
|
|
||||||
for (i of ws.reconnectTimeout) {
|
for (i of ws.reconnectTimeout) {
|
||||||
clearTimeout(i);
|
clearTimeout(i);
|
||||||
// Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i);
|
// Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
onClientDisconnect();
|
||||||
});
|
});
|
||||||
|
|
||||||
csess.set(ws.id, ws);
|
if (authorized) {
|
||||||
if (authorized) newsess(ws.id);
|
csess.set(ws.id, ws);
|
||||||
|
if (!orphan) newsess(ws.id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CL - Set up EOSE timeout
|
// CL - Set up EOSE timeout
|
||||||
@ -162,8 +169,8 @@ function cancel_EOSETimeout(id, subid) {
|
|||||||
|
|
||||||
// WS - New session for client $id
|
// WS - New session for client $id
|
||||||
function newsess(id) {
|
function newsess(id) {
|
||||||
relays.forEach(_ => newConn(_, id));
|
|
||||||
cache_relays?.forEach(_ => newConn(_, id));
|
cache_relays?.forEach(_ => newConn(_, id));
|
||||||
|
relays.forEach(_ => newConn(_, id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// WS - Broadcast message to every existing sockets
|
// WS - Broadcast message to every existing sockets
|
||||||
@ -191,7 +198,7 @@ function bc(msg, id) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WS - Terminate all existing sockets that were for <id>
|
// WS - Terminate all existing sockets that were for <id>
|
||||||
function terminate_subs(id) {
|
function terminate_sess(id) {
|
||||||
for (sock of socks) {
|
for (sock of socks) {
|
||||||
if (sock.id !== id) continue;
|
if (sock.id !== id) continue;
|
||||||
sock.terminate();
|
sock.terminate();
|
||||||
@ -199,6 +206,43 @@ function terminate_subs(id) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function onClientDisconnect() {
|
||||||
|
const orphanSessNum = howManyOrphanSess();
|
||||||
|
const max = max_orphan_sess || 0;
|
||||||
|
if (orphanSessNum > max) {
|
||||||
|
if (log_about_relays) console.log(process.pid, `There are ${orphanSessNum} of orphan session. I will clear ${orphanSessNum - max} of orphan sessions.`);
|
||||||
|
clearOrphanSess(orphanSessNum - max);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function getOrphanSess() {
|
||||||
|
for (sess of csess) {
|
||||||
|
if (sess[1] !== null) continue;
|
||||||
|
return sess[0];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function howManyOrphanSess() {
|
||||||
|
let howMany = 0;
|
||||||
|
for (sess of csess) {
|
||||||
|
if (sess[1] !== null) continue;
|
||||||
|
howMany++
|
||||||
|
}
|
||||||
|
|
||||||
|
return howMany;
|
||||||
|
}
|
||||||
|
|
||||||
|
function clearOrphanSess(l) {
|
||||||
|
let cn = 0;
|
||||||
|
for (sess of csess) {
|
||||||
|
if (cn > l) break;
|
||||||
|
if (sess[1] !== null) continue;
|
||||||
|
terminate_sess(sess[0]);
|
||||||
|
cn++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WS - Sessions
|
// WS - Sessions
|
||||||
function newConn(addr, id) {
|
function newConn(addr, id) {
|
||||||
if (!csess.has(id)) return;
|
if (!csess.has(id)) return;
|
||||||
@ -213,8 +257,9 @@ function newConn(addr, id) {
|
|||||||
relay.on('open', _ => {
|
relay.on('open', _ => {
|
||||||
if (!csess.has(id)) return relay.terminate();
|
if (!csess.has(id)) return relay.terminate();
|
||||||
socks.add(relay); // Add this socket session to [socks]
|
socks.add(relay); // Add this socket session to [socks]
|
||||||
if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}]`, relay.url, "is connected");
|
if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`);
|
||||||
|
|
||||||
|
if (!client) return; // is orphan, do nothing.
|
||||||
for (i of client.my_events) {
|
for (i of client.my_events) {
|
||||||
relay.send(JSON.stringify(["EVENT", i]));
|
relay.send(JSON.stringify(["EVENT", i]));
|
||||||
}
|
}
|
||||||
@ -225,6 +270,7 @@ function newConn(addr, id) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
relay.on('message', data => {
|
relay.on('message', data => {
|
||||||
|
if (!client) return;
|
||||||
try {
|
try {
|
||||||
data = JSON.parse(data);
|
data = JSON.parse(data);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@ -275,7 +321,7 @@ function newConn(addr, id) {
|
|||||||
case "EOSE":
|
case "EOSE":
|
||||||
if (!client.pendingEOSE.has(data[1])) return;
|
if (!client.pendingEOSE.has(data[1])) return;
|
||||||
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
||||||
if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
|
if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
|
||||||
|
|
||||||
if (!cache_relays?.includes(relay.url)) {
|
if (!cache_relays?.includes(relay.url)) {
|
||||||
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return;
|
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return;
|
||||||
@ -305,18 +351,18 @@ function newConn(addr, id) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
relay.on('error', _ => {
|
relay.on('error', _ => {
|
||||||
if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.error(process.pid, "-!-", `[${id}]`, relay.url, _.toString())
|
if (log_about_relays) console.error(process.pid, "-!-", `[${id}]`, relay.url, _.toString())
|
||||||
});
|
});
|
||||||
|
|
||||||
relay.on('close', _ => {
|
relay.on('close', _ => {
|
||||||
socks.delete(relay) // Remove this socket session from [socks] list
|
socks.delete(relay) // Remove this socket session from [socks] list
|
||||||
if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url);
|
if (log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url);
|
||||||
|
|
||||||
if (!csess.has(id)) return;
|
if (!csess.has(id)) return;
|
||||||
const reconnectTimeout = setTimeout(_ => {
|
const reconnectTimeout = setTimeout(_ => {
|
||||||
newConn(addr, id);
|
newConn(addr, id);
|
||||||
client.reconnectTimeout.delete(reconnectTimeout);
|
client?.reconnectTimeout.delete(reconnectTimeout);
|
||||||
}, reconnect_time || 5000); // As a bouncer server, We need to reconnect.
|
}, reconnect_time || 5000); // As a bouncer server, We need to reconnect.
|
||||||
client.reconnectTimeout.add(reconnectTimeout);
|
client?.reconnectTimeout.add(reconnectTimeout);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,11 @@ module.exports = {
|
|||||||
// Time before reconnect to relays in milliseconds.
|
// Time before reconnect to relays in milliseconds.
|
||||||
reconnect_time: 5000,
|
reconnect_time: 5000,
|
||||||
|
|
||||||
|
// Maximum amount of orphan sessions.
|
||||||
|
// Setting to 0 disables orphan session function.
|
||||||
|
// PRIVACY WARNING: Enabling this may could leak some events that people were not supposed to receive (Eg. PM)
|
||||||
|
max_orphan_sess: 0,
|
||||||
|
|
||||||
// Wait for every connected relays send EOSE.
|
// Wait for every connected relays send EOSE.
|
||||||
// Could improve accuracy on received events.
|
// Could improve accuracy on received events.
|
||||||
//
|
//
|
||||||
|
Loading…
x
Reference in New Issue
Block a user