package nip77

import (
	"context"
	"fmt"
	"sync"

	"github.com/cespare/xxhash"
	"github.com/greatroar/blobloom"
	"github.com/nbd-wtf/go-nostr"
	"github.com/nbd-wtf/go-nostr/nip77/negentropy"
	"github.com/nbd-wtf/go-nostr/nip77/negentropy/storage/vector"
)

func NegentropySync(ctx context.Context, store nostr.RelayStore, url string, filter nostr.Filter) error {
	id := "go-nostr-tmp" // for now we can't have more than one subscription in the same connection

	data, err := store.QuerySync(ctx, filter)
	if err != nil {
		return fmt.Errorf("failed to query our local store: %w", err)
	}

	vec := vector.New()
	neg := negentropy.New(vec, 1024*1024)
	for _, evt := range data {
		vec.Insert(evt.CreatedAt, evt.ID)
	}
	vec.Seal()

	result := make(chan error)

	var r *nostr.Relay
	r, err = nostr.RelayConnect(ctx, url, nostr.WithCustomHandler(func(data []byte) {
		envelope := ParseNegMessage(data)
		if envelope == nil {
			return
		}
		switch env := envelope.(type) {
		case *OpenEnvelope, *CloseEnvelope:
			result <- fmt.Errorf("unexpected %s received from relay", env.Label())
			return
		case *ErrorEnvelope:
			result <- fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason)
			return
		case *MessageEnvelope:
			nextmsg, err := neg.Reconcile(env.Message)
			if err != nil {
				result <- fmt.Errorf("failed to reconcile: %w", err)
				return
			}

			if nextmsg != "" {
				msgb, _ := MessageEnvelope{id, nextmsg}.MarshalJSON()
				r.Write(msgb)
			}
		}
	}))
	if err != nil {
		return err
	}

	msg := neg.Start()
	open, _ := OpenEnvelope{id, filter, msg}.MarshalJSON()
	err = <-r.Write(open)
	if err != nil {
		return fmt.Errorf("failed to write to relay: %w", err)
	}

	defer func() {
		clse, _ := CloseEnvelope{id}.MarshalJSON()
		r.Write(clse)
	}()

	type direction struct {
		label  string
		items  chan string
		source nostr.RelayStore
		target nostr.RelayStore
	}

	wg := sync.WaitGroup{}
	pool := newidlistpool(50)
	for _, dir := range []direction{
		{"up", neg.Haves, store, r},
		{"down", neg.HaveNots, r, store},
	} {
		wg.Add(1)
		go func(dir direction) {
			defer wg.Done()

			seen := blobloom.NewOptimized(blobloom.Config{
				Capacity: 10000,
				FPRate:   0.01,
			})

			doSync := func(ids []string) {
				defer wg.Done()
				defer pool.giveback(ids)

				if len(ids) == 0 {
					return
				}
				evtch, err := dir.source.QueryEvents(ctx, nostr.Filter{IDs: ids})
				if err != nil {
					result <- fmt.Errorf("error querying source on %s: %w", dir.label, err)
					return
				}
				for evt := range evtch {
					dir.target.Publish(ctx, *evt)
				}
			}

			ids := pool.grab()
			for item := range dir.items {
				h := xxhash.Sum64([]byte(item))
				if seen.Has(h) {
					continue
				}

				seen.Add(h)
				ids = append(ids, item)
				if len(ids) == 50 {
					wg.Add(1)
					go doSync(ids)
					ids = pool.grab()
				}
			}
			wg.Add(1)
			doSync(ids)
		}(dir)
	}

	go func() {
		wg.Wait()
		result <- nil
	}()

	err = <-result
	if err != nil {
		return err
	}

	return nil
}