diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 81c2d6b0c..9a26149c9 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -74,6 +74,14 @@ func (c *TowerClient) genSessionFilter( } } +// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter +// function that will filter out any sessions that have been exhausted. +func ExhaustedSessionFilter() wtdb.ClientSessionFilterFn { + return func(session *wtdb.ClientSession) bool { + return session.SeqNum < session.Policy.MaxUpdates + } +} + // RegisteredTower encompasses information about a registered watchtower with // the client. type RegisteredTower struct { @@ -414,6 +422,7 @@ func New(config *Config) (*TowerClient, error) { wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)), wtdb.WithPerMaxHeight(perMaxHeight), wtdb.WithPerCommittedUpdate(perCommittedUpdate), + wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()), ) if err != nil { return nil, err @@ -1627,6 +1636,7 @@ func (c *TowerClient) handleNewTower(msg *newTowerMsg) error { sessions, err := getClientSessions( c.cfg.DB, c.cfg.SecretKeyRing, &tower.ID, wtdb.WithPreEvalFilterFn(c.genSessionFilter(true)), + wtdb.WithPostEvalFilterFn(ExhaustedSessionFilter()), ) if err != nil { return fmt.Errorf("unable to determine sessions for tower %x: "+