mirror of
https://github.com/Yonle/bostr.git
synced 2025-06-16 03:41:53 +02:00
worker_bouncer: try replace newConn function code as class inherited from WebSocket
Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
parent
a9aa39567b
commit
ec9baec453
@ -179,18 +179,24 @@ function newsess() {
|
|||||||
|
|
||||||
if (cache_relays) {
|
if (cache_relays) {
|
||||||
for (const url of cache_relays) {
|
for (const url of cache_relays) {
|
||||||
newConn(url, id);
|
if (!csess.hasOwnProperty(id)) break;
|
||||||
|
const relay = new newConn(url, id);
|
||||||
|
userRelays[id].add(relay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (shift) {
|
switch (shift) {
|
||||||
case undefined:
|
case undefined:
|
||||||
for (const url of relays) {
|
for (const url of relays) {
|
||||||
newConn(url, id);
|
if (!csess.hasOwnProperty(id)) break;
|
||||||
|
const relay = new newConn(url, id);
|
||||||
|
userRelays[id].add(relay);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
newConn(shift, id);
|
if (!csess.hasOwnProperty(id)) break;
|
||||||
|
const relay = new newConn(shift, id);
|
||||||
|
userRelays[id].add(relay);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -275,187 +281,188 @@ function relay_type(addr) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function newConn(addr, id, reconn_t = 0) {
|
class newConn extends WebSocket {
|
||||||
if (!csess.hasOwnProperty(id)) return;
|
constructor(addr, id, reconn_t = 0) {
|
||||||
if (!stats[addr]) stats[addr] = { raw_rx: 0, rx: 0, tx: 0, f: 0 };
|
if (!stats[addr]) stats[addr] = { raw_rx: 0, rx: 0, tx: 0, f: 0 };
|
||||||
const relay = new WebSocket(addr, {
|
super(addr, {
|
||||||
headers: {
|
headers: {
|
||||||
"User-Agent": `Bostr ${version}; The nostr relay bouncer; https://github.com/Yonle/bostr; ConnID: ${id}; ${server_meta.canonical_url || "No canonical bouncer URL specified"}; Contact: ${server_meta.contact}`,
|
"User-Agent": `Bostr ${version}; The nostr relay bouncer; https://github.com/Yonle/bostr; ConnID: ${id}; ${server_meta.canonical_url || "No canonical bouncer URL specified"}; Contact: ${server_meta.contact}`,
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
relay.isCache = relay_type(addr) === "cache_relay";
|
|
||||||
relay.isLoadBalancer = relay_type(addr) === "loadbalancer";
|
|
||||||
relay.ratelimit = 0;
|
|
||||||
relay.pendingNIP42 = new Set();
|
|
||||||
relay.on('open', _ => {
|
|
||||||
if (!csess.hasOwnProperty(id)) return relay.terminate();
|
|
||||||
const client = csess[id];
|
|
||||||
reconn_t = 0;
|
|
||||||
if (log_about_relays) console.log(threadId, "---", id, "Connected to", addr, `(${relay_type(addr)})`);
|
|
||||||
|
|
||||||
if (!client) return;
|
|
||||||
|
|
||||||
for (const i in client.subs) {
|
|
||||||
relay.send(JSON.stringify(["REQ", client.fakesubalias[i], ...client.subs[i]]));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
relay.on('message', data => {
|
|
||||||
try {
|
|
||||||
data = JSON.parse(data);
|
|
||||||
} catch (error) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const client = csess[id];
|
|
||||||
if (!client) return;
|
|
||||||
|
|
||||||
switch (data[0]) {
|
|
||||||
case "EVENT": {
|
|
||||||
stats._global.raw_rx++;
|
|
||||||
stats[addr].raw_rx++;
|
|
||||||
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
|
||||||
if (!client.subalias.hasOwnProperty(data[1])) return;
|
|
||||||
data[1] = client.subalias[data[1]];
|
|
||||||
|
|
||||||
if (client.events[data[1]].has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
|
|
||||||
if (!relay.isCache) bc(["EVENT", data[2]], id, true); // store to cache relay
|
|
||||||
const filter = client.mergedFilters[data[1]];
|
|
||||||
if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !relay.isCache) return;
|
|
||||||
|
|
||||||
if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return;
|
|
||||||
|
|
||||||
const filters = client.subs[data[1]];
|
|
||||||
if (!_matchFilters(filters, data[2])) return;
|
|
||||||
|
|
||||||
const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase());
|
|
||||||
if (NotInSearchQuery) return;
|
|
||||||
|
|
||||||
if (!relay.isLoadBalancer) client.events[data[1]].add(data[2]?.id);
|
|
||||||
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
|
|
||||||
|
|
||||||
if (max_known_events && client.events[data[1]].size >= max_known_events)
|
|
||||||
client.events[data[1]].delete(client.events[data[1]].values().next().value);
|
|
||||||
|
|
||||||
stats._global.rx++;
|
|
||||||
stats[addr].rx++;
|
|
||||||
|
|
||||||
// Now count for REQ limit requested by client.
|
|
||||||
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
|
|
||||||
|
|
||||||
// Skip if EOSE has been omitted
|
|
||||||
if (!client.pendingEOSE.hasOwnProperty(data[1]) || client.pause_subs.has(data[1]) || relay.isLoadBalancer) return;
|
|
||||||
const limit = getFilterLimit(filter);
|
|
||||||
if (limit === Infinity) return;
|
|
||||||
if (client.events[data[1]].size >= limit) {
|
|
||||||
// Once reached to <filter.limit>, send EOSE to client.
|
|
||||||
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(["EOSE", data[1]]) });
|
|
||||||
|
|
||||||
if (!client.accurateMode && (client.saveMode || pause_on_limit)) {
|
|
||||||
client.pause_subs.add(data[1]);
|
|
||||||
} else {
|
|
||||||
delete client.pendingEOSE[data[1]];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
case "EOSE":
|
});
|
||||||
if (!client.subalias.hasOwnProperty(data[1])) return;
|
|
||||||
data[1] = client.subalias[data[1]];
|
|
||||||
if (!client.pendingEOSE.hasOwnProperty(data[1]) && !relay.isLoadBalancer) return;
|
|
||||||
client.pendingEOSE[data[1]]++;
|
|
||||||
|
|
||||||
if (log_about_relays) console.log(threadId, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE[data[1]]} EOSE received out of ${userRelays[id].size} connected relays.`);
|
this.isCache = relay_type(addr) === "cache_relay";
|
||||||
|
this.isLoadBalancer = relay_type(addr) === "loadbalancer";
|
||||||
|
this.ratelimit = 0;
|
||||||
|
this.pendingNIP42 = new Set();
|
||||||
|
this.on('open', _ => {
|
||||||
|
if (!csess.hasOwnProperty(id)) return this.terminate();
|
||||||
|
const client = csess[id];
|
||||||
|
reconn_t = 0;
|
||||||
|
if (log_about_relays) console.log(threadId, "---", id, "Connected to", addr, `(${relay_type(addr)})`);
|
||||||
|
|
||||||
if (!relay.isCache && (wait_eose && ((client.pendingEOSE[data[1]] < max_eose_score) || (client.pendingEOSE[data[1]] < userRelays[id].size)))) return;
|
if (!client) return;
|
||||||
if (relay.isCache && !client.events[data[1]].size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet.
|
|
||||||
delete client.pendingEOSE[data[1]];
|
|
||||||
|
|
||||||
if (client.pause_subs.has(data[1]) && !relay.isLoadBalancer) {
|
for (const i in client.subs) {
|
||||||
client.pause_subs.delete(data[1]);
|
this.send(JSON.stringify(["REQ", client.fakesubalias[i], ...client.subs[i]]));
|
||||||
} else {
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.on('message', data => {
|
||||||
|
try {
|
||||||
|
data = JSON.parse(data);
|
||||||
|
} catch (error) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const client = csess[id];
|
||||||
|
if (!client) return;
|
||||||
|
|
||||||
|
switch (data[0]) {
|
||||||
|
case "EVENT": {
|
||||||
|
stats._global.raw_rx++;
|
||||||
|
stats[addr].raw_rx++;
|
||||||
|
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
||||||
|
if (!client.subalias.hasOwnProperty(data[1])) return;
|
||||||
|
data[1] = client.subalias[data[1]];
|
||||||
|
|
||||||
|
if (client.events[data[1]].has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
|
||||||
|
if (!this.isCache) bc(["EVENT", data[2]], id, true); // store to cache relay
|
||||||
|
const filter = client.mergedFilters[data[1]];
|
||||||
|
if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !this.isCache) return;
|
||||||
|
|
||||||
|
if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return;
|
||||||
|
|
||||||
|
const filters = client.subs[data[1]];
|
||||||
|
if (!_matchFilters(filters, data[2])) return;
|
||||||
|
|
||||||
|
const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase());
|
||||||
|
if (NotInSearchQuery) return;
|
||||||
|
|
||||||
|
if (!this.isLoadBalancer) client.events[data[1]].add(data[2]?.id);
|
||||||
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
|
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
|
||||||
|
|
||||||
|
if (max_known_events && client.events[data[1]].size >= max_known_events)
|
||||||
|
client.events[data[1]].delete(client.events[data[1]].values().next().value);
|
||||||
|
|
||||||
|
stats._global.rx++;
|
||||||
|
stats[addr].rx++;
|
||||||
|
|
||||||
|
// Now count for REQ limit requested by client.
|
||||||
|
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
|
||||||
|
|
||||||
|
// Skip if EOSE has been omitted
|
||||||
|
if (!client.pendingEOSE.hasOwnProperty(data[1]) || client.pause_subs.has(data[1]) || this.isLoadBalancer) return;
|
||||||
|
const limit = getFilterLimit(filter);
|
||||||
|
if (limit === Infinity) return;
|
||||||
|
if (client.events[data[1]].size >= limit) {
|
||||||
|
// Once reached to <filter.limit>, send EOSE to client.
|
||||||
|
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(["EOSE", data[1]]) });
|
||||||
|
|
||||||
|
if (!client.accurateMode && (client.saveMode || pause_on_limit)) {
|
||||||
|
client.pause_subs.add(data[1]);
|
||||||
|
} else {
|
||||||
|
delete client.pendingEOSE[data[1]];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
break;
|
case "EOSE":
|
||||||
case "AUTH":
|
if (!client.subalias.hasOwnProperty(data[1])) return;
|
||||||
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return relay.pendingNIP42.add(data[1]);
|
data[1] = client.subalias[data[1]];
|
||||||
nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]);
|
if (!client.pendingEOSE.hasOwnProperty(data[1]) && !this.isLoadBalancer) return;
|
||||||
break;
|
client.pendingEOSE[data[1]]++;
|
||||||
|
|
||||||
case "NOTICE":
|
if (log_about_relays) console.log(threadId, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE[data[1]]} EOSE received out of ${userRelays[id].size} connected relays.`);
|
||||||
if (typeof(data[1]) !== "string") return;
|
|
||||||
if (data[1].startsWith("rate-limited")) relay.ratelimit = Date.now();
|
|
||||||
|
|
||||||
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1]);
|
if (!this.isCache && (wait_eose && ((client.pendingEOSE[data[1]] < max_eose_score) || (client.pendingEOSE[data[1]] < userRelays[id].size)))) return;
|
||||||
|
if (this.isCache && !client.events[data[1]].size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet.
|
||||||
|
delete client.pendingEOSE[data[1]];
|
||||||
|
|
||||||
stats._global.f++
|
if (client.pause_subs.has(data[1]) && !this.isLoadBalancer) {
|
||||||
stats[addr].f++
|
client.pause_subs.delete(data[1]);
|
||||||
|
} else {
|
||||||
|
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case "AUTH":
|
||||||
|
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return this.pendingNIP42.add(data[1]);
|
||||||
|
nip42(this, client.pubkey, private_keys[client.pubkey], data[1]);
|
||||||
|
break;
|
||||||
|
|
||||||
break;
|
case "NOTICE":
|
||||||
|
if (typeof(data[1]) !== "string") return;
|
||||||
|
if (data[1].startsWith("rate-limited")) this.ratelimit = Date.now();
|
||||||
|
|
||||||
case "CLOSED":
|
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1]);
|
||||||
if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "string")) return;
|
|
||||||
if (data[2].startsWith("rate-limited")) relay.ratelimit = Date.now();
|
|
||||||
|
|
||||||
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2]);
|
stats._global.f++
|
||||||
|
stats[addr].f++
|
||||||
|
|
||||||
if (data[2].length) {
|
break;
|
||||||
stats._global.f++;
|
|
||||||
stats[addr].f++;
|
|
||||||
}
|
|
||||||
if (client.pendingEOSE.hasOwnProperty(data[1])) client.pendingEOSE[data[1]]++;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case "OK":
|
case "CLOSED":
|
||||||
if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "boolean") || (typeof(data[3]) !== "string")) return;
|
if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "string")) return;
|
||||||
if (data[3].startsWith("rate-limited")) relay.ratelimit = Date.now();
|
if (data[2].startsWith("rate-limited")) this.ratelimit = Date.now();
|
||||||
|
|
||||||
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2], data[3]);
|
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2]);
|
||||||
|
|
||||||
switch (data[2]) {
|
if (data[2].length) {
|
||||||
case true:
|
stats._global.f++;
|
||||||
stats._global.tx++;
|
stats[addr].f++;
|
||||||
stats[addr].tx++;
|
}
|
||||||
case false:
|
if (client.pendingEOSE.hasOwnProperty(data[1])) client.pendingEOSE[data[1]]++;
|
||||||
stats._global.f++
|
break;
|
||||||
stats[addr].f++
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
case "OK":
|
||||||
}
|
if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "boolean") || (typeof(data[3]) !== "string")) return;
|
||||||
});
|
if (data[3].startsWith("rate-limited")) this.ratelimit = Date.now();
|
||||||
|
|
||||||
relay.on('error', _ => {
|
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2], data[3]);
|
||||||
if (log_about_relays) console.error(threadId, "-!-", id, addr, _.toString())
|
|
||||||
});
|
|
||||||
|
|
||||||
relay.on('close', _ => {
|
switch (data[2]) {
|
||||||
if (!userRelays.hasOwnProperty(id)) return;
|
case true:
|
||||||
userRelays[id].delete(relay);
|
stats._global.tx++;
|
||||||
if (log_about_relays) console.log(threadId, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`);
|
stats[addr].tx++;
|
||||||
reconn_t += reconnect_time || 5000
|
case false:
|
||||||
setTimeout(_ => {
|
stats._global.f++
|
||||||
newConn(addr, id, reconn_t);
|
stats[addr].f++
|
||||||
}, reconn_t);
|
}
|
||||||
|
|
||||||
stats._global.f++
|
break;
|
||||||
stats[addr].f++
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
relay.on('unexpected-response', (req, res) => {
|
this.on('error', _ => {
|
||||||
if (!userRelays.hasOwnProperty(id)) return;
|
if (log_about_relays) console.error(threadId, "-!-", id, addr, _.toString())
|
||||||
userRelays[id].delete(relay);
|
});
|
||||||
if (res.statusCode >= 500) return relay.emit("close", null);
|
|
||||||
relays.splice(relays.indexOf(addr), 1);
|
|
||||||
cache_relays.splice(cache_relays.indexOf(addr), 1);
|
|
||||||
loadbalancer.splice(loadbalancer.indexOf(addr), 1);
|
|
||||||
console.log(threadId, "-!-", `${addr} give status code ${res.statusCode}. Not (re)connect with new session again.`);
|
|
||||||
|
|
||||||
stats._global.f++
|
this.on('close', _ => {
|
||||||
stats[addr].f++
|
if (!userRelays.hasOwnProperty(id)) return;
|
||||||
});
|
userRelays[id].delete(this);
|
||||||
|
if (log_about_relays) console.log(threadId, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`);
|
||||||
|
reconn_t += reconnect_time || 5000
|
||||||
|
setTimeout(_ => {
|
||||||
|
if (!csess.hasOwnProperty(id)) return;
|
||||||
|
const relay = new newConn(addr, id, reconn_t);
|
||||||
|
userRelays[id].add(relay);
|
||||||
|
}, reconn_t);
|
||||||
|
|
||||||
userRelays[id].add(relay);
|
stats._global.f++
|
||||||
|
stats[addr].f++
|
||||||
|
});
|
||||||
|
|
||||||
|
this.on('unexpected-response', (req, res) => {
|
||||||
|
if (!userRelays.hasOwnProperty(id)) return;
|
||||||
|
userRelays[id].delete(this);
|
||||||
|
if (res.statusCode >= 500) return this.emit("close", null);
|
||||||
|
relays.splice(relays.indexOf(addr), 1);
|
||||||
|
cache_relays.splice(cache_relays.indexOf(addr), 1);
|
||||||
|
loadbalancer.splice(loadbalancer.indexOf(addr), 1);
|
||||||
|
console.log(threadId, "-!-", `${addr} give status code ${res.statusCode}. Not (re)connect with new session again.`);
|
||||||
|
|
||||||
|
stats._global.f++
|
||||||
|
stats[addr].f++
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (let i = 1; i <= (idle_sessions || 1); i++) {
|
for (let i = 1; i <= (idle_sessions || 1); i++) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user