mirror of
https://github.com/Yonle/bostr.git
synced 2025-09-27 13:26:30 +02:00
probably an fix for orphan sockets & fix potential memory leak
Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
71
bouncer.js
71
bouncer.js
@@ -104,6 +104,7 @@ module.exports = (ws, req) => {
|
|||||||
if (authorized) return;
|
if (authorized) return;
|
||||||
// if orphan, ignore
|
// if orphan, ignore
|
||||||
csess.set(ws.id, ws);
|
csess.set(ws.id, ws);
|
||||||
|
updateSess(ws.id);
|
||||||
if (!orphan) newsess(ws.id);
|
if (!orphan) newsess(ws.id);
|
||||||
authorized = true;
|
authorized = true;
|
||||||
}
|
}
|
||||||
@@ -140,6 +141,7 @@ module.exports = (ws, req) => {
|
|||||||
|
|
||||||
if (authorized) {
|
if (authorized) {
|
||||||
csess.set(ws.id, ws);
|
csess.set(ws.id, ws);
|
||||||
|
updateSess(ws.id);
|
||||||
if (!orphan) newsess(ws.id);
|
if (!orphan) newsess(ws.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -244,14 +246,22 @@ function clearOrphanSess(l) {
|
|||||||
if (cn > l) break;
|
if (cn > l) break;
|
||||||
if (sess[1] !== null) continue;
|
if (sess[1] !== null) continue;
|
||||||
terminate_sess(sess[0]);
|
terminate_sess(sess[0]);
|
||||||
|
csess.delete(sess[0]);
|
||||||
cn++;
|
cn++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function updateSess(id) {
|
||||||
|
for (sock of socks) {
|
||||||
|
if (sock.id !== id) continue;
|
||||||
|
if (sock.readyState >= 2) return socks.delete(sock);
|
||||||
|
sock.client = csess.get(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WS - Sessions
|
// WS - Sessions
|
||||||
function newConn(addr, id) {
|
function newConn(addr, id) {
|
||||||
if (!csess.has(id)) return;
|
if (!csess.has(id)) return;
|
||||||
const client = csess.get(id);
|
|
||||||
const relay = new WebSocket(addr, {
|
const relay = new WebSocket(addr, {
|
||||||
headers: {
|
headers: {
|
||||||
"User-Agent": "Bostr; The nostr relay bouncer; https://github.com/Yonle/bostr"
|
"User-Agent": "Bostr; The nostr relay bouncer; https://github.com/Yonle/bostr"
|
||||||
@@ -259,23 +269,24 @@ function newConn(addr, id) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
relay.id = id;
|
relay.id = id;
|
||||||
|
relay.client = csess.get(id);
|
||||||
relay.on('open', _ => {
|
relay.on('open', _ => {
|
||||||
if (!csess.has(id)) return relay.terminate();
|
if (!csess.has(id)) return relay.terminate();
|
||||||
socks.add(relay); // Add this socket session to [socks]
|
socks.add(relay); // Add this socket session to [socks]
|
||||||
if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`);
|
if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`);
|
||||||
|
|
||||||
if (!client) return; // is orphan, do nothing.
|
if (!relay.client) return; // is orphan, do nothing.
|
||||||
for (i of client.my_events) {
|
for (i of relay.client.my_events) {
|
||||||
relay.send(JSON.stringify(["EVENT", i]));
|
relay.send(JSON.stringify(["EVENT", i]));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i of client.subs) {
|
for (i of relay.client.subs) {
|
||||||
relay.send(JSON.stringify(["REQ", i[0], i[1]]));
|
relay.send(JSON.stringify(["REQ", i[0], i[1]]));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
relay.on('message', data => {
|
relay.on('message', data => {
|
||||||
if (!client) return;
|
if (!relay.client) return;
|
||||||
try {
|
try {
|
||||||
data = JSON.parse(data);
|
data = JSON.parse(data);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -285,23 +296,23 @@ function newConn(addr, id) {
|
|||||||
switch (data[0]) {
|
switch (data[0]) {
|
||||||
case "EVENT": {
|
case "EVENT": {
|
||||||
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
||||||
if (!client.subs.has(data[1])) return;
|
if (!relay.client.subs.has(data[1])) return;
|
||||||
timeoutEOSE(id, data[1]);
|
timeoutEOSE(id, data[1]);
|
||||||
if (client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return;
|
if (relay.client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return;
|
||||||
|
|
||||||
// if filter.since > receivedEvent.created_at, skip
|
// if filter.since > receivedEvent.created_at, skip
|
||||||
// if receivedEvent.created_at > filter.until, skip
|
// if receivedEvent.created_at > filter.until, skip
|
||||||
const cFilter = client.subs.get(data[1])
|
const cFilter = relay.client.subs.get(data[1])
|
||||||
if (cFilter?.since > data[2].created_at) return;
|
if (cFilter?.since > data[2].created_at) return;
|
||||||
if (data[2].created_at > cFilter?.until) return;
|
if (data[2].created_at > cFilter?.until) return;
|
||||||
const NotInSearchQuery = "search" in cFilter && !data[2]?.content?.toLowerCase().includes(cFilter.search.toLowerCase());
|
const NotInSearchQuery = "search" in cFilter && !data[2]?.content?.toLowerCase().includes(cFilter.search.toLowerCase());
|
||||||
|
|
||||||
if (NotInSearchQuery) return;
|
if (NotInSearchQuery) return;
|
||||||
if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
|
if (relay.client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
|
||||||
|
|
||||||
if (!client.pause_subs.has(data[1])) {
|
if (!relay.client.pause_subs.has(data[1])) {
|
||||||
client.events.get(data[1]).add(data[2]?.id);
|
relay.client.events.get(data[1]).add(data[2]?.id);
|
||||||
client.send(JSON.stringify(data));
|
relay.client.send(JSON.stringify(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
// send into cache relays.
|
// send into cache relays.
|
||||||
@@ -311,46 +322,46 @@ function newConn(addr, id) {
|
|||||||
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
|
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
|
||||||
|
|
||||||
// Skip if EOSE has been omitted
|
// Skip if EOSE has been omitted
|
||||||
if (!client.pendingEOSE.has(data[1]) || !client.subs.get(data[1])?.limit || client.pause_subs.has(data[1])) return;
|
if (!relay.client.pendingEOSE.has(data[1]) || !relay.client.subs.get(data[1])?.limit || relay.client.pause_subs.has(data[1])) return;
|
||||||
if (client.events.get(data[1]).size >= client.subs.get(data[1])?.limit) {
|
if (relay.client.events.get(data[1]).size >= relay.client.subs.get(data[1])?.limit) {
|
||||||
// Once reached to <filter.limit>, send EOSE to client.
|
// Once reached to <filter.limit>, send EOSE to client.
|
||||||
client.send(JSON.stringify(["EOSE", data[1]]));
|
relay.client.send(JSON.stringify(["EOSE", data[1]]));
|
||||||
if (pause_on_limit || cache_relays?.includes(relay.url)) {
|
if (pause_on_limit || cache_relays?.includes(relay.url)) {
|
||||||
client.pause_subs.add(data[1]);
|
relay.client.pause_subs.add(data[1]);
|
||||||
} else {
|
} else {
|
||||||
client.pendingEOSE.delete(data[1]);
|
relay.client.pendingEOSE.delete(data[1]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case "EOSE":
|
case "EOSE":
|
||||||
if (!client.pendingEOSE.has(data[1])) return;
|
if (!relay.client.pendingEOSE.has(data[1])) return;
|
||||||
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
relay.client.pendingEOSE.set(data[1], relay.client.pendingEOSE.get(data[1]) + 1);
|
||||||
if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
|
if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${relay.client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
|
||||||
|
|
||||||
if (!cache_relays?.includes(relay.url)) {
|
if (!cache_relays?.includes(relay.url)) {
|
||||||
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return;
|
if (wait_eose && ((relay.client.pendingEOSE.get(data[1]) < max_eose_score) || (relay.client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return;
|
||||||
if (client.pause_subs.has(data[1])) return client.pause_subs.delete(data[1]);
|
if (relay.client.pause_subs.has(data[1])) return relay.client.pause_subs.delete(data[1]);
|
||||||
|
|
||||||
cancel_EOSETimeout(data[1]);
|
cancel_EOSETimeout(data[1]);
|
||||||
} else {
|
} else {
|
||||||
if (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays?.includes(sock.url)).length) return;
|
if (relay.client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays?.includes(sock.url)).length) return;
|
||||||
// get the filter
|
// get the filter
|
||||||
const filter = client.subs.get(data[1]);
|
const filter = relay.client.subs.get(data[1]);
|
||||||
if (client.pause_subs.has(data[1])) {
|
if (relay.client.pause_subs.has(data[1])) {
|
||||||
client.pause_subs.delete(data[1]);
|
relay.client.pause_subs.delete(data[1]);
|
||||||
client.pendingEOSE.delete(data[1]);
|
relay.client.pendingEOSE.delete(data[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// now req to the direct connection, with the recent one please.
|
// now req to the direct connection, with the recent one please.
|
||||||
return direct_bc(["REQ", data[1], filter], id);
|
return direct_bc(["REQ", data[1], filter], id);
|
||||||
}
|
}
|
||||||
|
|
||||||
client.send(JSON.stringify(data));
|
relay.client.send(JSON.stringify(data));
|
||||||
break;
|
break;
|
||||||
case "AUTH":
|
case "AUTH":
|
||||||
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return;
|
if (!private_keys || typeof(data[1]) !== "string" || !relay.client.pubkey) return;
|
||||||
nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]);
|
nip42(relay, relay.client.pubkey, private_keys[relay.client.pubkey], data[1]);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
Reference in New Issue
Block a user