bostr/bouncer.js
Yonle 95bc83261a bouncer: remove eose_timeout
Signed-off-by: Yonle <yonle@lecturify.net>
2024-02-16 23:07:24 +07:00

357 lines
14 KiB
JavaScript

"use strict";
const { version } = require("./package.json");
const WebSocket = require("ws");
const { verifySignature, validateEvent, nip19 } = require("nostr-tools");
const auth = require("./auth.js");
const nip42 = require("./nip42.js");
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, cache_relays, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config");
const csess = new Map();
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
approved_publishers = approved_publishers?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
if (relays.length < max_eose_score) max_eose_score = 0;
cache_relays = cache_relays?.map(i => i.endsWith("/") ? i : i + "/");
// CL - User socket
module.exports = (ws, req, onClose) => {
let authKey = null;
let authorized = true;
let lastEvent = Date.now();
let ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address;
ws.id = (process.pid + Math.floor(Math.random() * 1000) + "_" + csess.size);
ws.relays = new Set(); // Set() of connected relays.
ws.subs = new Map(); // contains filter submitted by clients. per subID
ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
ws.events = new Map(); // only to prevent the retransmit of the same event. per subID
ws.my_events = new Set(); // for event retransmitting.
ws.pendingEOSE = new Map(); // each contain subID
ws.reconnectTimeout = new Set(); // relays timeout() before reconnection. Only use after client disconnected.
ws.subalias = new Map();
ws.fakesubalias = new Map();
ws.pubkey = null;
if (authorized_keys?.length) {
authKey = Date.now() + Math.random().toString(36);
authorized = false;
ws.send(JSON.stringify(["AUTH", authKey]));
} else if (private_keys !== {}) {
// If there is no whitelist, Then we ask to client what is their public key.
// We will enable NIP-42 function for this session if user pubkey was available & valid in <private_keys>.
// There is no need to limit this session. We only ask who is this user.
// If it was the users listed at <private_keys> in config.js, Then the user could use NIP-42 protected relays.
authKey = Date.now() + Math.random().toString(36);
ws.send(JSON.stringify(["AUTH", authKey]));
}
console.log(process.pid, `->- ${ip} (${ws.id}) connected [${req.headers["user-agent"] || ""}]`);
ws.on("message", data => {
try {
data = JSON.parse(data);
} catch {
return ws.send(
JSON.stringify(["NOTICE", "error: bad JSON."])
)
}
switch (data[0]) {
case "EVENT":
if (!authorized) return;
if (!validateEvent(data[1]) || !verifySignature(data[1])) return ws.send(JSON.stringify(["NOTICE", "error: invalid event"]));
if (data[1].kind == 22242) return ws.send(JSON.stringify(["OK", data[1]?.id, false, "rejected: kind 22242"]));
if (
approved_publishers?.length &&
!approved_publishers?.includes(data[1].pubkey)
) return ws.send(JSON.stringify(["OK", data[1]?.id, false, "rejected: unauthorized"]));
if (broadcast_ratelimit && (broadcast_ratelimit > (Date.now() - lastEvent))) {
lastEvent = Date.now();
return ws.send(JSON.stringify(["OK", data[1]?.id, false, "rate-limited: request too fast."]));
}
lastEvent = Date.now();
ws.my_events.add(data[1]);
direct_bc(data, ws.id);
cache_bc(data, ws.id);
ws.send(JSON.stringify(["OK", data[1]?.id, true, ""]));
break;
case "REQ": {
if (!authorized) return;
if (data.length < 3) return ws.send(JSON.stringify(["NOTICE", "error: bad request."]));
if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: expected subID a string. but got the otherwise."]));
if (typeof(data[2]) !== "object") return ws.send(JSON.stringify(["CLOSED", data[1], "error: expected filter to be obj, instead gives the otherwise."]));
if ((max_client_subs !== -1) && (ws.subs.size > max_client_subs)) return ws.send(JSON.stringify(["CLOSED", data[1], "rate-limited: too many subscriptions."]));
if (ws.subs.has(data[1])) return ws.send(JSON.stringify(["CLOSED", data[1], "duplicate: subscription already opened"]));
const origID = data[1];
const faked = Date.now() + Math.random().toString(36);
ws.subs.set(origID, data.slice(2));
ws.events.set(origID, new Set());
ws.pause_subs.delete(origID);
ws.subalias.set(faked, origID);
ws.fakesubalias.set(origID, faked);
data[1] = faked;
bc(data, ws.id);
if (data[2]?.limit < 1) return ws.send(JSON.stringify(["EOSE", origID]));
ws.pendingEOSE.set(origID, 0);
break;
}
case "CLOSE":
if (!authorized) return;
if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: bad request."]));
if (!ws.fakesubalias.has(data[1])) return ws.send(JSON.stringify(["CLOSED", data[1], "error: this sub is not opened."]));
const origID = data[1];
const faked = ws.fakesubalias.get(origID);
ws.subs.delete(origID);
ws.events.delete(origID);
ws.pendingEOSE.delete(origID);
ws.pause_subs.delete(origID);
ws.fakesubalias.delete(origID);
ws.subalias.delete(faked);
data[1] = faked;
cache_bc(data, ws.id);
direct_bc(data, ws.id);
ws.send(JSON.stringify(["CLOSED", origID, ""]));
break;
case "AUTH":
if (auth(authKey, data[1], ws, req)) {
ws.pubkey = data[1].pubkey;
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
if (authorized) return;
csess.set(ws.id, ws);
newsess(ws.id);
authorized = true;
lastEvent = Date.now();
}
break;
default:
ws.send(JSON.stringify(["NOTICE", "error: unrecognized command."]));
break;
}
});
ws.on('error', console.error);
ws.on('close', _ => {
onClose();
console.log(process.pid, "---", `${ip} (${ws.id}) disconnected`);
for (const i of ws.reconnectTimeout) {
clearTimeout(i);
// Let the garbage collector do the thing. No need to add ws.reconnectTimeout.delete(i);
}
terminate_sess(ws.id);
});
if (authorized) {
csess.set(ws.id, ws);
newsess(ws.id);
}
}
// WS - New session for client $id
function newsess(id) {
cache_relays?.forEach(_ => newConn(_, id));
relays.forEach(_ => newConn(_, id));
}
// WS - Broadcast message to every existing sockets
function direct_bc(msg, id) {
for (const sock of csess.get(id).relays) {
if (cache_relays?.includes(sock.url)) continue;
if (sock.readyState !== 1) continue;
// skip the ratelimit after <config.upstream_ratelimit_expiration>
if ((upstream_ratelimit_expiration) > (Date.now() - sock.ratelimit)) continue;
sock.send(JSON.stringify(msg));
}
}
function cache_bc(msg, id) {
for (const sock of csess.get(id).relays) {
if (!cache_relays?.includes(sock.url)) continue;
if (sock.readyState !== 1) continue;
sock.send(JSON.stringify(msg));
}
}
function bc(msg, id) {
if (!cache_relays?.length) direct_bc(msg, id);
else cache_bc(msg, id);
}
// WS - Terminate all existing sockets that were for <id>
function terminate_sess(id) {
for (const sock of csess.get(id).relays) {
sock.close();
}
csess.delete(id);
}
// WS - Sessions
function newConn(addr, id, reconn_t = 0) {
if (!csess.has(id)) return;
const relay = new WebSocket(addr, {
headers: {
"User-Agent": `Bostr (v${version}); The nostr relay bouncer; https://github.com/Yonle/bostr`
},
noDelay: true,
allowSynchronousEvents: true
});
relay.id = id;
relay.ratelimit = 0;
relay.on('open', _ => {
const client = csess.get(id);
if (!csess.has(id)) return relay.terminate();
reconn_t = 0;
if (log_about_relays) console.log(process.pid, "---", `[${id}] ${relay.url} is connected`);
if (!client) return;
for (const i of client.my_events) {
relay.send(JSON.stringify(["EVENT", i]));
}
for (const i of client.subs) {
relay.send(JSON.stringify(["REQ", client.fakesubalias.get(i[0]), ...i[1]]));
}
});
relay.on('message', data => {
const client = csess.get(id);
if (!client) return;
try {
data = JSON.parse(data);
} catch (error) {
return;
}
switch (data[0]) {
case "EVENT": {
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
if (!client.subalias.has(data[1])) return;
data[1] = client.subalias.get(data[1]);
if (client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return;
// if filter.since > receivedEvent.created_at, skip
// if receivedEvent.created_at > filter.until, skip
const cFilter = client.subs.get(data[1])[0];
if (Array.isArray(cFilter?.ids) && !cFilter.ids.includes(data[2].id)) return;
if (Array.isArray(cFilter?.authors) && !cFilter.authors.includes(data[2].pubkey)) return;
if (Array.isArray(cFilter?.kinds) && !cFilter.kinds.includes(data[2].kind)) return;
if (cFilter?.since > data[2].created_at) return;
if (data[2].created_at > cFilter?.until) return;
const NotInSearchQuery = "search" in cFilter && !data[2]?.content?.toLowerCase().includes(cFilter.search.toLowerCase());
if (NotInSearchQuery) return;
if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
if (!client.pause_subs.has(data[1])) {
client.events.get(data[1]).add(data[2]?.id);
client.send(JSON.stringify(data));
}
// send into cache relays.
if (!cache_relays?.includes(relay.url)) cache_bc(["EVENT", data[2]], id);
// Now count for REQ limit requested by client.
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
// Skip if EOSE has been omitted
if (!client.pendingEOSE.has(data[1]) || !(cFilter?.limit || cFilter?.ids?.length) || client.pause_subs.has(data[1])) return;
if (client.events.get(data[1]).size >= (cFilter?.ids?.length || cFilter?.limit)) {
// Once reached to <filter.limit>, send EOSE to client.
client.send(JSON.stringify(["EOSE", data[1]]));
if (pause_on_limit || cache_relays?.includes(relay.url)) {
client.pause_subs.add(data[1]);
} else {
client.pendingEOSE.delete(data[1]);
}
}
break;
}
case "EOSE":
if (!client.subalias.has(data[1])) return;
data[1] = client.subalias.get(data[1]);
if (!client.pendingEOSE.has(data[1])) return;
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
if (!cache_relays?.includes(relay.url)) {
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return;
if (client.pause_subs.has(data[1])) return client.pause_subs.delete(data[1]);
} else {
if (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays?.includes(sock.url)).length) return;
// get the filter
const filter = client.subs.get(data[1]);
if (client.pause_subs.has(data[1])) {
client.pause_subs.delete(data[1]);
client.pendingEOSE.delete(data[1]);
}
// now req to the direct connection, with the recent one please.
return direct_bc(["REQ", data[1], ...filter], id);
}
client.send(JSON.stringify(data));
break;
case "AUTH":
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return;
nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]);
break;
case "NOTICE":
case "CLOSED":
if (typeof(data[1]) !== "string") return;
if (data[1].startsWith("rate-limited")) relay.ratelimit = Date.now();
break;
case "OK":
if (typeof(data[2]) !== "string") return;
if (data[2].startsWith("rate-limited")) relay.ratelimit = Date.now();
break;
}
});
relay.on('error', _ => {
if (log_about_relays) console.error(process.pid, "-!-", `[${id}]`, relay.url, _.toString())
});
relay.on('close', _ => {
const client = csess.get(id);
if (!client) return;
client.relays.delete(relay); // Remove this socket session from <client.relays> list
if (log_about_relays) console.log(process.pid, "-!-", `[${id}]`, "Disconnected from", relay.url);
reconn_t += reconnect_time || 5000
const reconnectTimeout = setTimeout(_ => {
newConn(addr, id, reconn_t);
client?.reconnectTimeout.delete(reconnectTimeout);
}, reconn_t); // As a bouncer server, We need to reconnect.
client?.reconnectTimeout.add(reconnectTimeout);
});
relay.on('unexpected-response', (req, res) => {
const client = csess.get(id);
if (!client) return;
client.relays.delete(relay);
if (res.statusCode >= 500) return relay.emit("close", null);
delete relays[relays.indexOf(addr)];
console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`);
});
csess.get(id).relays.add(relay); // Add this socket session to <client.relays>
}