Merge pull request #7702 from ellemouton/towerClientMux

wtclient: Tower Client Multiplexer
This commit is contained in:
Elle
2023-12-05 12:27:05 +02:00
committed by GitHub
14 changed files with 1265 additions and 1015 deletions

View File

@@ -15,13 +15,9 @@ type Config struct {
// Active indicates if the watchtower client is enabled.
Active bool
// Client is the backing watchtower client that we'll interact with
// through the watchtower RPC subserver.
Client wtclient.Client
// AnchorClient is the backing watchtower client for anchor channels that
// we'll interact through the watchtower RPC subserver.
AnchorClient wtclient.Client
// ClientMgr is a tower client manager that manages a set of tower
// clients.
ClientMgr wtclient.ClientManager
// Resolver is a custom resolver that will be used to resolve watchtower
// addresses to ensure we don't leak any information when running over

View File

@@ -16,9 +16,9 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower"
"github.com/lightningnetwork/lnd/watchtower/blob"
"github.com/lightningnetwork/lnd/watchtower/wtclient"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
"google.golang.org/grpc"
"gopkg.in/macaroon-bakery.v2/bakery"
)
@@ -208,11 +208,7 @@ func (c *WatchtowerClient) AddTower(ctx context.Context,
Address: addr,
}
// TODO(conner): make atomic via multiplexed client
if err := c.cfg.Client.AddTower(towerAddr); err != nil {
return nil, err
}
if err := c.cfg.AnchorClient.AddTower(towerAddr); err != nil {
if err := c.cfg.ClientMgr.AddTower(towerAddr); err != nil {
return nil, err
}
@@ -247,12 +243,7 @@ func (c *WatchtowerClient) RemoveTower(ctx context.Context,
}
}
// TODO(conner): make atomic via multiplexed client
err = c.cfg.Client.RemoveTower(pubKey, addr)
if err != nil {
return nil, err
}
err = c.cfg.AnchorClient.RemoveTower(pubKey, addr)
err = c.cfg.ClientMgr.RemoveTower(pubKey, addr)
if err != nil {
return nil, err
}
@@ -272,23 +263,7 @@ func (c *WatchtowerClient) ListTowers(ctx context.Context,
req.IncludeSessions, req.ExcludeExhaustedSessions,
)
anchorTowers, err := c.cfg.AnchorClient.RegisteredTowers(opts...)
if err != nil {
return nil, err
}
// Collect all the anchor client towers.
rpcTowers := make(map[wtdb.TowerID]*Tower)
for _, tower := range anchorTowers {
rpcTower := marshallTower(
tower, PolicyType_ANCHOR, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)
rpcTowers[tower.ID] = rpcTower
}
legacyTowers, err := c.cfg.Client.RegisteredTowers(opts...)
towersPerBlobType, err := c.cfg.ClientMgr.RegisteredTowers(opts...)
if err != nil {
return nil, err
}
@@ -296,20 +271,32 @@ func (c *WatchtowerClient) ListTowers(ctx context.Context,
// Collect all the legacy client towers. If it has any of the same
// towers that the anchors client has, then just add the session info
// for the legacy client to the existing tower.
for _, tower := range legacyTowers {
rpcTower := marshallTower(
tower, PolicyType_LEGACY, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)
t, ok := rpcTowers[tower.ID]
if !ok {
rpcTowers[tower.ID] = rpcTower
continue
rpcTowers := make(map[wtdb.TowerID]*Tower)
for blobType, towers := range towersPerBlobType {
policyType := PolicyType_LEGACY
if blobType.IsAnchorChannel() {
policyType = PolicyType_ANCHOR
}
t.SessionInfo = append(t.SessionInfo, rpcTower.SessionInfo...)
t.Sessions = append(t.Sessions, rpcTower.Sessions...)
for _, tower := range towers {
rpcTower := marshallTower(
tower, policyType, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)
t, ok := rpcTowers[tower.ID]
if !ok {
rpcTowers[tower.ID] = rpcTower
continue
}
t.SessionInfo = append(
t.SessionInfo, rpcTower.SessionInfo...,
)
t.Sessions = append(
t.Sessions, rpcTower.Sessions...,
)
}
}
towers := make([]*Tower, 0, len(rpcTowers))
@@ -337,40 +324,42 @@ func (c *WatchtowerClient) GetTowerInfo(ctx context.Context,
req.IncludeSessions, req.ExcludeExhaustedSessions,
)
// Get the tower and its sessions from anchors client.
tower, err := c.cfg.AnchorClient.LookupTower(pubKey, opts...)
if err != nil {
return nil, err
}
rpcTower := marshallTower(
tower, PolicyType_ANCHOR, req.IncludeSessions, ackCounts,
committedUpdateCounts,
)
// Get the tower and its sessions from legacy client.
tower, err = c.cfg.Client.LookupTower(pubKey, opts...)
towersPerBlobType, err := c.cfg.ClientMgr.LookupTower(pubKey, opts...)
if err != nil {
return nil, err
}
rpcLegacyTower := marshallTower(
tower, PolicyType_LEGACY, req.IncludeSessions, ackCounts,
committedUpdateCounts,
)
var resTower *Tower
for blobType, tower := range towersPerBlobType {
policyType := PolicyType_LEGACY
if blobType.IsAnchorChannel() {
policyType = PolicyType_ANCHOR
}
if !bytes.Equal(rpcTower.Pubkey, rpcLegacyTower.Pubkey) {
return nil, fmt.Errorf("legacy and anchor clients returned " +
"inconsistent results for the given tower")
rpcTower := marshallTower(
tower, policyType, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)
if resTower == nil {
resTower = rpcTower
continue
}
if !bytes.Equal(rpcTower.Pubkey, resTower.Pubkey) {
return nil, fmt.Errorf("tower clients returned " +
"inconsistent results for the given tower")
}
resTower.SessionInfo = append(
resTower.SessionInfo, rpcTower.SessionInfo...,
)
resTower.Sessions = append(
resTower.Sessions, rpcTower.Sessions...,
)
}
rpcTower.SessionInfo = append(
rpcTower.SessionInfo, rpcLegacyTower.SessionInfo...,
)
rpcTower.Sessions = append(
rpcTower.Sessions, rpcLegacyTower.Sessions...,
)
return rpcTower, nil
return resTower, nil
}
// constructFunctionalOptions is a helper function that constructs a list of
@@ -422,30 +411,14 @@ func constructFunctionalOptions(includeSessions,
}
// Stats returns the in-memory statistics of the client since startup.
func (c *WatchtowerClient) Stats(ctx context.Context,
req *StatsRequest) (*StatsResponse, error) {
func (c *WatchtowerClient) Stats(_ context.Context,
_ *StatsRequest) (*StatsResponse, error) {
if err := c.isActive(); err != nil {
return nil, err
}
clientStats := []wtclient.ClientStats{
c.cfg.Client.Stats(),
c.cfg.AnchorClient.Stats(),
}
var stats wtclient.ClientStats
for i := range clientStats {
// Grab a reference to the slice index rather than copying bc
// ClientStats contains a lock which cannot be copied by value.
stat := &clientStats[i]
stats.NumTasksAccepted += stat.NumTasksAccepted
stats.NumTasksIneligible += stat.NumTasksIneligible
stats.NumTasksPending += stat.NumTasksPending
stats.NumSessionsAcquired += stat.NumSessionsAcquired
stats.NumSessionsExhausted += stat.NumSessionsExhausted
}
stats := c.cfg.ClientMgr.Stats()
return &StatsResponse{
NumBackups: uint32(stats.NumTasksAccepted),
@@ -464,17 +437,22 @@ func (c *WatchtowerClient) Policy(ctx context.Context,
return nil, err
}
var policy wtpolicy.Policy
var blobType blob.Type
switch req.PolicyType {
case PolicyType_LEGACY:
policy = c.cfg.Client.Policy()
blobType = blob.TypeAltruistCommit
case PolicyType_ANCHOR:
policy = c.cfg.AnchorClient.Policy()
blobType = blob.TypeAltruistAnchorCommit
default:
return nil, fmt.Errorf("unknown policy type: %v",
req.PolicyType)
}
policy, err := c.cfg.ClientMgr.Policy(blobType)
if err != nil {
return nil, err
}
return &PolicyResponse{
MaxUpdates: uint32(policy.MaxUpdates),
SweepSatPerVbyte: uint32(policy.SweepFeeRate.FeePerVByte()),