mirror of
https://github.com/Yonle/bostr.git
synced 2025-09-28 13:56:20 +02:00
bouncer: remove cache as it still consumes bandwidth anyway
Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
57
bouncer.js
57
bouncer.js
@@ -5,7 +5,7 @@ const { verifySignature, validateEvent, nip19 } = require("nostr-tools");
|
|||||||
const auth = require("./auth.js");
|
const auth = require("./auth.js");
|
||||||
const nip42 = require("./nip42.js");
|
const nip42 = require("./nip42.js");
|
||||||
|
|
||||||
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, cache_relays, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config");
|
let { relays, approved_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs } = require("./config");
|
||||||
|
|
||||||
const csess = new Map();
|
const csess = new Map();
|
||||||
|
|
||||||
@@ -16,8 +16,6 @@ approved_publishers = approved_publishers?.map(i => i.startsWith("npub") ? nip19
|
|||||||
// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
|
// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
|
||||||
if (relays.length < max_eose_score) max_eose_score = 0;
|
if (relays.length < max_eose_score) max_eose_score = 0;
|
||||||
|
|
||||||
cache_relays = cache_relays?.map(i => i.endsWith("/") ? i : i + "/");
|
|
||||||
|
|
||||||
// CL - User socket
|
// CL - User socket
|
||||||
module.exports = (ws, req, onClose) => {
|
module.exports = (ws, req, onClose) => {
|
||||||
let authKey = null;
|
let authKey = null;
|
||||||
@@ -81,8 +79,7 @@ module.exports = (ws, req, onClose) => {
|
|||||||
lastEvent = Date.now();
|
lastEvent = Date.now();
|
||||||
|
|
||||||
ws.my_events.add(data[1]);
|
ws.my_events.add(data[1]);
|
||||||
direct_bc(data, ws.id);
|
bc(data, ws.id);
|
||||||
cache_bc(data, ws.id);
|
|
||||||
ws.send(JSON.stringify(["OK", data[1]?.id, true, ""]));
|
ws.send(JSON.stringify(["OK", data[1]?.id, true, ""]));
|
||||||
break;
|
break;
|
||||||
case "REQ": {
|
case "REQ": {
|
||||||
@@ -120,8 +117,7 @@ module.exports = (ws, req, onClose) => {
|
|||||||
ws.subalias.delete(faked);
|
ws.subalias.delete(faked);
|
||||||
|
|
||||||
data[1] = faked;
|
data[1] = faked;
|
||||||
cache_bc(data, ws.id);
|
bc(data, ws.id);
|
||||||
direct_bc(data, ws.id);
|
|
||||||
ws.send(JSON.stringify(["CLOSED", origID, ""]));
|
ws.send(JSON.stringify(["CLOSED", origID, ""]));
|
||||||
break;
|
break;
|
||||||
case "AUTH":
|
case "AUTH":
|
||||||
@@ -163,14 +159,12 @@ module.exports = (ws, req, onClose) => {
|
|||||||
|
|
||||||
// WS - New session for client $id
|
// WS - New session for client $id
|
||||||
function newsess(id) {
|
function newsess(id) {
|
||||||
cache_relays?.forEach(_ => newConn(_, id));
|
|
||||||
relays.forEach(_ => newConn(_, id));
|
relays.forEach(_ => newConn(_, id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// WS - Broadcast message to every existing sockets
|
// WS - Broadcast message to every existing sockets
|
||||||
function direct_bc(msg, id) {
|
function bc(msg, id) {
|
||||||
for (const sock of csess.get(id).relays) {
|
for (const sock of csess.get(id).relays) {
|
||||||
if (cache_relays?.includes(sock.url)) continue;
|
|
||||||
if (sock.readyState !== 1) continue;
|
if (sock.readyState !== 1) continue;
|
||||||
|
|
||||||
// skip the ratelimit after <config.upstream_ratelimit_expiration>
|
// skip the ratelimit after <config.upstream_ratelimit_expiration>
|
||||||
@@ -179,19 +173,6 @@ function direct_bc(msg, id) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function cache_bc(msg, id) {
|
|
||||||
for (const sock of csess.get(id).relays) {
|
|
||||||
if (!cache_relays?.includes(sock.url)) continue;
|
|
||||||
if (sock.readyState !== 1) continue;
|
|
||||||
sock.send(JSON.stringify(msg));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function bc(msg, id) {
|
|
||||||
if (!cache_relays?.length) direct_bc(msg, id);
|
|
||||||
else cache_bc(msg, id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// WS - Terminate all existing sockets that were for <id>
|
// WS - Terminate all existing sockets that were for <id>
|
||||||
function terminate_sess(id) {
|
function terminate_sess(id) {
|
||||||
for (const sock of csess.get(id).relays) {
|
for (const sock of csess.get(id).relays) {
|
||||||
@@ -244,7 +225,7 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
||||||
if (!client.subalias.has(data[1])) return;
|
if (!client.subalias.has(data[1])) return;
|
||||||
data[1] = client.subalias.get(data[1]);
|
data[1] = client.subalias.get(data[1]);
|
||||||
if (client.pause_subs.has(data[1]) && !cache_relays?.includes(relay.url)) return;
|
if (client.pause_subs.has(data[1])) return;
|
||||||
|
|
||||||
// if filter.since > receivedEvent.created_at, skip
|
// if filter.since > receivedEvent.created_at, skip
|
||||||
// if receivedEvent.created_at > filter.until, skip
|
// if receivedEvent.created_at > filter.until, skip
|
||||||
@@ -264,9 +245,6 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
client.send(JSON.stringify(data));
|
client.send(JSON.stringify(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
// send into cache relays.
|
|
||||||
if (!cache_relays?.includes(relay.url)) cache_bc(["EVENT", data[2]], id);
|
|
||||||
|
|
||||||
// Now count for REQ limit requested by client.
|
// Now count for REQ limit requested by client.
|
||||||
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
|
// If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
|
||||||
|
|
||||||
@@ -275,7 +253,7 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
if (client.events.get(data[1]).size >= (cFilter?.ids?.length || cFilter?.limit)) {
|
if (client.events.get(data[1]).size >= (cFilter?.ids?.length || cFilter?.limit)) {
|
||||||
// Once reached to <filter.limit>, send EOSE to client.
|
// Once reached to <filter.limit>, send EOSE to client.
|
||||||
client.send(JSON.stringify(["EOSE", data[1]]));
|
client.send(JSON.stringify(["EOSE", data[1]]));
|
||||||
if (pause_on_limit || cache_relays?.includes(relay.url)) {
|
if (pause_on_limit) {
|
||||||
client.pause_subs.add(data[1]);
|
client.pause_subs.add(data[1]);
|
||||||
} else {
|
} else {
|
||||||
client.pendingEOSE.delete(data[1]);
|
client.pendingEOSE.delete(data[1]);
|
||||||
@@ -288,25 +266,16 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
data[1] = client.subalias.get(data[1]);
|
data[1] = client.subalias.get(data[1]);
|
||||||
if (!client.pendingEOSE.has(data[1])) return;
|
if (!client.pendingEOSE.has(data[1])) return;
|
||||||
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
||||||
if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${Array.from(socks).filter(sock => sock.id === id).length} connected relays.`);
|
if (log_about_relays) console.log(process.pid, "---", `[${id}]`, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${client.relays.size} connected relays.`);
|
||||||
|
|
||||||
if (!cache_relays?.includes(relay.url)) {
|
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < client.relays.size))) return;
|
||||||
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => sock.id === id).length))) return;
|
client.pendingEOSE.delete(data[1]);
|
||||||
if (client.pause_subs.has(data[1])) return client.pause_subs.delete(data[1]);
|
|
||||||
|
if (client.pause_subs.has(data[1])) {
|
||||||
|
client.pause_subs.delete(data[1]);
|
||||||
} else {
|
} else {
|
||||||
if (client.pendingEOSE.get(data[1]) < Array.from(socks).filter(sock => (sock.id === id) && cache_relays?.includes(sock.url)).length) return;
|
client.send(JSON.stringify(data));
|
||||||
// get the filter
|
|
||||||
const filter = client.subs.get(data[1]);
|
|
||||||
if (client.pause_subs.has(data[1])) {
|
|
||||||
client.pause_subs.delete(data[1]);
|
|
||||||
client.pendingEOSE.delete(data[1]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// now req to the direct connection, with the recent one please.
|
|
||||||
return direct_bc(["REQ", data[1], ...filter], id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
client.send(JSON.stringify(data));
|
|
||||||
break;
|
break;
|
||||||
case "AUTH":
|
case "AUTH":
|
||||||
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return;
|
if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return;
|
||||||
|
@@ -123,17 +123,6 @@ module.exports = {
|
|||||||
// Path to favicon file.
|
// Path to favicon file.
|
||||||
favicon: "",
|
favicon: "",
|
||||||
|
|
||||||
// Cache relays
|
|
||||||
// Used for caching received events from <relays> to reduce bandwidth waste.
|
|
||||||
// Keeping this empty will disable caching function.
|
|
||||||
//
|
|
||||||
// To make this working properly, Please enable <pause_on_limit>.
|
|
||||||
cache_relays: [
|
|
||||||
// "ws://localhost:8001",
|
|
||||||
// "ws://localhost:8002",
|
|
||||||
// ...and so on
|
|
||||||
],
|
|
||||||
|
|
||||||
// Nostr relays to bounce [Required]
|
// Nostr relays to bounce [Required]
|
||||||
relays: [
|
relays: [
|
||||||
"wss://example1.com",
|
"wss://example1.com",
|
||||||
|
Reference in New Issue
Block a user