add feature for ratelimits.

Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
Yonle 2024-01-23 21:12:06 +07:00
parent 120198d195
commit a3b4c20567
3 changed files with 43 additions and 9 deletions

View File

@ -3,7 +3,7 @@ const { verifySignature, validateEvent, nip19 } = require("nostr-tools");
const auth = require("./auth.js"); const auth = require("./auth.js");
const nip42 = require("./nip42.js"); const nip42 = require("./nip42.js");
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays, max_orphan_sess, broadcast_ratelimit } = require("./config"); let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays, max_orphan_sess, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config");
const socks = new Set(); const socks = new Set();
const csess = new Map(); const csess = new Map();
@ -32,6 +32,7 @@ module.exports = (ws, req) => {
ws.pendingEOSE = new Map(); // each contain subID ws.pendingEOSE = new Map(); // each contain subID
ws.EOSETimeout = new Map(); // per subID ws.EOSETimeout = new Map(); // per subID
ws.reconnectTimeout = new Set(); // relays timeout() before reconnection. Only use after client disconnected. ws.reconnectTimeout = new Set(); // relays timeout() before reconnection. Only use after client disconnected.
ws.pubkey = null;
if (authorized_keys?.length) { if (authorized_keys?.length) {
authKey = Date.now() + Math.random().toString(36); authKey = Date.now() + Math.random().toString(36);
@ -57,7 +58,7 @@ module.exports = (ws, req) => {
JSON.stringify(["NOTICE", "error: bad JSON."]) JSON.stringify(["NOTICE", "error: bad JSON."])
) )
} }
console.log(data)
switch (data[0]) { switch (data[0]) {
case "EVENT": case "EVENT":
if (!authorized) return; if (!authorized) return;
@ -86,6 +87,7 @@ console.log(data)
if (data.length < 3) return ws.send(JSON.stringify(["NOTICE", "error: bad request."])); if (data.length < 3) return ws.send(JSON.stringify(["NOTICE", "error: bad request."]));
if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: expected subID a string. but got the otherwise."])); if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: expected subID a string. but got the otherwise."]));
if (typeof(data[2]) !== "object") return ws.send(JSON.stringify(["CLOSED", data[1], "error: expected filter to be obj, instead gives the otherwise."])); if (typeof(data[2]) !== "object") return ws.send(JSON.stringify(["CLOSED", data[1], "error: expected filter to be obj, instead gives the otherwise."]));
if ((max_client_subs !== -1) && (ws.subs.size > max_client_subs)) return ws.send(JSON.stringify(["CLOSED", data[1], "rate-limited: too many subscriptions."]));
if (ws.subs.has(data[1])) { if (ws.subs.has(data[1])) {
direct_bc(["CLOSE", data[1]], ws.id); direct_bc(["CLOSE", data[1]], ws.id);
cache_bc(["CLOSE", data[1]], ws.id); cache_bc(["CLOSE", data[1]], ws.id);
@ -153,8 +155,8 @@ console.log(data)
onClientDisconnect(); onClientDisconnect();
// admin session must be destroyed quick. // sensitive session must not be preserved.
if (ws.pubkey) terminate_sess(ws.id); if (private_keys && (ws.pubkey in private_keys)) terminate_sess(ws.id);
}); });
if (authorized) { if (authorized) {
@ -203,6 +205,9 @@ function direct_bc(msg, id) {
if (cache_relays?.includes(sock.url)) continue; if (cache_relays?.includes(sock.url)) continue;
if (sock.id !== id) continue; if (sock.id !== id) continue;
if (sock.readyState >= 2) return socks.delete(sock); if (sock.readyState >= 2) return socks.delete(sock);
// skip the ratelimit after <config.upstream_ratelimit_expiration>
if ((upstream_ratelimit_expiration) > (Date.now() - sock.ratelimit)) continue;
sock.send(JSON.stringify(msg)); sock.send(JSON.stringify(msg));
} }
} }
@ -269,7 +274,7 @@ function clearOrphanSess(l) {
} }
// WS - Sessions // WS - Sessions
function newConn(addr, id) { function newConn(addr, id, reconn_t = 0) {
if (!csess.has(id)) return; if (!csess.has(id)) return;
const relay = new WebSocket(addr, { const relay = new WebSocket(addr, {
headers: { headers: {
@ -279,9 +284,11 @@ function newConn(addr, id) {
}); });
relay.id = id; relay.id = id;
relay.ratelimit = 0;
relay.on('open', _ => { relay.on('open', _ => {
const client = csess.get(id); const client = csess.get(id);
if (!csess.has(id)) return relay.terminate(); if (!csess.has(id)) return relay.terminate();
reconn_t = 0;
socks.add(relay); // Add this socket session to [socks] socks.add(relay); // Add this socket session to [socks]
if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`); if (log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}] ${relay.url} is connected ${!client ? "(orphan)" : ""}`);
@ -374,6 +381,17 @@ function newConn(addr, id) {
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return; if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return;
nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]); nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]);
break; break;
case "NOTICE":
case "CLOSED":
if (typeof(data[1]) !== "string") return;
if (data[1].startsWith("rate-limited")) relay.ratelimit = Date.now();
break;
case "OK":
if (typeof(data[2]) !== "string") return;
if (data[2].startsWith("rate-limited")) relay.ratelimit = Date.now();
break;
} }
}); });
@ -387,10 +405,18 @@ function newConn(addr, id) {
if (log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url); if (log_about_relays) console.log(process.pid, "-!-", `[${id}] [${socks.size}/${relays.length*csess.size}]`, "Disconnected from", relay.url);
if (!csess.has(id)) return; if (!csess.has(id)) return;
reconn_t += reconnect_time || 5000
const reconnectTimeout = setTimeout(_ => { const reconnectTimeout = setTimeout(_ => {
newConn(addr, id); newConn(addr, id, reconn_t);
client?.reconnectTimeout.delete(reconnectTimeout); client?.reconnectTimeout.delete(reconnectTimeout);
}, reconnect_time || 5000); // As a bouncer server, We need to reconnect. }, reconn_t); // As a bouncer server, We need to reconnect.
client?.reconnectTimeout.add(reconnectTimeout); client?.reconnectTimeout.add(reconnectTimeout);
}); });
relay.on('unexpected-response', (req, res) => {
socks.delete(relay);
if (res.statusCode >= 500) return relay.emit("close", null);
delete relays[relays.indexOf(addr)];
console.log(process.pid, "-!-", `${relay.url} give status code ${res.statusCode}. Not (re)connect with new session again.`);
});
} }

View File

@ -26,6 +26,14 @@ module.exports = {
// Time before reconnect to relays in milliseconds. // Time before reconnect to relays in milliseconds.
reconnect_time: 5000, reconnect_time: 5000,
// Ratelimit expiration after ratelimit from upstream relay in miliseconds.
// Setting as 0 will disable ratelimit handling.
upstream_ratelimit_expiration: 10000,
// Maximum subscriptions that client could open.
// Setting as -1 will disable max subscription limit.
max_client_subs: -1,
// Maximum amount of orphan sessions. // Maximum amount of orphan sessions.
// Setting to 0 disables orphan session function. // Setting to 0 disables orphan session function.
max_orphan_sess: 7, max_orphan_sess: 7,

View File

@ -1,6 +1,6 @@
{ {
"name": "bostr", "name": "bostr",
"version": "2.0.3", "version": "2.0.4-dev",
"description": "Nostr relay bouncer", "description": "Nostr relay bouncer",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {
@ -25,7 +25,7 @@
"homepage": "https://github.com/Yonle/bostr#readme", "homepage": "https://github.com/Yonle/bostr#readme",
"dependencies": { "dependencies": {
"nostr-tools": "^1.17.0", "nostr-tools": "^1.17.0",
"ws": "^8.14.2" "ws": "^8.16.0"
}, },
"engines": { "engines": {
"node": ">=16" "node": ">=16"