mirror of
https://github.com/Yonle/bostr.git
synced 2025-10-10 16:42:34 +02:00
bouncer: Relays handling rework
Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
97
bouncer.js
97
bouncer.js
@@ -7,7 +7,6 @@ const nip42 = require("./nip42.js");
|
|||||||
|
|
||||||
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays, max_orphan_sess, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config");
|
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays, max_orphan_sess, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config");
|
||||||
|
|
||||||
const socks = new Set();
|
|
||||||
const csess = new Map();
|
const csess = new Map();
|
||||||
|
|
||||||
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
|
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
|
||||||
@@ -23,11 +22,11 @@ cache_relays = cache_relays?.map(i => i.endsWith("/") ? i : i + "/");
|
|||||||
module.exports = (ws, req, onClose) => {
|
module.exports = (ws, req, onClose) => {
|
||||||
let authKey = null;
|
let authKey = null;
|
||||||
let authorized = true;
|
let authorized = true;
|
||||||
let orphan = getOrphanSess(); // if available
|
|
||||||
let lastEvent = Date.now();
|
let lastEvent = Date.now();
|
||||||
let ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address;
|
let ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address;
|
||||||
|
|
||||||
ws.id = orphan || (process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size);
|
ws.id = (process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size);
|
||||||
|
ws.relays = new Set(); // Set() of connected relays.
|
||||||
ws.subs = new Map(); // contains filter submitted by clients. per subID
|
ws.subs = new Map(); // contains filter submitted by clients. per subID
|
||||||
ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
|
ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
|
||||||
ws.events = new Map(); // only to prevent the retransmit of the same event. per subID
|
ws.events = new Map(); // only to prevent the retransmit of the same event. per subID
|
||||||
@@ -54,7 +53,7 @@ module.exports = (ws, req, onClose) => {
|
|||||||
ws.send(JSON.stringify(["AUTH", authKey]));
|
ws.send(JSON.stringify(["AUTH", authKey]));
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log(process.pid, `->- ${ip} (${ws.id}) connected ${orphan ? "(orphan reused) " : ""}[${req.headers["user-agent"] || ""}]`);
|
console.log(process.pid, `->- ${ip} (${ws.id}) connected [${req.headers["user-agent"] || ""}]`);
|
||||||
ws.on("message", data => {
|
ws.on("message", data => {
|
||||||
try {
|
try {
|
||||||
data = JSON.parse(data);
|
data = JSON.parse(data);
|
||||||
@@ -133,7 +132,7 @@ module.exports = (ws, req, onClose) => {
|
|||||||
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
|
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
|
||||||
if (authorized) return;
|
if (authorized) return;
|
||||||
csess.set(ws.id, ws);
|
csess.set(ws.id, ws);
|
||||||
if (!orphan) newsess(ws.id);
|
newsess(ws.id);
|
||||||
authorized = true;
|
authorized = true;
|
||||||
lastEvent = Date.now();
|
lastEvent = Date.now();
|
||||||
}
|
}
|
||||||
@@ -148,36 +147,23 @@ module.exports = (ws, req, onClose) => {
|
|||||||
ws.on('close', _ => {
|
ws.on('close', _ => {
|
||||||
onClose();
|
onClose();
|
||||||
|
|
||||||
console.log(process.pid, "---", `${ip} (${ws.id}) disconnected (${howManyOrphanSess()+1} orphans)`);
|
console.log(process.pid, "---", `${ip} (${ws.id}) disconnected`);
|
||||||
if (csess.has(ws.id))
|
|
||||||
csess.set(ws.id, null); // set as orphan.
|
|
||||||
|
|
||||||
for (const i of ws.EOSETimeout) {
|
for (const i of ws.EOSETimeout) {
|
||||||
clearTimeout(i[1]);
|
clearTimeout(i[1]);
|
||||||
}
|
};
|
||||||
|
|
||||||
// unauthorized session must be destroyed.
|
|
||||||
if (!authorized) terminate_sess(ws.id);
|
|
||||||
|
|
||||||
for (const i of ws.reconnectTimeout) {
|
for (const i of ws.reconnectTimeout) {
|
||||||
clearTimeout(i);
|
clearTimeout(i);
|
||||||
// Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i);
|
// Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const i of ws.subs) {
|
terminate_sess(ws.id);
|
||||||
direct_bc(["CLOSE", ws.fakesubalias.get(i[0])], ws.id);
|
|
||||||
cache_bc(["CLOSE", ws.fakesubalias.get(i[0])], ws.id);
|
|
||||||
}
|
|
||||||
|
|
||||||
onClientDisconnect();
|
|
||||||
|
|
||||||
// sensitive session must not be preserved.
|
|
||||||
if (private_keys && (ws.pubkey in private_keys)) terminate_sess(ws.id);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
if (authorized) {
|
if (authorized) {
|
||||||
csess.set(ws.id, ws);
|
csess.set(ws.id, ws);
|
||||||
if (!orphan) newsess(ws.id);
|
newsess(ws.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -218,9 +204,8 @@ function newsess(id) {
|
|||||||
|
|
||||||
// WS - Broadcast message to every existing sockets
|
// WS - Broadcast message to every existing sockets
|
||||||
function direct_bc(msg, id) {
|
function direct_bc(msg, id) {
|
||||||
for (const sock of socks) {
|
for (const sock of csess.get(id).relays) {
|
||||||
if (cache_relays?.includes(sock.url)) continue;
|
if (cache_relays?.includes(sock.url)) continue;
|
||||||
if (sock.id !== id) continue;
|
|
||||||
if (sock.readyState !== 1) continue;
|
if (sock.readyState !== 1) continue;
|
||||||
|
|
||||||
// skip the ratelimit after <config.upstream_ratelimit_expiration>
|
// skip the ratelimit after <config.upstream_ratelimit_expiration>
|
||||||
@@ -230,9 +215,8 @@ function direct_bc(msg, id) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function cache_bc(msg, id) {
|
function cache_bc(msg, id) {
|
||||||
for (const sock of socks) {
|
for (const sock of csess.get(id).relays) {
|
||||||
if (!cache_relays?.includes(sock.url)) continue;
|
if (!cache_relays?.includes(sock.url)) continue;
|
||||||
if (sock.id !== id) continue;
|
|
||||||
if (sock.readyState !== 1) continue;
|
if (sock.readyState !== 1) continue;
|
||||||
sock.send(JSON.stringify(msg));
|
sock.send(JSON.stringify(msg));
|
||||||
}
|
}
|
||||||
@@ -245,49 +229,11 @@ function bc(msg, id) {
|
|||||||
|
|
||||||
// WS - Terminate all existing sockets that were for <id>
|
// WS - Terminate all existing sockets that were for <id>
|
||||||
function terminate_sess(id) {
|
function terminate_sess(id) {
|
||||||
|
for (const sock of csess.get(id).relays) {
|
||||||
|
sock.close();
|
||||||
|
}
|
||||||
|
|
||||||
csess.delete(id);
|
csess.delete(id);
|
||||||
for (const sock of socks) {
|
|
||||||
if (sock.id !== id) continue;
|
|
||||||
sock.terminate();
|
|
||||||
socks.delete(sock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function onClientDisconnect() {
|
|
||||||
const orphanSessNum = howManyOrphanSess();
|
|
||||||
const max = max_orphan_sess || 0;
|
|
||||||
if (orphanSessNum > max) {
|
|
||||||
if (max) console.log(process.pid, `There are ${orphanSessNum} orphan sessions. I will clear ${orphanSessNum - max} orphan sessions.`);
|
|
||||||
clearOrphanSess(orphanSessNum - max);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function getOrphanSess() {
|
|
||||||
for (const sess of csess) {
|
|
||||||
if (sess[1] !== null) continue;
|
|
||||||
return sess[0];
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function howManyOrphanSess() {
|
|
||||||
let howMany = 0;
|
|
||||||
for (const sess of csess) {
|
|
||||||
if (sess[1] !== null) continue;
|
|
||||||
howMany++
|
|
||||||
}
|
|
||||||
|
|
||||||
return howMany;
|
|
||||||
}
|
|
||||||
|
|
||||||
function clearOrphanSess(l) {
|
|
||||||
let cn = 0;
|
|
||||||
for (const sess of csess) {
|
|
||||||
if (cn >= l) break;
|
|
||||||
if (sess[1] !== null) continue;
|
|
||||||
terminate_sess(sess[0]);
|
|
||||||
cn++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WS - Sessions
|
// WS - Sessions
|
||||||
@@ -307,7 +253,7 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
const client = csess.get(id);
|
const client = csess.get(id);
|
||||||
if (!csess.has(id)) return relay.terminate();
|
if (!csess.has(id)) return relay.terminate();
|
||||||
reconn_t = 0;
|
reconn_t = 0;
|
||||||
if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`);
|
if (log_about_relays) console.log(process.pid, "---", `[${id}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`);
|
||||||
|
|
||||||
if (!client) return; // is orphan, do nothing.
|
if (!client) return; // is orphan, do nothing.
|
||||||
for (const i of client.my_events) {
|
for (const i of client.my_events) {
|
||||||
@@ -424,10 +370,9 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
|
|
||||||
relay.on('close', _ => {
|
relay.on('close', _ => {
|
||||||
const client = csess.get(id);
|
const client = csess.get(id);
|
||||||
socks.delete(relay); // Remove this socket session from [socks] list
|
if (!client) return;
|
||||||
if (log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url);
|
client.relays.delete(relay); // Remove this socket session from <client.relays> list
|
||||||
|
if (log_about_relays) console.log(process.pid, "-!-", `[${id}]`, "Disconnected from", relay.url);
|
||||||
if (!csess.has(id)) return;
|
|
||||||
reconn_t += reconnect_time || 5000
|
reconn_t += reconnect_time || 5000
|
||||||
const reconnectTimeout = setTimeout(_ => {
|
const reconnectTimeout = setTimeout(_ => {
|
||||||
newConn(addr, id, reconn_t);
|
newConn(addr, id, reconn_t);
|
||||||
@@ -437,11 +382,13 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
relay.on('unexpected-response', (req, res) => {
|
relay.on('unexpected-response', (req, res) => {
|
||||||
socks.delete(relay);
|
const client = csess.get(id);
|
||||||
|
if (!client) return;
|
||||||
|
client.relays.delete(relay);
|
||||||
if (res.statusCode >= 500) return relay.emit("close", null);
|
if (res.statusCode >= 500) return relay.emit("close", null);
|
||||||
delete relays[relays.indexOf(addr)];
|
delete relays[relays.indexOf(addr)];
|
||||||
console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`);
|
console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`);
|
||||||
});
|
});
|
||||||
|
|
||||||
socks.add(relay); // Add this socket session to [socks]
|
csess.get(id).relays.add(relay); // Add this socket session to <client.relays>
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user