code: cleanup & fix NIP-42 for upstream

- Fix NIP-42 authentication in bouncer side
- Split event handler codes for receiver

Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
Yonle 2024-06-08 17:07:04 +07:00
parent 4af2f4095b
commit d8d364e274
2 changed files with 193 additions and 173 deletions

View File

@ -1,4 +1,5 @@
"use strict"; "use strict";
const { threadId } = require("node:worker_threads");
const { finalizeEvent, nip19 } = require("nostr-tools"); const { finalizeEvent, nip19 } = require("nostr-tools");
module.exports = (relay, pubkey, privkey, challenge) => { module.exports = (relay, pubkey, privkey, challenge) => {
@ -16,4 +17,6 @@ module.exports = (relay, pubkey, privkey, challenge) => {
}, privkey); }, privkey);
relay.send(JSON.stringify(["AUTH", signed_challenge])); relay.send(JSON.stringify(["AUTH", signed_challenge]));
console.log(threadId, "-", relay.id, "NIP-42: Sending Signed Challenge to", relay.url);
} }

View File

@ -10,10 +10,16 @@ const nip42 = require("./nip42.js");
let { relays, log_about_relays, server_meta, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, loadbalancer, max_known_events } = require(process.env.BOSTR_CONFIG_PATH || "./config"); let { relays, log_about_relays, server_meta, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, loadbalancer, max_known_events } = require(process.env.BOSTR_CONFIG_PATH || "./config");
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays; log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
loadbalancer = loadbalancer || []; loadbalancer = loadbalancer || [];
if (relays.length) loadbalancer.unshift(undefined); if (relays.length) loadbalancer.unshift(undefined);
for (const key in private_keys) {
if (!key.startsWith("npub")) continue;
private_keys[nip19.decode(key).data] = private_keys[key];
delete private_keys[key];
}
// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score> // CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
if (relays.length < max_eose_score) max_eose_score = 0; if (relays.length < max_eose_score) max_eose_score = 0;
@ -180,7 +186,7 @@ function newsess() {
if (cache_relays) { if (cache_relays) {
for (const url of cache_relays) { for (const url of cache_relays) {
if (!csess.hasOwnProperty(id)) break; if (!csess.hasOwnProperty(id)) break;
const relay = new newConn(url, id); const relay = new Session(url, id);
userRelays[id].add(relay); userRelays[id].add(relay);
} }
} }
@ -189,13 +195,13 @@ function newsess() {
case undefined: case undefined:
for (const url of relays) { for (const url of relays) {
if (!csess.hasOwnProperty(id)) break; if (!csess.hasOwnProperty(id)) break;
const relay = new newConn(url, id); const relay = new Session(url, id);
userRelays[id].add(relay); userRelays[id].add(relay);
} }
break; break;
default: default:
if (!csess.hasOwnProperty(id)) break; if (!csess.hasOwnProperty(id)) break;
const relay = new newConn(shift, id); const relay = new Session(shift, id);
userRelays[id].add(relay); userRelays[id].add(relay);
break; break;
} }
@ -232,6 +238,7 @@ function getIdleSess(ws) {
ws.subalias = {}; ws.subalias = {};
ws.fakesubalias = {}; ws.fakesubalias = {};
ws.mergedFilters = {}; ws.mergedFilters = {};
ws.id = idleSess.values().next().value;
if (ws.pubkey && private_keys[ws.pubkey]) { if (ws.pubkey && private_keys[ws.pubkey]) {
for (const relay of userRelays[ws.id]) { for (const relay of userRelays[ws.id]) {
@ -242,7 +249,6 @@ function getIdleSess(ws) {
} }
} }
ws.id = idleSess.values().next().value;
idleSess.delete(ws.id); idleSess.delete(ws.id);
csess[ws.id] = ws; csess[ws.id] = ws;
idents[ws.ident] = ws; idents[ws.ident] = ws;
@ -281,7 +287,7 @@ function relay_type(addr) {
} }
} }
class newConn extends WebSocket { class Session extends WebSocket {
constructor(addr, id, reconn_t = 0) { constructor(addr, id, reconn_t = 0) {
if (!stats[addr]) stats[addr] = { raw_rx: 0, rx: 0, tx: 0, f: 0 }; if (!stats[addr]) stats[addr] = { raw_rx: 0, rx: 0, tx: 0, f: 0 };
super(addr, { super(addr, {
@ -290,181 +296,192 @@ class newConn extends WebSocket {
} }
}); });
this.id = id;
this.addr = addr;
this.reconn_t = reconn_t;
this.isCache = relay_type(addr) === "cache_relay"; this.isCache = relay_type(addr) === "cache_relay";
this.isLoadBalancer = relay_type(addr) === "loadbalancer"; this.isLoadBalancer = relay_type(addr) === "loadbalancer";
this.ratelimit = 0; this.ratelimit = 0;
this.pendingNIP42 = new Set(); this.pendingNIP42 = new Set();
this.on('open', _ => {
if (!csess.hasOwnProperty(id)) return this.terminate();
const client = csess[id];
reconn_t = 0;
if (log_about_relays) console.log(threadId, "---", id, "Connected to", addr, `(${relay_type(addr)})`);
if (!client) return; this.on('open', receiverOnOpen);
this.on('message', receiverOnMessage);
for (const i in client.subs) { this.on('error', receiverOnError);
this.send(JSON.stringify(["REQ", client.fakesubalias[i], ...client.subs[i]])); this.on('close', receiverOnClose);
} this.on('unexpected-response', receiverOnUnexpectedResponse);
});
this.on('message', data => {
try {
data = JSON.parse(data);
} catch (error) {
return;
}
const client = csess[id];
if (!client) return;
switch (data[0]) {
case "EVENT": {
stats._global.raw_rx++;
stats[addr].raw_rx++;
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
if (!client.subalias.hasOwnProperty(data[1])) return;
data[1] = client.subalias[data[1]];
if (client.events[data[1]].has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
if (!this.isCache) bc(["EVENT", data[2]], id, true); // store to cache relay
const filter = client.mergedFilters[data[1]];
if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !this.isCache) return;
if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return;
const filters = client.subs[data[1]];
if (!_matchFilters(filters, data[2])) return;
const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase());
if (NotInSearchQuery) return;
if (!this.isLoadBalancer) client.events[data[1]].add(data[2]?.id);
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
if (max_known_events && client.events[data[1]].size > max_known_events)
client.events[data[1]].delete(client.events[data[1]].values().next().value);
stats._global.rx++;
stats[addr].rx++;
// Now count for REQ limit requested by client.
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
// Skip if EOSE has been omitted
if (!client.pendingEOSE.hasOwnProperty(data[1]) || client.pause_subs.has(data[1]) || this.isLoadBalancer) return;
const limit = getFilterLimit(filter);
if (limit === Infinity) return;
if (client.events[data[1]].size >= limit) {
// Once reached to <filter.limit>, send EOSE to client.
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(["EOSE", data[1]]) });
if (!client.accurateMode && (client.saveMode || pause_on_limit)) {
client.pause_subs.add(data[1]);
} else {
delete client.pendingEOSE[data[1]];
}
}
break;
}
case "EOSE":
if (!client.subalias.hasOwnProperty(data[1])) return;
data[1] = client.subalias[data[1]];
if (!client.pendingEOSE.hasOwnProperty(data[1]) && !this.isLoadBalancer) return;
client.pendingEOSE[data[1]]++;
if (log_about_relays) console.log(threadId, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE[data[1]]} EOSE received out of ${userRelays[id].size} connected relays.`);
if (!this.isCache && (wait_eose && ((client.pendingEOSE[data[1]] < max_eose_score) || (client.pendingEOSE[data[1]] < userRelays[id].size)))) return;
if (this.isCache && !client.events[data[1]].size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet.
delete client.pendingEOSE[data[1]];
if (client.pause_subs.has(data[1]) && !this.isLoadBalancer) {
client.pause_subs.delete(data[1]);
} else {
parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
}
break;
case "AUTH":
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return this.pendingNIP42.add(data[1]);
nip42(this, client.pubkey, private_keys[client.pubkey], data[1]);
break;
case "NOTICE":
if (typeof(data[1]) !== "string") return;
if (data[1].startsWith("rate-limited")) this.ratelimit = Date.now();
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1]);
stats._global.f++
stats[addr].f++
break;
case "CLOSED":
if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "string")) return;
if (data[2].startsWith("rate-limited")) this.ratelimit = Date.now();
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2]);
if (data[2].length) {
stats._global.f++;
stats[addr].f++;
}
if (client.pendingEOSE.hasOwnProperty(data[1])) client.pendingEOSE[data[1]]++;
break;
case "OK":
if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "boolean") || (typeof(data[3]) !== "string")) return;
if (data[3].startsWith("rate-limited")) this.ratelimit = Date.now();
if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2], data[3]);
switch (data[2]) {
case true:
stats._global.tx++;
stats[addr].tx++;
case false:
stats._global.f++
stats[addr].f++
}
break;
}
});
this.on('error', _ => {
if (log_about_relays) console.error(threadId, "-!-", id, addr, _.toString())
});
this.on('close', _ => {
if (!userRelays.hasOwnProperty(id)) return;
userRelays[id].delete(this);
if (log_about_relays) console.log(threadId, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`);
reconn_t += reconnect_time || 5000
setTimeout(_ => {
if (!csess.hasOwnProperty(id)) return;
const relay = new newConn(addr, id, reconn_t);
userRelays[id].add(relay);
}, reconn_t);
stats._global.f++
stats[addr].f++
});
this.on('unexpected-response', (req, res) => {
if (!userRelays.hasOwnProperty(id)) return;
userRelays[id].delete(this);
if (res.statusCode >= 500) return this.emit("close", null);
relays.splice(relays.indexOf(addr), 1);
cache_relays.splice(cache_relays.indexOf(addr), 1);
loadbalancer.splice(loadbalancer.indexOf(addr), 1);
console.log(threadId, "-!-", `${addr} give status code ${res.statusCode}. Not (re)connect with new session again.`);
stats._global.f++
stats[addr].f++
});
} }
} }
function receiverOnOpen() {
if (!csess.hasOwnProperty(this.id)) return this.terminate();
const client = csess[this.id];
this.reconn_t = 0;
if (log_about_relays) console.log(threadId, "---", this.id, "Connected to", this.addr, `(${relay_type(this.addr)})`);
if (!client) return;
for (const i in client.subs) {
this.send(JSON.stringify(["REQ", client.fakesubalias[i], ...client.subs[i]]));
}
}
function receiverOnMessage(data) {
try {
data = JSON.parse(data);
} catch (error) {
return;
}
const client = csess[this.id];
if (!client) return;
switch (data[0]) {
case "EVENT": {
stats._global.raw_rx++;
stats[this.addr].raw_rx++;
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
if (!client.subalias.hasOwnProperty(data[1])) return;
data[1] = client.subalias[data[1]];
if (client.events[data[1]].has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
if (!this.isCache) bc(["EVENT", data[2]], this.id, true); // store to cache relay
const filter = client.mergedFilters[data[1]];
if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !this.isCache) return;
if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return;
const filters = client.subs[data[1]];
if (!_matchFilters(filters, data[2])) return;
const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase());
if (NotInSearchQuery) return;
if (!this.isLoadBalancer) client.events[data[1]].add(data[2]?.id);
parentPort.postMessage({ type: "upstream_msg", id: this.id, data: JSON.stringify(data) });
if (max_known_events && client.events[data[1]].size > max_known_events)
client.events[data[1]].delete(client.events[data[1]].values().next().value);
stats._global.rx++;
stats[this.addr].rx++;
// Now count for REQ limit requested by client.
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
// Skip if EOSE has been omitted
if (!client.pendingEOSE.hasOwnProperty(data[1]) || client.pause_subs.has(data[1]) || this.isLoadBalancer) return;
const limit = getFilterLimit(filter);
if (limit === Infinity) return;
if (client.events[data[1]].size >= limit) {
// Once reached to <filter.limit>, send EOSE to client.
parentPort.postMessage({ type: "upstream_msg", id: this.id, data: JSON.stringify(["EOSE", data[1]]) });
if (!client.accurateMode && (client.saveMode || pause_on_limit)) {
client.pause_subs.add(data[1]);
} else {
delete client.pendingEOSE[data[1]];
}
}
break;
}
case "EOSE":
if (!client.subalias.hasOwnProperty(data[1])) return;
data[1] = client.subalias[data[1]];
if (!client.pendingEOSE.hasOwnProperty(data[1]) && !this.isLoadBalancer) return;
client.pendingEOSE[data[1]]++;
if (log_about_relays) console.log(threadId, "---", this.id, `got EOSE from ${this.addr} for ${data[1]}. There are ${client.pendingEOSE[data[1]]} EOSE received out of ${userRelays[this.id].size} connected relays.`);
if (!this.isCache && (wait_eose && ((client.pendingEOSE[data[1]] < max_eose_score) || (client.pendingEOSE[data[1]] < userRelays[this.id].size)))) return;
if (this.isCache && !client.events[data[1]].size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet.
delete client.pendingEOSE[data[1]];
if (client.pause_subs.has(data[1]) && !this.isLoadBalancer) {
client.pause_subs.delete(data[1]);
} else {
parentPort.postMessage({ type: "upstream_msg", id: this.id, data: JSON.stringify(data) });
}
break;
case "AUTH":
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return this.pendingNIP42.add(data[1]);
nip42(this, client.pubkey, private_keys[client.pubkey], data[1]);
break;
case "NOTICE":
if (typeof(data[1]) !== "string") return;
if (data[1].startsWith("rate-limited")) this.ratelimit = Date.now();
if (log_about_relays) console.log(threadId, this.id, this.addr, data[0], data[1]);
stats._global.f++
stats[this.addr].f++
break;
case "CLOSED":
if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "string")) return;
if (data[2].startsWith("rate-limited")) this.ratelimit = Date.now();
if (log_about_relays) console.log(threadId, this.id, this.addr, data[0], data[1], data[2]);
if (data[2].length) {
stats._global.f++;
stats[this.addr].f++;
}
if (client.pendingEOSE.hasOwnProperty(data[1])) client.pendingEOSE[data[1]]++;
break;
case "OK":
if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "boolean") || (typeof(data[3]) !== "string")) return;
if (data[3].startsWith("rate-limited")) this.ratelimit = Date.now();
if (log_about_relays) console.log(threadId, this.id, this.addr, data[0], data[1], data[2], data[3]);
switch (data[2]) {
case true:
stats._global.tx++;
stats[this.addr].tx++;
case false:
stats._global.f++
stats[this.addr].f++
}
break;
}
}
function receiverOnError(_) {
if (log_about_relays) return;
console.error(threadId, "-!-", this.id, this.addr, _.toString());
}
function receiverOnClose() {
if (!userRelays.hasOwnProperty(this.id)) return;
userRelays[this.id].delete(this);
if (log_about_relays) console.log(threadId, "-!-", this.id, "Disconnected from", this.addr, `(${relay_type(this.addr)})`);
this.reconn_t += reconnect_time || 5000;
setTimeout(_ => {
if (!csess.hasOwnProperty(this.id)) return;
const relay = new Session(this.addr, this.id, this.reconn_t);
userRelays[this.id].add(relay);
}, this.reconn_t);
stats._global.f++
stats[this.addr].f++
}
function receiverOnUnexpectedResponse(req, res) {
if (!userRelays.hasOwnProperty(this.id)) return;
userRelays[this.id].delete(this);
if (res.statusCode >= 500) return this.emit("close", null);
relays.splice(relays.indexOf(this.addr), 1);
cache_relays.splice(cache_relays.indexOf(this.addr), 1);
loadbalancer.splice(loadbalancer.indexOf(this.addr), 1);
console.log(threadId, "-!-", `${this.addr} give status code ${res.statusCode}. Not (re)connect with new session again.`);
stats._global.f++
stats[this.addr].f++
}
for (let i = 1; i <= (idle_sessions || 1); i++) { for (let i = 1; i <= (idle_sessions || 1); i++) {
newsess(); newsess();
} }