mirror of
https://github.com/Yonle/bostr.git
synced 2025-09-27 21:36:21 +02:00
feat: added [cache_relays] feature
Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
79
bouncer.js
79
bouncer.js
@@ -3,16 +3,20 @@ const { validateEvent, nip19 } = require("nostr-tools");
|
|||||||
const auth = require("./auth.js");
|
const auth = require("./auth.js");
|
||||||
const nip42 = require("./nip42.js");
|
const nip42 = require("./nip42.js");
|
||||||
|
|
||||||
let { relays, tmp_store, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score } = require("./config");
|
let { relays, tmp_store, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, eose_timeout, max_eose_score, cache_relays } = require("./config");
|
||||||
|
|
||||||
const socks = new Set();
|
const socks = new Set();
|
||||||
const csess = new Map();
|
const csess = new Map();
|
||||||
|
|
||||||
|
let recentevent = {};
|
||||||
|
|
||||||
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
|
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
|
||||||
|
|
||||||
// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
|
// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
|
||||||
if (relays.length < max_eose_score) max_eose_score = 0;
|
if (relays.length < max_eose_score) max_eose_score = 0;
|
||||||
|
|
||||||
|
cache_relays = cache_relays?.map(i => i.endsWith("/") ? i : i + "/");
|
||||||
|
|
||||||
// CL - User socket
|
// CL - User socket
|
||||||
module.exports = (ws, req) => {
|
module.exports = (ws, req) => {
|
||||||
let authKey = null;
|
let authKey = null;
|
||||||
@@ -80,14 +84,17 @@ module.exports = (ws, req) => {
|
|||||||
ws.pendingEOSE.delete(data[1]);
|
ws.pendingEOSE.delete(data[1]);
|
||||||
ws.pause_subs.delete(data[1]);
|
ws.pause_subs.delete(data[1]);
|
||||||
cancel_EOSETimeout(ws.id, data[1]);
|
cancel_EOSETimeout(ws.id, data[1]);
|
||||||
bc(data, ws.id);
|
|
||||||
|
delete recentevent[ws.id + ":" + data[1]];
|
||||||
|
cache_bc(data, ws.id);
|
||||||
|
direct_bc(data, ws.id);
|
||||||
break;
|
break;
|
||||||
case "AUTH":
|
case "AUTH":
|
||||||
if (auth(authKey, data[1], ws, req)) {
|
if (auth(authKey, data[1], ws, req)) {
|
||||||
ws.pubkey = data[1].pubkey;
|
ws.pubkey = data[1].pubkey;
|
||||||
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
|
console.log(process.pid, "---", ws.id, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
|
||||||
if (authorized) return;
|
if (authorized) return;
|
||||||
relays.forEach(_ => newConn(_, ws.id));
|
newsess(ws.id);
|
||||||
authorized = true;
|
authorized = true;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@@ -111,7 +118,7 @@ module.exports = (ws, req) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
csess.set(ws.id, ws);
|
csess.set(ws.id, ws);
|
||||||
if (authorized) relays.forEach(_ => newConn(_, ws.id));
|
if (authorized) newsess(ws.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// CL - Set up EOSE timeout
|
// CL - Set up EOSE timeout
|
||||||
@@ -142,15 +149,36 @@ function cancel_EOSETimeout(id, subid) {
|
|||||||
c.EOSETimeout.delete(subid);
|
c.EOSETimeout.delete(subid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WS - New session for client $id
|
||||||
|
function newsess(id) {
|
||||||
|
relays.forEach(_ => newConn(_, id));
|
||||||
|
cache_relays?.forEach(_ => newConn(_, id));
|
||||||
|
}
|
||||||
|
|
||||||
// WS - Broadcast message to every existing sockets
|
// WS - Broadcast message to every existing sockets
|
||||||
function bc(msg, id) {
|
function direct_bc(msg, id) {
|
||||||
for (sock of socks) {
|
for (sock of socks) {
|
||||||
|
if (cache_relays?.includes(sock.url)) continue;
|
||||||
if (sock.id !== id) continue;
|
if (sock.id !== id) continue;
|
||||||
if (sock.readyState >= 2) return socks.delete(sock);
|
if (sock.readyState >= 2) return socks.delete(sock);
|
||||||
sock.send(JSON.stringify(msg));
|
sock.send(JSON.stringify(msg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function cache_bc(msg, id) {
|
||||||
|
for (sock of socks) {
|
||||||
|
if (!cache_relays?.includes(sock.url)) continue;
|
||||||
|
if (sock.id !== id) continue;
|
||||||
|
if (sock.readyState >= 2) return socks.delete(sock);
|
||||||
|
sock.send(JSON.stringify(msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function bc(msg, id) {
|
||||||
|
if (!cache_relays?.length) direct_bc(msg, id);
|
||||||
|
else cache_bc(msg, id);
|
||||||
|
}
|
||||||
|
|
||||||
// WS - Terminate all existing sockets that were for <id>
|
// WS - Terminate all existing sockets that were for <id>
|
||||||
function terminate_subs(id) {
|
function terminate_subs(id) {
|
||||||
for (sock of socks) {
|
for (sock of socks) {
|
||||||
@@ -175,12 +203,14 @@ function newConn(addr, id) {
|
|||||||
socks.add(relay); // Add this socket session to [socks]
|
socks.add(relay); // Add this socket session to [socks]
|
||||||
if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}]`, relay.url, "is connected");
|
if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}] [${socks.size}/${relays.length*csess.size}]`, relay.url, "is connected");
|
||||||
|
|
||||||
for (i of client.my_events) {
|
if (cache_relays?.includes(relay.url)) {
|
||||||
relay.send(JSON.stringify(["EVENT", i]));
|
for (i of client.my_events) {
|
||||||
}
|
relay.send(JSON.stringify(["EVENT", i]));
|
||||||
|
}
|
||||||
|
|
||||||
for (i of client.subs) {
|
for (i of client.subs) {
|
||||||
relay.send(JSON.stringify(["REQ", i[0], i[1]]));
|
relay.send(JSON.stringify(["REQ", i[0], i[1]]));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -196,25 +226,30 @@ function newConn(addr, id) {
|
|||||||
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
||||||
if (!client.subs.has(data[1])) return;
|
if (!client.subs.has(data[1])) return;
|
||||||
timeoutEOSE(id, data[1]);
|
timeoutEOSE(id, data[1]);
|
||||||
if (client.pause_subs.has(data[1])) return;
|
if (client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return;
|
||||||
|
|
||||||
// if filter.since > receivedEvent.created_at, skip
|
// if filter.since > receivedEvent.created_at, skip
|
||||||
// if receivedEvent.created_at > filter.until, skip
|
// if receivedEvent.created_at > filter.until, skip
|
||||||
if (client.subs.get(data[1]).since > data[2].created_at) return;
|
if (client.subs.get(data[1]).since > data[2].created_at) return;
|
||||||
if (data[2].created_at > client.subs.get(data[1]).until) return;
|
if (data[2].created_at > client.subs.get(data[1]).until) return;
|
||||||
const NotInSearchQuery = client.subs.get(data[1]).search && !data[2].content?.toLowerCase().includes(client.subs.get(data[1]).search?.toLowerCase());
|
const NotInSearchQuery = client.subs.get(data[1]).search && !data[2].content?.toLowerCase().includess(client.subs.get(data[1]).search?.toLowerCase());
|
||||||
|
|
||||||
if (NotInSearchQuery) return;
|
if (NotInSearchQuery) return;
|
||||||
if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
|
if (client.events.get(data[1]).has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.
|
||||||
|
|
||||||
client.events.get(data[1]).add(data[2]?.id);
|
client.events.get(data[1]).add(data[2]?.id);
|
||||||
client.send(JSON.stringify(data));
|
if (!client.pause_subs.has(data[1])) client.send(JSON.stringify(data));
|
||||||
|
|
||||||
|
// send into cache relays.
|
||||||
|
if (!cache_relays?.includes(relay.url)) cache_bc(["EVENT", data[2]], id);
|
||||||
|
if (data[2]?.created_at > recentevent[id + ":" + data[1]])
|
||||||
|
recentevent[id + ":" + data[1]] = data[2]?.created_at
|
||||||
|
|
||||||
// Now count for REQ limit requested by client.
|
// Now count for REQ limit requested by client.
|
||||||
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
|
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
|
||||||
|
|
||||||
// Skip if EOSE has been omitted
|
// Skip if EOSE has been omitted
|
||||||
if (!client.pendingEOSE.has(data[1]) || !client.subs.get(data[1])?.limit) return;
|
if (!client.pendingEOSE.has(data[1]) || !client.subs.get(data[1])?.limit || client.pause_subs.has(data[1])) return;
|
||||||
if (client.events.get(data[1]).size >= client.subs.get(data[1])?.limit) {
|
if (client.events.get(data[1]).size >= client.subs.get(data[1])?.limit) {
|
||||||
// Once reached to <filter.limit>, send EOSE to client.
|
// Once reached to <filter.limit>, send EOSE to client.
|
||||||
client.send(JSON.stringify(["EOSE", data[1]]));
|
client.send(JSON.stringify(["EOSE", data[1]]));
|
||||||
@@ -230,9 +265,21 @@ function newConn(addr, id) {
|
|||||||
if (!client.pendingEOSE.has(data[1])) return;
|
if (!client.pendingEOSE.has(data[1])) return;
|
||||||
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
||||||
if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
|
if (process.env.LOG_ABOUT_RELAYS || log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
|
||||||
if (wait_eose && (client.pendingEOSE.get(data[1]) < (max_eose_score || Array.from(socks).filter(sock => sock.id === id).length))) return;
|
|
||||||
|
if (!cache_relays?.includes(relay.url)) {
|
||||||
|
if (wait_eose && (client.pendingEOSE.get(data[1]) < (max_eose_score || Array.from(socks).filter(sock => sock.id === id).length))) return;
|
||||||
|
cancel_EOSETimeout(data[1]);
|
||||||
|
} else {
|
||||||
|
if (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays.includes(sock.url)).length) return;
|
||||||
|
// get the filter
|
||||||
|
const filter = client.subs.get(data[1]);
|
||||||
|
if (!filter.since && recentevent[id + ":" + data[1]]) filter.since = recentevent[id + ":" + data[1]];
|
||||||
|
|
||||||
|
// now req to the direct connection, with the recent one please.
|
||||||
|
return direct_bc(["REQ", data[1], filter], id);
|
||||||
|
}
|
||||||
|
|
||||||
client.pendingEOSE.delete(data[1]);
|
client.pendingEOSE.delete(data[1]);
|
||||||
cancel_EOSETimeout(data[1]);
|
|
||||||
if (client.pause_subs.has(data[1])) return client.pause_subs.delete(data[1]);
|
if (client.pause_subs.has(data[1])) return client.pause_subs.delete(data[1]);
|
||||||
client.send(JSON.stringify(data));
|
client.send(JSON.stringify(data));
|
||||||
break;
|
break;
|
||||||
|
@@ -80,6 +80,17 @@ module.exports = {
|
|||||||
"version": require("./package.json").version
|
"version": require("./package.json").version
|
||||||
},
|
},
|
||||||
|
|
||||||
|
// Cache relays
|
||||||
|
// Used for caching received events from <relays> to reduce bandwidth waste.
|
||||||
|
// Keeping this empty will disable caching function.
|
||||||
|
//
|
||||||
|
// To make this working properly, Please enable <pause_on_limit>.
|
||||||
|
cache_relays: [
|
||||||
|
// "ws://localhost:8001",
|
||||||
|
// "ws://localhost:8002",
|
||||||
|
// ...and so on
|
||||||
|
],
|
||||||
|
|
||||||
// Nostr relays to bounce [Required]
|
// Nostr relays to bounce [Required]
|
||||||
relays: [
|
relays: [
|
||||||
"wss://example1.com",
|
"wss://example1.com",
|
||||||
|
Reference in New Issue
Block a user