Mirror of https://github.com/Yonle/bostr.git (synced 2025-10-10 16:42:34 +02:00)
bouncer: split code into two separate processes, with upstream comms moved to a worker
Signed-off-by: Yonle <yonle@lecturify.net>
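For readers unfamiliar with the new layout: bouncer.js keeps the client-facing sockets and forwards typed commands to a worker_threads worker (worker_bouncer.js), which owns the upstream relay connections and answers with typed messages. The following is a minimal, self-contained sketch of that round-trip pattern (it is not the Bostr code; the message shapes are simplified and the inline worker source is only for illustration):

const { Worker } = require("worker_threads");

const workerSource = `
  const { parentPort } = require("worker_threads");
  parentPort.on("message", m => {
    // The worker owns the "upstream" side; here it just answers a REQ with an EOSE.
    if (m.type === "req") {
      parentPort.postMessage({ type: "upstream_msg", id: m.id, data: JSON.stringify(["EOSE", m.sid]) });
    }
  });
`;

const worker = new Worker(workerSource, { eval: true });

worker.on("message", msg => {
  if (msg.type === "upstream_msg") console.log("reply for session", msg.id, msg.data);
  worker.terminate(); // end the sketch once the reply arrives
});

// The main thread keeps the client sockets and only posts typed commands:
worker.postMessage({ type: "req", id: "sess_1", sid: "sub_1" });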
bouncer.js (441 changed lines)
@@ -1,10 +1,11 @@
 "use strict";
+// Anything about handling client connections is in this code.
+
+const { Worker } = require("worker_threads");
 const { version } = require("./package.json");
-const WebSocket = require("ws");
 const querystring = require("querystring");
 const { validateEvent, nip19, matchFilters, mergeFilters, getFilterLimit } = require("nostr-tools");
 const auth = require("./auth.js");
-const nip42 = require("./nip42.js");
 
 let { relays, allowed_publishers, approved_publishers, blocked_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, noscraper, loadbalancer } = require(process.env.BOSTR_CONFIG_PATH || "./config");
 
@@ -28,18 +29,19 @@ if (approved_publishers?.length) {
   console.warn(process.pid, "[config]");
 }
 
+const worker = new Worker(__dirname + "/worker_bouncer.js", { name: "Bostr (worker)" });
+
 const csess = new Map(); // this is used for relays.
 const userRelays = new Map(); // per ID contains Set() of <WebSocket>
-const idleSess = new Set();
+const ident = new Map();
 
-let stats = {
-  _global: {
+let zeroStats = {
   raw_rx: 0,
   rx: 0,
   tx: 0,
   f: 0
 }
-};
+let stats = {};
 
 // CL - User socket
 function handleConnection(ws, req, onClose) {
@@ -48,16 +50,10 @@ function handleConnection(ws, req, onClose) {
   let authorized = true;
   let sessStarted = false;
   let lastEvent = Date.now();
+  ws.onready = null;
+  ws.ident = Date.now() + Math.random().toString(36);
+  ws.id = null;
   ws.ip = req.headers["x-forwarded-for"]?.split(",")[0] || req.socket.address()?.address;
-  ws.subs = new Map(); // contains filter submitted by clients. per subID
-  ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
-  ws.events = new Map(); // only to prevent the retransmit of the same event. per subID
-  ws.my_events = new Set(); // for event retransmitting.
-  ws.pendingEOSE = new Map(); // each contain subID
-  ws.reconnectTimeout = new Set(); // relays timeout() before reconnection. Only use after client disconnected.
-  ws.subalias = new Map();
-  ws.fakesubalias = new Map();
-  ws.mergedFilters = new Map();
   ws.pubkey = null;
   ws.rejectKinds = query.reject?.split(",").map(_ => parseInt(_));
   ws.acceptKinds = query.accept?.split(",").map(_ => parseInt(_));
@@ -65,6 +61,8 @@ function handleConnection(ws, req, onClose) {
   ws.accurateMode = parseInt(query.accurate);
   ws.saveMode = parseInt(query.save);
 
+  ident.set(ws.ident, ws);
+
   if (noscraper || authorized_keys?.length) {
     authKey = Date.now() + Math.random().toString(36);
     authorized = false;
@@ -81,7 +79,7 @@ function handleConnection(ws, req, onClose) {
   }
 
   console.log(process.pid, `->- ${ws.ip} connected [${req.headers["user-agent"] || ""}]`);
-  ws.on("message", data => {
+  ws.on("message", async (data) => {
     try {
       data = JSON.parse(data);
     } catch {
@@ -111,16 +109,13 @@ function handleConnection(ws, req, onClose) {
       }
 
       lastEvent = Date.now();
-      ws.my_events.add(data[1]);
-
       if (!sessStarted) {
         console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`);
-        getIdleSess(ws);
+        await getIdleSess(ws);
         sessStarted = true;
       }
 
-      bc(data, ws.id);
-      ws.send(JSON.stringify(["OK", data[1]?.id, true, ""]));
+      _event(ws.id, data[1]);
       break;
     case "REQ": {
       if (data.length < 3) return ws.send(JSON.stringify(["NOTICE", "error: bad request."]));
@@ -130,90 +125,27 @@ function handleConnection(ws, req, onClose) {
         ws.send(JSON.stringify(["CLOSED", data[1], "auth-required: authentication is required to perform this action."]));
         return ws.send(JSON.stringify(["AUTH", authKey]));
       }
-      if ((max_client_subs !== -1) && (ws.subs.size > max_client_subs)) return ws.send(JSON.stringify(["CLOSED", data[1], "rate-limited: too many subscriptions."]));
-      const origID = data[1];
-      if (ws.subs.has(data[1])) {
-        const faked = ws.fakesubalias.get(origID);
-        ws.subs.delete(origID);
-        ws.events.delete(origID);
-        ws.pendingEOSE.delete(origID);
-        ws.pause_subs.delete(origID);
-        ws.fakesubalias.delete(origID);
-        ws.subalias.delete(faked);
-        ws.mergedFilters.delete(origID);
-        bc(["CLOSE", faked], ws.id);
-      };
-
-      const faked = Date.now() + Math.random().toString(36);
-      let filters = data.slice(2);
-      let filter = mergeFilters(...filters);
-
-      for (const fn in filters) {
-        if (!Array.isArray(filters[fn].kinds)) {
-          filters[fn].kinds = ws.acceptKinds;
-          continue;
-        } else {
-          filters[fn].kinds = filters[fn].kinds?.filter(kind => {
-            if (ws.rejectKinds && ws.rejectKinds.includes(kind)) return false;
-            if (ws.acceptKinds && !ws.acceptKinds.includes(kind)) return false;
-            return true;
-          });
-        }
-
-        if (filters[fn].limit > ws.forcedLimit)
-          filters[fn].limit = ws.forcedLimit;
-      }
 
       if (!sessStarted) {
         console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`);
-        getIdleSess(ws);
+        await getIdleSess(ws);
         sessStarted = true;
       }
 
-      ws.subs.set(origID, filters);
-      ws.events.set(origID, new Set());
-      ws.pause_subs.delete(origID);
-      ws.subalias.set(faked, origID);
-      ws.fakesubalias.set(origID, faked);
-      if (!filter.since) filter.since = Math.floor(Date.now() / 1000); // Will not impact everything. Only used for handling passing pause_on_limit (or save mode)
-      ws.mergedFilters.set(origID, filter);
-      data[1] = faked;
-      bc(data, ws.id);
-      if (filter.limit < 1) return ws.send(JSON.stringify(["EOSE", origID]));
-      ws.pendingEOSE.set(origID, 0);
+      _req(ws.id, data[1], data.slice(2));
       break;
     }
     case "CLOSE":
       if (!authorized) return;
       if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: bad request."]));
-      if (!ws.subs.has(data[1])) return ws.send(JSON.stringify(["CLOSED", data[1], "error: this sub is not opened."]));
-      const origID = data[1];
-      const faked = ws.fakesubalias.get(origID);
-      ws.subs.delete(origID);
-      ws.events.delete(origID);
-      ws.pendingEOSE.delete(origID);
-      ws.pause_subs.delete(origID);
-      ws.fakesubalias.delete(origID);
-      ws.subalias.delete(faked);
-      ws.mergedFilters.delete(origID);
-
-      data[1] = faked;
-      bc(data, ws.id);
-      ws.send(JSON.stringify(["CLOSED", origID, ""]));
+      _close(ws.id, data[1]);
       break;
     case "AUTH":
       if (auth(authKey, data[1], ws, req)) {
         authKey = Date.now() + Math.random().toString(36);
         ws.pubkey = data[1].pubkey;
         console.log(process.pid, "---", ws.ip, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
-        if (private_keys[ws.pubkey]) {
-          for (const relay of userRelays.get(ws.id)) {
-            for (const challenge of relay.pendingNIP42) {
-              nip42(relay, client.pubkey, private_keys[ws.pubkey], challenge);
-              relay.pendingNIP42.delete(challenge);
-            }
-          }
-        }
+        _auth(ws.id, ws.pubkey);
         if (authorized) return;
         authorized = true;
         lastEvent = Date.now();
@@ -229,285 +161,114 @@ function handleConnection(ws, req, onClose) {
   ws.on('close', _ => {
     onClose();
 
+    ident.delete(ws.ident);
+
     console.log(process.pid, "---", `${ws.ip} disconnected`);
 
     if (!sessStarted) return;
-    for (const sock of userRelays.get(ws.id)) {
-      sock.terminate();
-    }
-
-    userRelays.delete(ws.id);
+    _destroy(ws.id);
     csess.delete(ws.id);
   });
 }
 
  [The remaining removed lines are not repeated here: the old newsess(), bc(), _matchFilters(), relay_type() and newConn() implementations (including the relay 'open'/'message'/'error'/'close'/'unexpected-response' handlers) and the idle-session bootstrap loop. They moved near-verbatim into worker_bouncer.js below, with process.pid replaced by threadId and client.send() replaced by parentPort.postMessage() there.]
 
+function handleWorker(msg) {
+  switch (msg.type) {
+    case "sessreg": {
+      if (!ident.has(msg.ident)) return _destroy(msg.id);
+      const ws = ident.get(msg.ident);
+      ws.id = msg.id;
+      ws.onready();
+      csess.set(msg.id, ws);
+      ident.delete(msg.ident);
+      break;
+    }
+    case "upstream_msg":
+      if (!csess.has(msg.id)) return;
+      csess.get(msg.id).send(msg.data);
+      break;
+    case "stats":
+      stats = msg.data;
+      break;
+  }
+};
 
 // WS - Broadcast message to every existing sockets
+function _req(id, sid, filters) {
+  worker.postMessage({
+    type: "req",
+    id,
+    sid,
+    filters
+  });
+}
+
+function _close(id, sid) {
+  worker.postMessage({
+    type: "close",
+    id,
+    sid
+  });
+}
+
+function _event(id, eventBlob) {
+  worker.postMessage({
+    type: "event",
+    id,
+    event: eventBlob
+  });
+}
+
+function _auth(id, pubkey) {
+  worker.postMessage({
+    type: "auth",
+    id,
+    pubkey
+  });
+}
+
+function _destroy(id) {
+  worker.postMessage({
+    type: "destroy",
+    id
+  });
+}
 
 function getIdleSess(ws) {
-  ws.id = idleSess.values().next().value;
-  idleSess.delete(ws.id);
-  csess.set(ws.id, ws);
-
-  if (log_about_relays) console.log(process.pid, "---", ws.ip, "is now using session", ws.id);
-
-  newsess();
+  const data = {
+    ip: ws.ip,
+    pubkey: ws.pubkey,
+    rejectKinds: ws.rejectKinds,
+    acceptKinds: ws.acceptKinds,
+    forcedLimit: ws.forcedLimit,
+    accurateMode: ws.accurateMode,
+    saveMode: ws.saveMode
+  };
+
+  worker.postMessage({
+    type: "getsess",
+    ident: ws.ident,
+    data
+  });
+
+  return new Promise(resolve => ws.onready = resolve);
 }
 
+worker.on("message", handleWorker);
+worker.on("error", err => {
+  console.error("\n***");
+  console.error("*** PANIC - Worker Process Error");
+  console.error(err);
+});
+
+worker.on("exit", _ => {
+  console.error("*** PANIC - End of Panic. Not doing anything.");
+  console.error("***\n");
+  process.exit(6);
+});
+
 function getStat(n) {
   if (!n) return stats;
-  return stats[n];
+  return stats[n] || zeroStats;
 }
 
 module.exports = {
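The new getIdleSess() above no longer picks an idle session itself; it asks the worker for one and returns a Promise that is resolved when the worker's "sessreg" reply arrives, which is why the EVENT and REQ handlers can now `await getIdleSess(ws)`. A simplified sketch of that handshake (my own function names, no error handling; assumptions, not the Bostr code):

function requestSession(worker, ident, ws) {
  ws.ident = Date.now() + Math.random().toString(36);
  ident.set(ws.ident, ws);
  worker.postMessage({ type: "getsess", ident: ws.ident, data: { ip: ws.ip } });
  return new Promise(resolve => (ws.onready = resolve)); // resolved by the "sessreg" reply
}

function onSessReg(ident, csess, msg) {
  const ws = ident.get(msg.ident);
  if (!ws) return;
  ws.id = msg.id;          // session id chosen by the worker
  csess.set(msg.id, ws);
  ident.delete(msg.ident);
  ws.onready();            // unblocks the awaited requestSession()
}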
http.js (3 changed lines)
@@ -1,4 +1,7 @@
 "use strict";
+
+process.title = "Bostr (cluster)";
+
 const { version } = require("./package.json");
 const WebSocket = require("ws");
 const http = require("http");
index.js (3 changed lines)
@@ -1,4 +1,7 @@
 "use strict";
+
+process.title = "Bostr (main)";
+
 const config = require(process.env.BOSTR_CONFIG_PATH || "./config");
 const cluster = require("cluster");
 const fs = require("fs");
worker_bouncer.js (new file, 468 lines)
@@ -0,0 +1,468 @@
"use strict";
// Anything about talking to upstream relays is handled here.

const { parentPort, threadId } = require("worker_threads");
const { version } = require("./package.json");
const WebSocket = require("ws");
const { validateEvent, nip19, matchFilters, mergeFilters, getFilterLimit } = require("nostr-tools");
const nip42 = require("./nip42.js");

let { relays, allowed_publishers, approved_publishers, blocked_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, noscraper, loadbalancer } = require(process.env.BOSTR_CONFIG_PATH || "./config");

log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;

loadbalancer = loadbalancer || [];
if (relays.length) loadbalancer.unshift("_me");

// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
if (relays.length < max_eose_score) max_eose_score = 0;

const csess = new Map(); // this is used for relays.
const userRelays = new Map(); // per ID contains Set() of <WebSocket>
const idleSess = new Set();

let stats = {
  _global: {
    raw_rx: 0,
    rx: 0,
    tx: 0,
    f: 0
  }
};

parentPort.on('message', m => {
  switch (m.type) {
    case "getsess":
      // [<ident>, <user info>]
      getIdleSess(m.ident, m.data);
      break;
    case "req": {
      if (!csess.has(m.id)) return;
      const ws = csess.get(m.id);

      if ((max_client_subs !== -1) && (ws.subs.size > max_client_subs))
        return parentPort.postMessage({
          type: "upstream_msg",
          id: m.id,
          data: JSON.stringify(["CLOSED", data[1], "rate-limited: too many subscriptions."])
        });
      const origID = m.sid;
      if (ws.fakesubalias.has(origID)) {
        const faked = ws.fakesubalias.get(origID);
        ws.subs.delete(origID);
        ws.events.delete(origID);
        ws.pendingEOSE.delete(origID);
        ws.pause_subs.delete(origID);
        ws.fakesubalias.delete(origID);
        ws.subalias.delete(faked);
        ws.mergedFilters.delete(origID);
        bc(["CLOSE", faked], m.id);
      };

      const faked = Date.now() + Math.random().toString(36);
      let filters = m.filters;
      let filter = mergeFilters(...filters);

      for (const fn in filters) {
        if (!Array.isArray(filters[fn].kinds)) {
          filters[fn].kinds = ws.acceptKinds;
          continue;
        } else {
          filters[fn].kinds = filters[fn].kinds?.filter(kind => {
            if (ws.rejectKinds && ws.rejectKinds.includes(kind)) return false;
            if (ws.acceptKinds && !ws.acceptKinds.includes(kind)) return false;
            return true;
          });
        }

        if (filters[fn].limit > ws.forcedLimit)
          filters[fn].limit = ws.forcedLimit;
      }

      ws.subs.set(origID, filters);
      ws.events.set(origID, new Set());
      ws.pause_subs.delete(origID);
      ws.subalias.set(faked, origID);
      ws.fakesubalias.set(origID, faked);
      if (!filter.since) filter.since = Math.floor(Date.now() / 1000); // Will not impact everything. Only used for handling passing pause_on_limit (or save mode)
      ws.mergedFilters.set(origID, filter);
      bc(["REQ", faked, ...filters], m.id);
      if (filter.limit < 1) return parentPort.postMessage({
        type: "upstream_msg",
        id: m.id,
        data: JSON.stringify(["EOSE", origID])
      });
      ws.pendingEOSE.set(origID, 0);
      break;
    }
    case "close": {
      if (!csess.has(m.id)) return;
      const ws = csess.get(m.id);
      if (!ws.fakesubalias.has(m.sid)) return;

      const origID = m.sid;
      const faked = ws.fakesubalias.get(origID);
      ws.subs.delete(origID);
      ws.events.delete(origID);
      ws.pendingEOSE.delete(origID);
      ws.pause_subs.delete(origID);
      ws.fakesubalias.delete(origID);
      ws.subalias.delete(faked);
      ws.mergedFilters.delete(origID);

      bc(["CLOSE", faked], m.id);
      parentPort.postMessage({
        type: "upstream_msg",
        id: m.id,
        data: JSON.stringify(["CLOSED", origID, ""])
      });
      break;
    }
    case "event": {
      if (!csess.has(m.id)) return;
      const ws = csess.get(m.id);

      ws.my_events.add(m.event.id);

      bc(["EVENT", m.event], m.id);
      parentPort.postMessage({
        type: "upstream_msg",
        id: m.id,
        data: JSON.stringify(["OK", m.event.id, true, ""])
      });
      break;
    }
    case "destroy":
      if (!csess.has(m.id)) return;

      for (const sock of userRelays.get(m.id)) {
        sock.terminate();
      }

      userRelays.delete(m.id);
      csess.delete(m.id);
      break;
    case "auth":
      if (!csess.has(m.id)) return;
      csess.get(m.id).pubkey = m.pubkey;
      if (m.pubkey && private_keys[m.pubkey]) {
        for (const relay of userRelays.get(m.id)) {
          for (const challenge of relay.pendingNIP42) {
            nip42(relay, m.pubkey, private_keys[m.pubkey], challenge);
            relay.pendingNIP42.delete(challenge);
          }
        }
      }

      break;
  }

  parentPort.postMessage({
    type: "stats",
    data: stats
  });
});

// WS - New session for client $id
// mostly for new idle session.
function newsess() {
  const id = Date.now() + "_" + threadId + "_" + Math.random();
  const shift = loadbalancer.shift();
  loadbalancer.push(shift);

  userRelays.set(id, new Set());
  csess.set(id, null);
  idleSess.add(id);

  if (cache_relays) {
    for (const url of cache_relays) {
      newConn(url, id);
    }
  }

  switch (shift) {
    case "_me":
      for (const url of relays) {
        newConn(url, id);
      }
      break;
    default:
      newConn(shift, id);
      break;
  }
}

// WS - Broadcast message to every existing sockets
function bc(msg, id, toCacheOnly) {
  if (toCacheOnly && !cache_relays?.length) return;
  for (const relay of userRelays.get(id)) {
    if (relay.readyState !== 1) continue;
    if (toCacheOnly && !relay.isCache) continue;

    // skip the ratelimit after <config.upstream_ratelimit_expiration>
    if ((upstream_ratelimit_expiration) > (Date.now() - relay.ratelimit)) continue;

    relay.send(JSON.stringify(msg));
  }
}

function getIdleSess(ident, infos) {
  const ws = {};
  ws.subs = new Map(); // contains filter submitted by clients. per subID
  ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
  ws.events = new Map(); // only to prevent the retransmit of the same event. per subID
  ws.my_events = new Set(); // for event retransmitting.
  ws.pendingEOSE = new Map(); // each contain subID
  ws.reconnectTimeout = new Set(); // relays timeout() before reconnection. Only use after client disconnected.
  ws.subalias = new Map();
  ws.fakesubalias = new Map();
  ws.mergedFilters = new Map();

  // handled in bouncer.js
  ws.ip = null;
  ws.pubkey = null;
  ws.rejectKinds = null;
  ws.acceptKinds = null;
  ws.forcedLimit = null;
  ws.accurateMode = 0;
  ws.saveMode = 0;

  for (const i in infos) {
    ws[i] = infos[i];
  }

  if (ws.pubkey && private_keys[ws.pubkey]) {
    for (const relay of userRelays.get(ws.id)) {
      for (const challenge of relay.pendingNIP42) {
        nip42(relay, ws.pubkey, private_keys[ws.pubkey], challenge);
        relay.pendingNIP42.delete(challenge);
      }
    }
  }

  ws.id = idleSess.values().next().value;
  idleSess.delete(ws.id);
  csess.set(ws.id, ws);

  parentPort.postMessage({
    type: "sessreg",
    ident,
    id: ws.id
  });

  if (log_about_relays) console.log(threadId, "---", ws.ip, "is now using session", ws.id);

  newsess();
}

function _matchFilters(filters, event) {
  // nostr-tools being randomly throw error in their own code. Put safety.
  try {
    return matchFilters(filters, event);
  } catch {
    return false;
  }
}

function relay_type(addr) {
  switch (true) {
    case relays.includes(addr):
      return "relay";
      break;
    case cache_relays.includes(addr):
      return "cache_relay";
      break;
    case loadbalancer.includes(addr):
      return "loadbalancer";
      break;
  }
}

// WS - Sessions
function newConn(addr, id, reconn_t = 0) {
  if (!csess.has(id)) return;
  if (!stats[addr]) stats[addr] = { raw_rx: 0, rx: 0, tx: 0, f: 0 };
  const relay = new WebSocket(addr, {
    headers: {
      "User-Agent": `Bostr ${version}; The nostr relay bouncer; https://github.com/Yonle/bostr`,
    },
    noDelay: true,
    allowSynchronousEvents: true
  });

  relay.isCache = relay_type(addr) === "cache_relay";
  relay.isLoadBalancer = relay_type(addr) === "loadbalancer";
  relay.ratelimit = 0;
  relay.pendingNIP42 = new Set();
  relay.on('open', _ => {
    if (!csess.has(id)) return relay.terminate();
    const client = csess.get(id);
    reconn_t = 0;
    if (log_about_relays) console.log(threadId, "---", id, "Connected to", addr, `(${relay_type(addr)})`);

    if (!client) return;
    for (const i of client.my_events) {
      relay.send(JSON.stringify(["EVENT", i]));
    }

    for (const i of client.subs) {
      relay.send(JSON.stringify(["REQ", client.fakesubalias.get(i[0]), ...i[1]]));
    }
  });

  relay.on('message', data => {
    if (!csess.has(id)) return relay.terminate();
    try {
      data = JSON.parse(data);
    } catch (error) {
      return;
    }
    const client = csess.get(id);
    if (!client) return;

    switch (data[0]) {
      case "EVENT": {
        stats._global.raw_rx++;
        stats[addr].raw_rx++;
        if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
        if (!client.subalias.has(data[1])) return;
        data[1] = client.subalias.get(data[1]);

        if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
        if (!relay.isCache) bc(["EVENT", data[2]], id, true); // store to cache relay
        const filter = client.mergedFilters.get(data[1]);
        if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !relay.isCache) return;

        if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return;

        const filters = client.subs.get(data[1]);
        if (!_matchFilters(filters, data[2])) return;

        const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase());
        if (NotInSearchQuery) return;

        if (!relay.isLoadBalancer) client.events.get(data[1]).add(data[2]?.id);
        parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });

        stats._global.rx++;
        stats[addr].rx++;

        // Now count for REQ limit requested by client.
        // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID

        // Skip if EOSE has been omitted
        if (!client.pendingEOSE.has(data[1]) || client.pause_subs.has(data[1]) || relay.isLoadBalancer) return;
        const limit = getFilterLimit(filter);
        if (limit === Infinity) return;
        if (client.events.get(data[1]).size >= limit) {
          // Once reached to <filter.limit>, send EOSE to client.
          parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(["EOSE", data[1]]) });

          if (!client.accurateMode && (client.saveMode || pause_on_limit)) {
            client.pause_subs.add(data[1]);
          } else {
            client.pendingEOSE.delete(data[1]);
          }
        }
        break;
      }
      case "EOSE":
        if (!client.subalias.has(data[1])) return;
        data[1] = client.subalias.get(data[1]);
        if (!client.pendingEOSE.has(data[1]) && !relay.isLoadBalancer) return;
        client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);

        if (log_about_relays) console.log(threadId, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${userRelays.get(id).size} connected relays.`);

        if (!relay.isCache && (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < userRelays.get(id).size)))) return;
        if (relay.isCache && !client.events.get(data[1]).size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet.
        client.pendingEOSE.delete(data[1]);

        if (client.pause_subs.has(data[1]) && !relay.isLoadBalancer) {
          client.pause_subs.delete(data[1]);
        } else {
          parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
        }
        break;
      case "AUTH":
        if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return relay.pendingNIP42.add(data[1]);
        nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]);
        break;

      case "NOTICE":
        if (typeof(data[1]) !== "string") return;
        if (data[1].startsWith("rate-limited")) relay.ratelimit = Date.now();

        if (log_about_relays) console.log(threadId, id, addr, data[0], data[1]);

        stats._global.f++
        stats[addr].f++

        break;

      case "CLOSED":
        if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "string")) return;
        if (data[2].startsWith("rate-limited")) relay.ratelimit = Date.now();

        if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2]);

        if (data[2].length) {
          stats._global.f++;
          stats[addr].f++;
        }
        if (client.pendingEOSE.has(data[1])) client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
        break;

      case "OK":
        if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "boolean") || (typeof(data[3]) !== "string")) return;
        if (data[3].startsWith("rate-limited")) relay.ratelimit = Date.now();

        if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2], data[3]);

        switch (data[2]) {
          case true:
            stats._global.tx++;
            stats[addr].tx++;
          case false:
            stats._global.f++
            stats[addr].f++
        }

        break;
    }
  });

  relay.on('error', _ => {
    if (log_about_relays) console.error(threadId, "-!-", id, addr, _.toString())
  });

  relay.on('close', _ => {
    if (!userRelays.has(id)) return;
    userRelays.get(id).delete(relay); // Remove this socket session from <client.relays> list
    if (log_about_relays) console.log(threadId, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`);
    reconn_t += reconnect_time || 5000
    setTimeout(_ => {
      newConn(addr, id, reconn_t);
    }, reconn_t);

    stats._global.f++
    stats[addr].f++
  });

  relay.on('unexpected-response', (req, res) => {
    if (!userRelays.has(id)) return;
    userRelays.get(id).delete(relay);
    if (res.statusCode >= 500) return relay.emit("close", null);
    relays = relays.filter(_ => _ != addr);
    console.log(threadId, "-!-", `${addr} give status code ${res.statusCode}. Not (re)connect with new session again.`);

    stats._global.f++
    stats[addr].f++
  });

  userRelays.get(id).add(relay); // Add this socket session to <client.relays>
}

for (let i = 1; i <= (idle_sessions || 1); i++) {
  newsess();
}
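Note on the stats flow after this split: the worker pushes a "stats" snapshot with every reply, and the parent keeps only the latest one, falling back to the new zeroStats record for relay addresses it has not seen yet. A simplified sketch of that behaviour (assumption: condensed from bouncer.js above, not a new API):

const zeroStats = { raw_rx: 0, rx: 0, tx: 0, f: 0 };
let stats = {};

function onWorkerStats(msg) {
  if (msg.type === "stats") stats = msg.data;   // keep the latest snapshot
}

function getStat(addr) {
  if (!addr) return stats;
  return stats[addr] || zeroStats;              // relay not reported yet -> zeros
}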