diff --git a/bouncer/bouncer1.js b/bouncer/bouncer1.js index 4038bff..8e8da64 100644 --- a/bouncer/bouncer1.js +++ b/bouncer/bouncer1.js @@ -5,7 +5,8 @@ const socks = new Set(); const sess = new SQLite((process.env.IN_MEMORY || tmp_store != "disk") ? null : (__dirname + "/../.temporary.db")); const csess = new Map(); -const pendingEOSE = new Map(); +const pendingEOSE = new Map(); // per sessID +const reqLimit = new Map(); // per sessID // Handle database.... sess.unsafeMode(true); @@ -42,12 +43,14 @@ module.exports = (ws, req) => { bc(data); sess.prepare("INSERT INTO sess VALUES (?, ?, ?);").run(ws.id, data[1], JSON.stringify(data[2])); pendingEOSE.set(data[1], 0); + reqLimit.set(data[1], data[2]?.limit); break; case "CLOSE": if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: bad request."])); data[1] = ws.id + ":" + data[1]; bc(data); pendingEOSE.delete(data[1]); + reqLimit.delete(data[1]); sess.prepare("DELETE FROM sess WHERE cID = ? AND subID = ?;").run(ws.id, data[1]); sess.prepare("DELETE FROM events WHERE cID = ? AND subID = ?;").run(ws.id, data[1]); break; @@ -65,6 +68,7 @@ module.exports = (ws, req) => { for (i of sess.prepare("SELECT subID FROM sess WHERE cID = ?").iterate(ws.id)) { bc(["CLOSE", i.subID]); pendingEOSE.delete(i.subID); + reqLimit.delete(i.subID); } sess.prepare("DELETE FROM sess WHERE cID = ?;").run(ws.id); @@ -93,7 +97,6 @@ function newConn(addr) { for (i of sess.prepare("SELECT subID, filter FROM sess").iterate()) { if (relay.readyState >= 2) break; relay.send(JSON.stringify(["REQ", i.subID, JSON.parse(i.filter)])); - if (!pendingEOSE.has(i.subID)) pendingEOSE.set(i.subID, 0); } }); @@ -121,6 +124,23 @@ function newConn(addr) { sess.prepare("INSERT INTO events VALUES (?, ?, ?);").run(cID, subID, data[2]?.id); data[1] = sID; csess.get(cID)?.send(JSON.stringify(data)); + + // Now count for REQ limit requested by client. + // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID + + // Skip if EOSE has been omitted + if (!pendingEOSE.has(subID)) return; + + const remainingEvents = reqLimit.get(subID); + + if (remainingEvents) reqLimit.set(subID, remainingEvents-1); + if (!remainingEvents) { + // Once there are no remaining event, Do the instructed above. + csess.get(cID)?.send(JSON.stringify(["EOSE", sID])); + pendingEOSE.delete(subID); + reqLimit.delete(subID); + } + break; } case "EOSE": { @@ -138,6 +158,7 @@ function newConn(addr) { csess.get(cID)?.send(JSON.stringify(["EOSE", sID])); pendingEOSE.delete(subID); + reqLimit.delete(subID); break; } } diff --git a/bouncer/bouncer2.js b/bouncer/bouncer2.js index 8028c00..042e712 100644 --- a/bouncer/bouncer2.js +++ b/bouncer/bouncer2.js @@ -5,7 +5,8 @@ const socks = new Set(); const sess = new SQLite((process.env.IN_MEMORY || tmp_store != "disk") ? null : (__dirname + "/../.temporary.db")); const csess = new Map(); -const pendingEOSE = new Map(); +const pendingEOSE = new Map(); // per sessID +const reqLimit = new Map(); // per sessID // Handle database.... sess.unsafeMode(true); @@ -41,11 +42,13 @@ module.exports = (ws, req) => { bc(data, ws.id); sess.prepare("INSERT INTO sess VALUES (?, ?, ?);").run(ws.id, data[1], JSON.stringify(data[2])); pendingEOSE.set(ws.id + ":" + data[1], 0); + reqLimit.set(ws.id + ":" + data[1], data[2]?.limit); break; case "CLOSE": if (typeof(data[1]) !== "string") return ws.send(JSON.stringify(["NOTICE", "error: bad request."])); bc(data, ws.id); pendingEOSE.delete(ws.id + ":" + data[1]); + reqLimit.delete(ws.id + ":" + data[1]); sess.prepare("DELETE FROM sess WHERE cID = ? AND subID = ?;").run(ws.id, data[1]); sess.prepare("DELETE FROM events WHERE cID = ? AND subID = ?;").run(ws.id, data[1]); break; @@ -91,6 +94,11 @@ function terminate_sess(id) { if (!sub[0].startsWith(id)) continue; pendingEOSE.delete(id); } + + for (sub of reqLimit) { + if (!sub[0].startsWith(id)) continue; + reqLimit.delete(id); + } } // WS - Sessions @@ -123,6 +131,23 @@ function newConn(addr, id) { sess.prepare("INSERT INTO events VALUES (?, ?, ?);").run(id, data[1], data[2]?.id); csess.get(id)?.send(JSON.stringify(data)); + + // Now count for REQ limit requested by client. + // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID + + // Skip if EOSE has been omitted + const subID = id + ":" + data[1]; + if (!pendingEOSE.has(subID)) return; + + const remainingEvents = reqLimit.get(subID); + + if (remainingEvents) reqLimit.set(subID, remainingEvents-1); + if (!remainingEvents) { + // Once there are no remaining event, Do the instructed above. + csess.get(id)?.send(JSON.stringify(["EOSE", data[1]])); + pendingEOSE.delete(subID); + reqLimit.delete(subID); + } break; } case "EOSE": @@ -130,6 +155,8 @@ function newConn(addr, id) { pendingEOSE.set(id + ":" + data[1], pendingEOSE.get(id + ":" + data[1]) + 1); if (pendingEOSE.get(id + ":" + data[1]) < Array.from(relays).filter(_ => _.id === id).length) return; csess.get(id)?.send(JSON.stringify(data)); + pendingEOSE.delete(id + ":" + data[1]); + reqLimit.delete(id + ":" + data[1]); break; } });