fix: subscribe directly to loader observable to process zaps as they arrive

Complete rethink of the zap sync strategy. The issue was that we were
subscribing to eventStore.timeline() and eventStore.insert$ which were
not working as expected.

The TimelineLoader returns Observable<NostrEvent> that emits individual
events as they arrive from relays. We should subscribe directly to this.

Changes:
- Remove eventStore.timeline() and eventStore.insert$ subscriptions
- Subscribe directly to loader().subscribe() with next/error/complete handlers
- Process each event in the next() handler as it arrives from relays
- Log each zap received from loader for debugging
- Remove unused eventStore import

This is the simplest and most direct approach - process events exactly
as the loader emits them from the relays.
This commit is contained in:
Claude
2026-01-19 14:58:28 +00:00
parent 227edce28d
commit 6f525d1971

View File

@@ -8,7 +8,6 @@
import { Subscription } from "rxjs";
import { firstValueFrom, timeout as rxTimeout, of } from "rxjs";
import { catchError } from "rxjs/operators";
import eventStore from "./event-store";
import pool from "./relay-pool";
import relayListCache from "./relay-list-cache";
import { createTimelineLoader, addressLoader } from "./loaders";
@@ -109,56 +108,26 @@ class SupportersService {
},
]);
// Subscribe to insert stream from eventStore for real-time new events
const eventSubscription = eventStore.insert$.subscribe(
(event: NostrEvent) => {
// Filter for zaps to Grimoire
if (
event.kind === 9735 &&
event.tags.some(
(t: string[]) => t[0] === "p" && t[1] === GRIMOIRE_DONATE_PUBKEY,
)
) {
console.log(
`[Supporters] Received NEW zap event ${event.id.slice(0, 8)} from eventStore.insert$`,
);
this.processZapReceipt(event);
}
},
);
// Also subscribe to timeline to process ALL existing events
const timeline = eventStore.timeline([
{
kinds: [9735],
"#p": [GRIMOIRE_DONATE_PUBKEY],
},
]);
const timelineSubscription = timeline.subscribe(async (events) => {
console.log(
`[Supporters] Timeline has ${events.length} total zap events`,
);
// Process all existing events (includes both old and new)
await Promise.all(events.map((event) => this.processZapReceipt(event)));
});
// Start the loader (pushes events to store)
// Subscribe directly to the loader's observable
// TimelineLoader returns Observable<NostrEvent> - emits individual events from relays
const loaderSubscription = loader().subscribe({
next: (event: NostrEvent) => {
console.log(
`[Supporters] Received zap from loader: ${event.id.slice(0, 8)}`,
);
// Process each event as it arrives from relays
this.processZapReceipt(event);
},
error: (error) => {
console.error("[Supporters] Timeline loader error:", error);
},
complete: () => {
console.log("[Supporters] Timeline loader completed");
console.log("[Supporters] Timeline loader completed (EOSE)");
},
});
// Combine all subscriptions for cleanup
this.subscription = eventSubscription;
if (this.subscription) {
this.subscription.add(timelineSubscription);
this.subscription.add(loaderSubscription);
}
// Store subscription for cleanup
this.subscription = loaderSubscription;
} catch (error) {
console.error("[Supporters] Failed to subscribe to zap receipts:", error);
}