try to make idle session and wait for a client to take over it

Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
Yonle 2024-03-18 20:50:45 +07:00
parent 2bf62c6551
commit d6cfdcb6ad
2 changed files with 56 additions and 43 deletions

View File

@ -16,6 +16,10 @@ blocked_publishers = blocked_publishers?.map(i => i.startsWith("npub") ? nip19.d
// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
if (relays.length < max_eose_score) max_eose_score = 0;
const csess = new Map(); // this is used for relays.
const userRelays = new Map(); // per ID contains Set() of <WebSocket>
const orphanSess = new Set();
// CL - User socket
module.exports = (ws, req, onClose) => {
let query = querystring.parse(req.url.slice(2));
@ -24,7 +28,6 @@ module.exports = (ws, req, onClose) => {
let sessStarted = false;
let lastEvent = Date.now();
ws.ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address;
ws.relays = new Set(); // Set() of connected relays.
ws.subs = new Map(); // contains filter submitted by clients. per subID
ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
ws.events = new Map(); // only to prevent the retransmit of the same event. per subID
@ -86,13 +89,13 @@ module.exports = (ws, req, onClose) => {
lastEvent = Date.now();
ws.my_events.add(data[1]);
if (!ws.relays.size && !sessStarted) {
if (!sessStarted) {
console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`);
newsess(ws);
getOrphanSess(ws);
sessStarted = true;
}
bc(data, ws);
bc(data, ws.id);
ws.send(JSON.stringify(["OK", data[1]?.id, true, ""]));
break;
case "REQ": {
@ -123,9 +126,9 @@ module.exports = (ws, req, onClose) => {
filters[fn].limit = ws.forcedLimit;
}
if (!ws.relays.size && !sessStarted) {
if (!sessStarted) {
console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`);
newsess(ws);
getOrphanSess(ws);
sessStarted = true;
}
@ -137,7 +140,7 @@ module.exports = (ws, req, onClose) => {
if (!filter.since) filter.since = Math.floor(Date.now() / 1000); // Will not impact everything. Only used for handling passing pause_on_limit (or save mode)
ws.mergedFilters.set(origID, filter);
data[1] = faked;
bc(data, ws);
bc(data, ws.id);
if (filter.limit < 1) return ws.send(JSON.stringify(["EOSE", origID]));
ws.pendingEOSE.set(origID, 0);
break;
@ -157,7 +160,7 @@ module.exports = (ws, req, onClose) => {
ws.mergedFilters.delete(origID);
data[1] = faked;
bc(data, ws);
bc(data, ws.id);
ws.send(JSON.stringify(["CLOSED", origID, ""]));
break;
case "AUTH":
@ -181,25 +184,28 @@ module.exports = (ws, req, onClose) => {
console.log(process.pid, "---", `${ws.ip} disconnected`);
for (const i of ws.reconnectTimeout) {
clearTimeout(i);
// Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i);
}
for (const sock of ws.relays) {
for (const sock of userRelays.get(ws.id)) {
sock.terminate();
}
userRelays.delete(ws.id);
csess.delete(ws.id);
});
}
// WS - New session for client $id
function newsess(ws) {
relays.forEach(_ => newConn(_, ws));
// mostly for new idle session.
function newsess() {
const id = Date.now() + "_" + process.pid + "_" + Math.random();
userRelays.set(id, new Set());
csess.set(id, null);
orphanSess.add(id);
relays.forEach(_ => newConn(_, id));
}
// WS - Broadcast message to every existing sockets
function bc(msg, ws) {
for (const sock of ws.relays) {
function bc(msg, id) {
for (const sock of userRelays.get(id)) {
if (sock.readyState !== 1) continue;
// skip the ratelimit after <config.upstream_ratelimit_expiration>
@ -208,17 +214,20 @@ function bc(msg, ws) {
}
}
// WS - Sessions
function newConn(addr, client, reconn_t = 0) {
if (client.readyState !== 1) return;
let additionalReqHeaders = {};
if (forward_ip_address_to_upstream)
additionalReqHeaders["x-forwarded-for"] = client.ip;
function getOrphanSess(ws) {
ws.id = orphanSess.values().next().value;
orphanSess.delete(ws.id);
csess.set(ws.id, ws);
newsess();
}
// WS - Sessions
function newConn(addr, id, reconn_t = 0) {
if (!csess.has(id)) return;
const relay = new WebSocket(addr, {
headers: {
"User-Agent": `Bostr ${version}; The nostr relay bouncer; https://github.com/Yonle/bostr`,
...additionalReqHeaders
},
noDelay: true,
allowSynchronousEvents: true
@ -226,10 +235,12 @@ function newConn(addr, client, reconn_t = 0) {
relay.ratelimit = 0;
relay.on('open', _ => {
if (client.readyState !== 1) return relay.terminate();
if (!csess.has(id)) return relay.terminate();
const client = csess.get(id);
reconn_t = 0;
if (log_about_relays) console.log(process.pid, "---", client.ip, `${relay.url} is connected`);
if (log_about_relays) console.log(process.pid, "---", id, `${relay.url} is connected`);
if (!client) return;
for (const i of client.my_events) {
relay.send(JSON.stringify(["EVENT", i]));
}
@ -240,12 +251,14 @@ function newConn(addr, client, reconn_t = 0) {
});
relay.on('message', data => {
if (client.readyState !== 1) return relay.terminate();
if (!csess.has(id)) return relay.terminate();
try {
data = JSON.parse(data);
} catch (error) {
return;
}
const client = csess.get(id);
if (!client) return;
switch (data[0]) {
case "EVENT": {
@ -290,9 +303,9 @@ function newConn(addr, client, reconn_t = 0) {
data[1] = client.subalias.get(data[1]);
if (!client.pendingEOSE.has(data[1])) return;
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
if (log_about_relays) console.log(process.pid, "---", client.ip, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${client.relays.size} connected relays.`);
if (log_about_relays) console.log(process.pid, "---", id, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${userRelays.get(id).size} connected relays.`);
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < client.relays.size))) return;
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < userRelays.get(id).size))) return;
client.pendingEOSE.delete(data[1]);
if (client.pause_subs.has(data[1])) {
@ -322,28 +335,28 @@ function newConn(addr, client, reconn_t = 0) {
});
relay.on('error', _ => {
if (log_about_relays) console.error(process.pid, "-!-", client.ip, relay.url, _.toString())
if (log_about_relays) console.error(process.pid, "-!-", id, relay.url, _.toString())
});
relay.on('close', _ => {
if (client.readyState !== 1) return;
client.relays.delete(relay); // Remove this socket session from <client.relays> list
if (log_about_relays) console.log(process.pid, "-!-", client.ip, "Disconnected from", relay.url);
if (!userRelays.has(id)) return;
userRelays.get(id).delete(relay); // Remove this socket session from <client.relays> list
if (log_about_relays) console.log(process.pid, "-!-", id, "Disconnected from", relay.url);
reconn_t += reconnect_time || 5000
const reconnectTimeout = setTimeout(_ => {
newConn(addr, client, reconn_t);
client?.reconnectTimeout.delete(reconnectTimeout);
}, reconn_t); // As a bouncer server, We need to reconnect.
client?.reconnectTimeout.add(reconnectTimeout);
setTimeout(_ => {
newConn(addr, id, reconn_t);
}, reconn_t);
});
relay.on('unexpected-response', (req, res) => {
if (client.readyState !== 1) return;
client.relays.delete(relay);
if (!userRelays.has(id)) return;
userRelays.get(id).delete(relay);
if (res.statusCode >= 500) return relay.emit("close", null);
relays = relays.filter(_ => !relay.url.startsWith(_));
console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`);
});
client.relays.add(relay); // Add this socket session to <client.relays>
userRelays.get(id).add(relay); // Add this socket session to <client.relays>
}
newsess();

View File

@ -1,6 +1,6 @@
{
"name": "bostr",
"version": "2.0.10",
"version": "2.0.11-dev",
"description": "Nostr relay bouncer",
"main": "index.js",
"scripts": {