bouncer: session code reworking.

Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
Yonle
2024-02-17 15:33:15 +07:00
parent 27e27ffc2b
commit 1a5717a42e

View File

@@ -7,8 +7,6 @@ const nip42 = require("./nip42.js");
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config"); let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config");
const csess = new Map();
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays; log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
approved_publishers = approved_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i); approved_publishers = approved_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
@@ -21,9 +19,7 @@ module.exports = (ws, req, onClose) => {
let authKey = null; let authKey = null;
let authorized = true; let authorized = true;
let lastEvent = Date.now(); let lastEvent = Date.now();
let ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address; ws.ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address;
ws.id = (process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size);
ws.relays = new Set(); // Set() of connected relays. ws.relays = new Set(); // Set() of connected relays.
ws.subs = new Map(); // contains filter submitted by clients. per subID ws.subs = new Map(); // contains filter submitted by clients. per subID
ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
@@ -50,7 +46,7 @@ module.exports = (ws, req, onClose) => {
ws.send(JSON.stringify(["AUTH", authKey])); ws.send(JSON.stringify(["AUTH", authKey]));
} }
console.log(process.pid, `->- ${ip} (${ws.id}) connected [${req.headers["user-agent"] || ""}]`); console.log(process.pid, `->- ${ws.ip} connected [${req.headers["user-agent"] || ""}]`);
ws.on("message", data => { ws.on("message", data => {
try { try {
data = JSON.parse(data); data = JSON.parse(data);
@@ -79,7 +75,7 @@ module.exports = (ws, req, onClose) => {
lastEvent = Date.now(); lastEvent = Date.now();
ws.my_events.add(data[1]); ws.my_events.add(data[1]);
bc(data, ws.id); bc(data, ws);
ws.send(JSON.stringify(["OK", data[1]?.id, true, ""])); ws.send(JSON.stringify(["OK", data[1]?.id, true, ""]));
break; break;
case "REQ": { case "REQ": {
@@ -98,7 +94,7 @@ module.exports = (ws, req, onClose) => {
ws.fakesubalias.set(origID, faked); ws.fakesubalias.set(origID, faked);
data[1] = faked; data[1] = faked;
bc(data, ws.id); bc(data, ws);
if (data[2]?.limit < 1) return ws.send(JSON.stringify(["EOSE", origID])); if (data[2]?.limit < 1) return ws.send(JSON.stringify(["EOSE", origID]));
ws.pendingEOSE.set(origID, 0); ws.pendingEOSE.set(origID, 0);
break; break;
@@ -117,16 +113,15 @@ module.exports = (ws, req, onClose) => {
ws.subalias.delete(faked); ws.subalias.delete(faked);
data[1] = faked; data[1] = faked;
bc(data, ws.id); bc(data, ws);
ws.send(JSON.stringify(["CLOSED", origID, ""])); ws.send(JSON.stringify(["CLOSED", origID, ""]));
break; break;
case "AUTH": case "AUTH":
if (auth(authKey, data[1], ws, req)) { if (auth(authKey, data[1], ws, req)) {
ws.pubkey = data[1].pubkey; ws.pubkey = data[1].pubkey;
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)"); console.log(process.pid, "---", ws.ip, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
if (authorized) return; if (authorized) return;
csess.set(ws.id, ws); newsess(ws);
newsess(ws.id);
authorized = true; authorized = true;
lastEvent = Date.now(); lastEvent = Date.now();
} }
@@ -141,30 +136,29 @@ module.exports = (ws, req, onClose) => {
ws.on('close', _ => { ws.on('close', _ => {
onClose(); onClose();
console.log(process.pid, "---", `${ip} (${ws.id}) disconnected`); console.log(process.pid, "---", `${ws.ip} disconnected`);
for (const i of ws.reconnectTimeout) { for (const i of ws.reconnectTimeout) {
clearTimeout(i); clearTimeout(i);
// Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i); // Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i);
} }
terminate_sess(ws.id); for (const sock of ws.relays) {
sock.close();
}
}); });
if (authorized) { if (authorized) newsess(ws);
csess.set(ws.id, ws);
newsess(ws.id);
}
} }
// WS - New session for client $id // WS - New session for client $id
function newsess(id) { function newsess(ws) {
relays.forEach(_ => newConn(_, id)); relays.forEach(_ => newConn(_, ws));
} }
// WS - Broadcast message to every existing sockets // WS - Broadcast message to every existing sockets
function bc(msg, id) { function bc(msg, ws) {
for (const sock of csess.get(id).relays) { for (const sock of ws.relays) {
if (sock.readyState !== 1) continue; if (sock.readyState !== 1) continue;
// skip the ratelimit after <config.upstream_ratelimit_expiration> // skip the ratelimit after <config.upstream_ratelimit_expiration>
@@ -173,18 +167,9 @@ function bc(msg, id) {
} }
} }
// WS - Terminate all existing sockets that were for <id>
function terminate_sess(id) {
for (const sock of csess.get(id).relays) {
sock.close();
}
csess.delete(id);
}
// WS - Sessions // WS - Sessions
function newConn(addr, id, reconn_t = 0) { function newConn(addr, client, reconn_t = 0) {
if (!csess.has(id)) return; if (!client) return;
const relay = new WebSocket(addr, { const relay = new WebSocket(addr, {
headers: { headers: {
"User-Agent": `Bostr (v${version}); The nostr relay bouncer; https://github.com/Yonle/bostr` "User-Agent": `Bostr (v${version}); The nostr relay bouncer; https://github.com/Yonle/bostr`
@@ -193,15 +178,12 @@ function newConn(addr, id, reconn_t = 0) {
allowSynchronousEvents: true allowSynchronousEvents: true
}); });
relay.id = id;
relay.ratelimit = 0; relay.ratelimit = 0;
relay.on('open', _ => { relay.on('open', _ => {
const client = csess.get(id); if (!client) return relay.close();
if (!csess.has(id)) return relay.terminate();
reconn_t = 0; reconn_t = 0;
if (log_about_relays) console.log(process.pid, "---", `[${id}] ${relay.url} is connected`); if (log_about_relays) console.log(process.pid, "---", client.ip, `${relay.url} is connected`);
if (!client) return;
for (const i of client.my_events) { for (const i of client.my_events) {
relay.send(JSON.stringify(["EVENT", i])); relay.send(JSON.stringify(["EVENT", i]));
} }
@@ -212,8 +194,7 @@ function newConn(addr, id, reconn_t = 0) {
}); });
relay.on('message', data => { relay.on('message', data => {
const client = csess.get(id); if (!client) return relay.close();
if (!client) return;
try { try {
data = JSON.parse(data); data = JSON.parse(data);
} catch (error) { } catch (error) {
@@ -266,7 +247,7 @@ function newConn(addr, id, reconn_t = 0) {
data[1] = client.subalias.get(data[1]); data[1] = client.subalias.get(data[1]);
if (!client.pendingEOSE.has(data[1])) return; if (!client.pendingEOSE.has(data[1])) return;
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1); client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${client.relays.size} connected relays.`); if (log_about_relays) console.log(process.pid, "---", client.ip, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${client.relays.size} connected relays.`);
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < client.relays.size))) return; if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < client.relays.size))) return;
client.pendingEOSE.delete(data[1]); client.pendingEOSE.delete(data[1]);
@@ -296,24 +277,22 @@ function newConn(addr, id, reconn_t = 0) {
}); });
relay.on('error', _ => { relay.on('error', _ => {
if (log_about_relays) console.error(process.pid, "-!-", `[${id}]`, relay.url, _.toString()) if (log_about_relays) console.error(process.pid, "-!-", client.ip, relay.url, _.toString())
}); });
relay.on('close', _ => { relay.on('close', _ => {
const client = csess.get(id);
if (!client) return; if (!client) return;
client.relays.delete(relay); // Remove this socket session from <client.relays> list client.relays.delete(relay); // Remove this socket session from <client.relays> list
if (log_about_relays) console.log(process.pid, "-!-", `[${id}]`, "Disconnected from", relay.url); if (log_about_relays) console.log(process.pid, "-!-", client.ip, "Disconnected from", relay.url);
reconn_t += reconnect_time || 5000 reconn_t += reconnect_time || 5000
const reconnectTimeout = setTimeout(_ => { const reconnectTimeout = setTimeout(_ => {
newConn(addr, id, reconn_t); newConn(addr, client, reconn_t);
client?.reconnectTimeout.delete(reconnectTimeout); client?.reconnectTimeout.delete(reconnectTimeout);
}, reconn_t); // As a bouncer server, We need to reconnect. }, reconn_t); // As a bouncer server, We need to reconnect.
client?.reconnectTimeout.add(reconnectTimeout); client?.reconnectTimeout.add(reconnectTimeout);
}); });
relay.on('unexpected-response', (req, res) => { relay.on('unexpected-response', (req, res) => {
const client = csess.get(id);
if (!client) return; if (!client) return;
client.relays.delete(relay); client.relays.delete(relay);
if (res.statusCode >= 500) return relay.emit("close", null); if (res.statusCode >= 500) return relay.emit("close", null);
@@ -321,5 +300,5 @@ function newConn(addr, id, reconn_t = 0) {
console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`); console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`);
}); });
csess.get(id).relays.add(relay); // Add this socket session to <client.relays> client.relays.add(relay); // Add this socket session to <client.relays>
} }