mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-05-05 00:00:14 +02:00
168 lines
3.4 KiB
Go
168 lines
3.4 KiB
Go
package sdk
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/FastFilter/xorfilter"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func PubKeyToShid(pubkey string) uint64 {
|
|
shid, _ := strconv.ParseUint(pubkey[32:48], 16, 64)
|
|
return shid
|
|
}
|
|
|
|
type wotCall struct {
|
|
id uint64 // basically the pubkey we're targeting here
|
|
mutex sync.Mutex
|
|
resultbacks []chan WotXorFilter // all callers waiting for results
|
|
errorbacks []chan error // all callers waiting for errors
|
|
done chan struct{} // this is closed when this call is fully resolved and deleted
|
|
}
|
|
|
|
const wotCallsSize = 8
|
|
|
|
var (
|
|
wotCallsMutex sync.Mutex
|
|
wotCallsInPlace [wotCallsSize]*wotCall
|
|
)
|
|
|
|
func (sys *System) LoadWoTFilter(ctx context.Context, pubkey string) (WotXorFilter, error) {
|
|
id := PubKeyToShid(pubkey)
|
|
pos := int(id % wotCallsSize)
|
|
|
|
start:
|
|
wotCallsMutex.Lock()
|
|
wc := wotCallsInPlace[pos]
|
|
if wc == nil {
|
|
// we are the first to call at this position
|
|
wc = &wotCall{
|
|
id: id,
|
|
resultbacks: make([]chan WotXorFilter, 0),
|
|
errorbacks: make([]chan error, 0),
|
|
done: make(chan struct{}),
|
|
}
|
|
wotCallsInPlace[pos] = wc
|
|
wotCallsMutex.Unlock()
|
|
goto actualcall
|
|
} else {
|
|
wotCallsMutex.Unlock()
|
|
}
|
|
|
|
wc.mutex.Lock()
|
|
if wc.id == id {
|
|
// there is already a call for this exact pubkey ongoing, so we just wait
|
|
resch := make(chan WotXorFilter)
|
|
errch := make(chan error)
|
|
wc.resultbacks = append(wc.resultbacks, resch)
|
|
wc.errorbacks = append(wc.errorbacks, errch)
|
|
wc.mutex.Unlock()
|
|
select {
|
|
case res := <-resch:
|
|
return res, nil
|
|
case err := <-errch:
|
|
return WotXorFilter{}, err
|
|
}
|
|
} else {
|
|
wc.mutex.Unlock()
|
|
// there is already a call in this place, but it's for a different pubkey, so wait
|
|
<-wc.done
|
|
// when it's done restart
|
|
goto start
|
|
}
|
|
|
|
actualcall:
|
|
var res WotXorFilter
|
|
m, err := sys.loadWoT(ctx, pubkey)
|
|
if err != nil {
|
|
wc.mutex.Lock()
|
|
for _, ch := range wc.errorbacks {
|
|
ch <- err
|
|
}
|
|
} else {
|
|
res = makeWoTFilter(m)
|
|
wc.mutex.Lock()
|
|
for _, ch := range wc.resultbacks {
|
|
ch <- res
|
|
}
|
|
}
|
|
|
|
wotCallsMutex.Lock()
|
|
wotCallsInPlace[pos] = nil
|
|
wc.mutex.Unlock()
|
|
close(wc.done)
|
|
wotCallsMutex.Unlock()
|
|
|
|
return res, err
|
|
}
|
|
|
|
func (sys *System) loadWoT(ctx context.Context, pubkey string) (chan string, error) {
|
|
g, ctx := errgroup.WithContext(ctx)
|
|
g.SetLimit(45)
|
|
|
|
res := make(chan string)
|
|
|
|
// process follow lists
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
for _, f := range sys.FetchFollowList(ctx, pubkey).Items {
|
|
wg.Add(1)
|
|
|
|
g.Go(func() error {
|
|
res <- f.Pubkey
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, time.Second*7)
|
|
defer cancel()
|
|
|
|
ff := sys.FetchFollowList(ctx, f.Pubkey).Items
|
|
for _, f2 := range ff {
|
|
res <- f2.Pubkey
|
|
}
|
|
wg.Done()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
wg.Done()
|
|
}()
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(res)
|
|
}()
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func makeWoTFilter(m chan string) WotXorFilter {
|
|
shids := make([]uint64, 0, 60000)
|
|
shidMap := make(map[uint64]struct{}, 60000)
|
|
for pk := range m {
|
|
shid := PubKeyToShid(pk)
|
|
if _, alreadyAdded := shidMap[shid]; !alreadyAdded {
|
|
shidMap[shid] = struct{}{}
|
|
shids = append(shids, shid)
|
|
}
|
|
}
|
|
|
|
xf, _ := xorfilter.Populate(shids)
|
|
return WotXorFilter{len(shids), *xf}
|
|
}
|
|
|
|
type WotXorFilter struct {
|
|
Items int
|
|
xorfilter.Xor8
|
|
}
|
|
|
|
func (wxf WotXorFilter) Contains(pubkey string) bool {
|
|
if wxf.Items == 0 {
|
|
return false
|
|
}
|
|
return wxf.Xor8.Contains(PubKeyToShid(pubkey))
|
|
}
|