diff --git a/src/lib/chat/adapters/nip-29-adapter.ts b/src/lib/chat/adapters/nip-29-adapter.ts index f41047a..69ddf40 100644 --- a/src/lib/chat/adapters/nip-29-adapter.ts +++ b/src/lib/chat/adapters/nip-29-adapter.ts @@ -1,5 +1,5 @@ -import { Observable } from "rxjs"; -import { map, first } from "rxjs/operators"; +import { Observable, Subject } from "rxjs"; +import { map, first, skipUntil } from "rxjs/operators"; import type { Filter } from "nostr-tools"; import { ChatProtocolAdapter } from "./base-adapter"; import type { @@ -292,6 +292,9 @@ export class Nip29Adapter extends ChatProtocolAdapter { filter.since = options.after; } + // Create a subject to track EOSE + const eoseSubject = new Subject(); + // Start a persistent subscription to the group relay // This will feed new messages into the EventStore in real-time pool @@ -303,6 +306,8 @@ export class Nip29Adapter extends ChatProtocolAdapter { if (typeof response === "string") { // EOSE received console.log("[NIP-29] EOSE received for messages"); + eoseSubject.next(); + eoseSubject.complete(); } else { // Event received console.log( @@ -313,7 +318,9 @@ export class Nip29Adapter extends ChatProtocolAdapter { }); // Return observable from EventStore which will update automatically + // Wait for EOSE before emitting to prevent scroll jumping during initial load return eventStore.timeline(filter).pipe( + skipUntil(eoseSubject), map((events) => { console.log(`[NIP-29] Timeline has ${events.length} messages`); return events diff --git a/src/lib/chat/adapters/nip-c7-adapter.ts b/src/lib/chat/adapters/nip-c7-adapter.ts index d5a9306..f241b7e 100644 --- a/src/lib/chat/adapters/nip-c7-adapter.ts +++ b/src/lib/chat/adapters/nip-c7-adapter.ts @@ -1,5 +1,5 @@ -import { Observable, firstValueFrom } from "rxjs"; -import { map, first } from "rxjs/operators"; +import { Observable, firstValueFrom, Subject } from "rxjs"; +import { map, first, skipUntil } from "rxjs/operators"; import { nip19 } from "nostr-tools"; import type { Filter } from "nostr-tools"; import { ChatProtocolAdapter } from "./base-adapter"; @@ -156,15 +156,35 @@ export class NipC7Adapter extends ChatProtocolAdapter { filter.since = options.after; } - return eventStore - .timeline(filter) - .pipe( - map((events) => - events - .map((event) => this.eventToMessage(event, conversation.id)) - .sort((a, b) => a.timestamp - b.timestamp), - ), - ); + // Create a subject to track EOSE + const eoseSubject = new Subject(); + + // Start subscription to populate EventStore and track EOSE + pool + .subscription([], [filter], { + eventStore, // Automatically add to store + }) + .subscribe({ + next: (response) => { + if (typeof response === "string") { + // EOSE received + console.log("[NIP-C7] EOSE received for messages"); + eoseSubject.next(); + eoseSubject.complete(); + } + }, + }); + + // Return observable from EventStore + // Wait for EOSE before emitting to prevent scroll jumping during initial load + return eventStore.timeline(filter).pipe( + skipUntil(eoseSubject), + map((events) => + events + .map((event) => this.eventToMessage(event, conversation.id)) + .sort((a, b) => a.timestamp - b.timestamp), + ), + ); } /**