no need for updateSess

Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
Yonle
2024-01-04 14:51:06 +07:00
parent a78d8370ec
commit 9815e5fecc

View File

@@ -106,7 +106,6 @@ module.exports = (ws, req) => {
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)"); console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
if (authorized) return; if (authorized) return;
csess.set(ws.id, ws); csess.set(ws.id, ws);
updateSess(ws.id);
if (!orphan) newsess(ws.id); if (!orphan) newsess(ws.id);
authorized = true; authorized = true;
} }
@@ -122,7 +121,6 @@ module.exports = (ws, req) => {
console.log(process.pid, "---", "Sock", ws.id, "has disconnected.", `(${howManyOrphanSess()+1} orphans)`); console.log(process.pid, "---", "Sock", ws.id, "has disconnected.", `(${howManyOrphanSess()+1} orphans)`);
if (csess.has(ws.id)) { if (csess.has(ws.id)) {
csess.set(ws.id, null); // set as orphan. csess.set(ws.id, null); // set as orphan.
updateSess(ws.id); // change relays relay.client object
} }
for (i of ws.EOSETimeout) { for (i of ws.EOSETimeout) {
@@ -149,7 +147,6 @@ module.exports = (ws, req) => {
if (authorized) { if (authorized) {
csess.set(ws.id, ws); csess.set(ws.id, ws);
updateSess(ws.id);
if (!orphan) newsess(ws.id); if (!orphan) newsess(ws.id);
} }
} }
@@ -259,14 +256,6 @@ function clearOrphanSess(l) {
} }
} }
function updateSess(id) {
for (sock of socks) {
if (sock.id !== id) continue;
if (sock.readyState >= 2) return socks.delete(sock);
sock.client = csess.get(id);
}
}
// WS - Sessions // WS - Sessions
function newConn(addr, id) { function newConn(addr, id) {
if (!csess.has(id)) return; if (!csess.has(id)) return;
@@ -278,24 +267,25 @@ function newConn(addr, id) {
}); });
relay.id = id; relay.id = id;
relay.client = csess.get(id);
relay.on('open', _ => { relay.on('open', _ => {
const client = csess.get(id);
if (!csess.has(id)) return relay.terminate(); if (!csess.has(id)) return relay.terminate();
socks.add(relay); // Add this socket session to [socks] socks.add(relay); // Add this socket session to [socks]
if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!relay.client ? "(orphan)" : ""}`); if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`);
if (!relay.client) return; // is orphan, do nothing. if (!client) return; // is orphan, do nothing.
for (i of relay.client.my_events) { for (i of client.my_events) {
relay.send(JSON.stringify(["EVENT", i])); relay.send(JSON.stringify(["EVENT", i]));
} }
for (i of relay.client.subs) { for (i of client.subs) {
relay.send(JSON.stringify(["REQ", i[0], ...i[1]])); relay.send(JSON.stringify(["REQ", i[0], ...i[1]]));
} }
}); });
relay.on('message', data => { relay.on('message', data => {
if (!relay.client) return; const client = csess.get(id);
if (!client) return;
try { try {
data = JSON.parse(data); data = JSON.parse(data);
} catch (error) { } catch (error) {
@@ -305,23 +295,23 @@ function newConn(addr, id) {
switch (data[0]) { switch (data[0]) {
case "EVENT": { case "EVENT": {
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return; if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
if (!relay.client.subs.has(data[1])) return; if (!client.subs.has(data[1])) return;
timeoutEOSE(id, data[1]); timeoutEOSE(id, data[1]);
if (relay.client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return; if (client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return;
// if filter.since > receivedEvent.created_at, skip // if filter.since > receivedEvent.created_at, skip
// if receivedEvent.created_at > filter.until, skip // if receivedEvent.created_at > filter.until, skip
const cFilter = relay.client.subs.get(data[1]) const cFilter = client.subs.get(data[1])
if (cFilter?.since > data[2].created_at) return; if (cFilter?.since > data[2].created_at) return;
if (data[2].created_at > cFilter?.until) return; if (data[2].created_at > cFilter?.until) return;
const NotInSearchQuery = "search" in cFilter && !data[2]?.content?.toLowerCase().includes(cFilter.search.toLowerCase()); const NotInSearchQuery = "search" in cFilter && !data[2]?.content?.toLowerCase().includes(cFilter.search.toLowerCase());
if (NotInSearchQuery) return; if (NotInSearchQuery) return;
if (relay.client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before. if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
if (!relay.client.pause_subs.has(data[1])) { if (!client.pause_subs.has(data[1])) {
relay.client.events.get(data[1]).add(data[2]?.id); client.events.get(data[1]).add(data[2]?.id);
relay.client.send(JSON.stringify(data)); client.send(JSON.stringify(data));
} }
// send into cache relays. // send into cache relays.
@@ -331,46 +321,46 @@ function newConn(addr, id) {
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
// Skip if EOSE has been omitted // Skip if EOSE has been omitted
if (!relay.client.pendingEOSE.has(data[1]) || !relay.client.subs.get(data[1])?.limit || relay.client.pause_subs.has(data[1])) return; if (!client.pendingEOSE.has(data[1]) || !client.subs.get(data[1])?[0]?.limit || client.pause_subs.has(data[1])) return;
if (relay.client.events.get(data[1]).size >= relay.client.subs.get(data[1])?.limit) { if (client.events.get(data[1]).size >= client.subs.get(data[1])?[0]?.limit) {
// Once reached to <filter.limit>, send EOSE to client. // Once reached to <filter.limit>, send EOSE to client.
relay.client.send(JSON.stringify(["EOSE", data[1]])); client.send(JSON.stringify(["EOSE", data[1]]));
if (pause_on_limit || cache_relays?.includes(relay.url)) { if (pause_on_limit || cache_relays?.includes(relay.url)) {
relay.client.pause_subs.add(data[1]); client.pause_subs.add(data[1]);
} else { } else {
relay.client.pendingEOSE.delete(data[1]); client.pendingEOSE.delete(data[1]);
} }
} }
break; break;
} }
case "EOSE": case "EOSE":
if (!relay.client.pendingEOSE.has(data[1])) return; if (!client.pendingEOSE.has(data[1])) return;
relay.client.pendingEOSE.set(data[1], relay.client.pendingEOSE.get(data[1]) + 1); client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${relay.client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`); if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
if (!cache_relays?.includes(relay.url)) { if (!cache_relays?.includes(relay.url)) {
if (wait_eose && ((relay.client.pendingEOSE.get(data[1]) < max_eose_score) || (relay.client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return; if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return;
if (relay.client.pause_subs.has(data[1])) return relay.client.pause_subs.delete(data[1]); if (client.pause_subs.has(data[1])) return client.pause_subs.delete(data[1]);
cancel_EOSETimeout(data[1]); cancel_EOSETimeout(data[1]);
} else { } else {
if (relay.client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays?.includes(sock.url)).length) return; if (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays?.includes(sock.url)).length) return;
// get the filter // get the filter
const filter = relay.client.subs.get(data[1]); const filter = client.subs.get(data[1]);
if (relay.client.pause_subs.has(data[1])) { if (client.pause_subs.has(data[1])) {
relay.client.pause_subs.delete(data[1]); client.pause_subs.delete(data[1]);
relay.client.pendingEOSE.delete(data[1]); client.pendingEOSE.delete(data[1]);
} }
// now req to the direct connection, with the recent one please. // now req to the direct connection, with the recent one please.
return direct_bc(["REQ", data[1], filter], id); return direct_bc(["REQ", data[1], ...filter], id);
} }
relay.client.send(JSON.stringify(data)); client.send(JSON.stringify(data));
break; break;
case "AUTH": case "AUTH":
if (!private_keys || typeof(data[1]) !== "string" || !relay.client.pubkey) return; if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return;
nip42(relay, relay.client.pubkey, private_keys[relay.client.pubkey], data[1]); nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]);
break; break;
} }
}); });
@@ -380,14 +370,15 @@ function newConn(addr, id) {
}); });
relay.on('close', _ => { relay.on('close', _ => {
const client = csess.get(id);
socks.delete(relay) // Remove this socket session from [socks] list socks.delete(relay) // Remove this socket session from [socks] list
if (log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url); if (log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url);
if (!csess.has(id)) return; if (!csess.has(id)) return;
const reconnectTimeout = setTimeout(_ => { const reconnectTimeout = setTimeout(_ => {
newConn(addr, id); newConn(addr, id);
relay.client?.reconnectTimeout.delete(reconnectTimeout); client?.reconnectTimeout.delete(reconnectTimeout);
}, reconnect_time || 5000); // As a bouncer server, We need to reconnect. }, reconnect_time || 5000); // As a bouncer server, We need to reconnect.
relay.client?.reconnectTimeout.add(reconnectTimeout); client?.reconnectTimeout.add(reconnectTimeout);
}); });
} }