go-nostr/sdk/wot.go
2025-04-04 14:34:16 -03:00

168 lines
3.4 KiB
Go

package sdk
import (
"context"
"strconv"
"sync"
"time"
"github.com/FastFilter/xorfilter"
"golang.org/x/sync/errgroup"
)
func PubKeyToShid(pubkey string) uint64 {
shid, _ := strconv.ParseUint(pubkey[32:48], 16, 64)
return shid
}
type wotCall struct {
id uint64 // basically the pubkey we're targeting here
mutex sync.Mutex
resultbacks []chan WotXorFilter // all callers waiting for results
errorbacks []chan error // all callers waiting for errors
done chan struct{} // this is closed when this call is fully resolved and deleted
}
const wotCallsSize = 8
var (
wotCallsMutex sync.Mutex
wotCallsInPlace [wotCallsSize]*wotCall
)
func (sys *System) LoadWoTFilter(ctx context.Context, pubkey string) (WotXorFilter, error) {
id := PubKeyToShid(pubkey)
pos := int(id % wotCallsSize)
start:
wotCallsMutex.Lock()
wc := wotCallsInPlace[pos]
if wc == nil {
// we are the first to call at this position
wc = &wotCall{
id: id,
resultbacks: make([]chan WotXorFilter, 0),
errorbacks: make([]chan error, 0),
done: make(chan struct{}),
}
wotCallsInPlace[pos] = wc
wotCallsMutex.Unlock()
goto actualcall
} else {
wotCallsMutex.Unlock()
}
wc.mutex.Lock()
if wc.id == id {
// there is already a call for this exact pubkey ongoing, so we just wait
resch := make(chan WotXorFilter)
errch := make(chan error)
wc.resultbacks = append(wc.resultbacks, resch)
wc.errorbacks = append(wc.errorbacks, errch)
wc.mutex.Unlock()
select {
case res := <-resch:
return res, nil
case err := <-errch:
return WotXorFilter{}, err
}
} else {
wc.mutex.Unlock()
// there is already a call in this place, but it's for a different pubkey, so wait
<-wc.done
// when it's done restart
goto start
}
actualcall:
var res WotXorFilter
m, err := sys.loadWoT(ctx, pubkey)
if err != nil {
wc.mutex.Lock()
for _, ch := range wc.errorbacks {
ch <- err
}
} else {
res = makeWoTFilter(m)
wc.mutex.Lock()
for _, ch := range wc.resultbacks {
ch <- res
}
}
wotCallsMutex.Lock()
wotCallsInPlace[pos] = nil
wc.mutex.Unlock()
close(wc.done)
wotCallsMutex.Unlock()
return res, err
}
func (sys *System) loadWoT(ctx context.Context, pubkey string) (chan string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(45)
res := make(chan string)
// process follow lists
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for _, f := range sys.FetchFollowList(ctx, pubkey).Items {
wg.Add(1)
g.Go(func() error {
res <- f.Pubkey
ctx, cancel := context.WithTimeout(ctx, time.Second*7)
defer cancel()
ff := sys.FetchFollowList(ctx, f.Pubkey).Items
for _, f2 := range ff {
res <- f2.Pubkey
}
wg.Done()
return nil
})
}
wg.Done()
}()
go func() {
wg.Wait()
close(res)
}()
return res, nil
}
func makeWoTFilter(m chan string) WotXorFilter {
shids := make([]uint64, 0, 60000)
shidMap := make(map[uint64]struct{}, 60000)
for pk := range m {
shid := PubKeyToShid(pk)
if _, alreadyAdded := shidMap[shid]; !alreadyAdded {
shidMap[shid] = struct{}{}
shids = append(shids, shid)
}
}
xf, _ := xorfilter.Populate(shids)
return WotXorFilter{len(shids), *xf}
}
type WotXorFilter struct {
Items int
xorfilter.Xor8
}
func (wxf WotXorFilter) Contains(pubkey string) bool {
if wxf.Items == 0 {
return false
}
return wxf.Xor8.Contains(PubKeyToShid(pubkey))
}