diff --git a/channeldb/db.go b/channeldb/db.go index 51f3fe9aa..4b9476c37 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -268,6 +268,9 @@ func createChannelDB(dbPath string) error { if _, err := edges.CreateBucket(channelPointBucket); err != nil { return err } + if _, err := edges.CreateBucket(zombieBucket); err != nil { + return err + } graphMeta, err := tx.CreateBucket(graphMetaBucket) if err != nil { diff --git a/channeldb/error.go b/channeldb/error.go index c05481063..e0e754522 100644 --- a/channeldb/error.go +++ b/channeldb/error.go @@ -1,6 +1,9 @@ package channeldb -import "fmt" +import ( + "errors" + "fmt" +) var ( // ErrNoChanDBExists is returned when a channel bucket hasn't been @@ -79,6 +82,10 @@ var ( // can't be found. ErrEdgeNotFound = fmt.Errorf("edge not found") + // ErrZombieEdge is an error returned when we attempt to look up an edge + // but it is marked as a zombie within the zombie index. + ErrZombieEdge = errors.New("edge marked as zombie") + // ErrEdgeAlreadyExist is returned when edge with specific // channel id can't be added because it already exist. ErrEdgeAlreadyExist = fmt.Errorf("edge already exist") diff --git a/channeldb/graph.go b/channeldb/graph.go index 2d924d94f..11a875fc9 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -106,6 +106,17 @@ var ( // maps: outPoint -> chanID channelPointBucket = []byte("chan-index") + // zombieBucket is a sub-bucket of the main edgeBucket bucket + // responsible for maintaining an index of zombie channels. Each entry + // exists within the bucket as follows: + // + // maps: chanID -> pubKey1 || pubKey2 + // + // The chanID represents the channel ID of the edge that is marked as a + // zombie and is used as the key, which maps to the public keys of the + // edge's participants. + zombieBucket = []byte("zombie-index") + // graphMetaBucket is a top-level bucket which stores various meta-deta // related to the on-disk channel graph. Data stored in this bucket // includes the block to which the graph has been synced to, the total @@ -123,9 +134,6 @@ var ( // case we'll remove all entries from the prune log with a block height // that no longer exists. pruneLogBucket = []byte("prune-log") - - edgeBloomKey = []byte("edge-bloom") - nodeBloomKey = []byte("node-bloom") ) const ( @@ -587,17 +595,20 @@ func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error // HasChannelEdge returns true if the database knows of a channel edge with the // passed channel ID, and false otherwise. If an edge with that ID is found // within the graph, then two time stamps representing the last time the edge -// was updated for both directed edges are returned along with the boolean. -func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool, error) { - // TODO(roasbeef): check internal bloom filter first +// was updated for both directed edges are returned along with the boolean. If +// it is not found, then the zombie index is checked and its result is returned +// as the second boolean. +func (c *ChannelGraph) HasChannelEdge(chanID uint64, +) (time.Time, time.Time, bool, bool, error) { var ( node1UpdateTime time.Time node2UpdateTime time.Time exists bool + isZombie bool ) - if err := c.db.View(func(tx *bbolt.Tx) error { + err := c.db.View(func(tx *bbolt.Tx) error { edges := tx.Bucket(edgeBucket) if edges == nil { return ErrGraphNoEdgesFound @@ -609,12 +620,21 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool var channelID [8]byte byteOrder.PutUint64(channelID[:], chanID) + + // If the edge doesn't exist, then we'll also check our zombie + // index. if edgeIndex.Get(channelID[:]) == nil { exists = false + zombieIndex := edges.Bucket(zombieBucket) + if zombieIndex != nil { + isZombie, _, _ = isZombieEdge(zombieIndex, chanID) + } + return nil } exists = true + isZombie = false // If the channel has been found in the graph, then retrieve // the edges itself so we can return the last updated @@ -640,11 +660,9 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool } return nil - }); err != nil { - return time.Time{}, time.Time{}, exists, err - } + }) - return node1UpdateTime, node2UpdateTime, exists, nil + return node1UpdateTime, node2UpdateTime, exists, isZombie, err } // UpdateChannelEdge retrieves and update edge of the graph database. Method @@ -720,6 +738,10 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, if nodes == nil { return ErrSourceNodeNotSet } + zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } // For each of the outpoints that have been spent within the // block, we attempt to delete them from the graph as if that @@ -753,7 +775,8 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, // a channel. If no error is returned, then a channel // was successfully pruned. err = delChannelByEdge( - edges, edgeIndex, chanIndex, nodes, chanPoint, + edges, edgeIndex, chanIndex, zombieIndex, nodes, + chanPoint, false, ) if err != nil && err != ErrEdgeNotFound { return err @@ -963,6 +986,10 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf if err != nil { return err } + zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } nodes, err := tx.CreateBucketIfNotExists(nodeBucket) if err != nil { return err @@ -980,7 +1007,8 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf return err } err = delChannelByEdge( - edges, edgeIndex, chanIndex, nodes, &edgeInfo.ChannelPoint, + edges, edgeIndex, chanIndex, zombieIndex, nodes, + &edgeInfo.ChannelPoint, false, ) if err != nil && err != ErrEdgeNotFound { return err @@ -1067,8 +1095,9 @@ func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) { } // DeleteChannelEdge removes an edge from the database as identified by its -// funding outpoint. If the edge does not exist within the database, then -// ErrEdgeNotFound will be returned. +// funding outpoint and also marks it as a zombie. This ensures that we're +// unable to re-add this to our database once again. If the edge does not exist +// within the database, then ErrEdgeNotFound will be returned. func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { // TODO(roasbeef): possibly delete from node bucket if node has no more // channels @@ -1088,19 +1117,22 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { if edgeIndex == nil { return ErrEdgeNotFound } - chanIndex := edges.Bucket(channelPointBucket) if chanIndex == nil { return ErrEdgeNotFound } - nodes := tx.Bucket(nodeBucket) if nodes == nil { return ErrGraphNodeNotFound } + zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } return delChannelByEdge( - edges, edgeIndex, chanIndex, nodes, chanPoint, + edges, edgeIndex, chanIndex, zombieIndex, nodes, + chanPoint, true, ) }) } @@ -1571,8 +1603,9 @@ func delEdgeUpdateIndexEntry(edgesBucket *bbolt.Bucket, chanID uint64, return nil } -func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket, - chanIndex *bbolt.Bucket, nodes *bbolt.Bucket, chanPoint *wire.OutPoint) error { +func delChannelByEdge(edges, edgeIndex, chanIndex, zombieIndex, + nodes *bbolt.Bucket, chanPoint *wire.OutPoint, isZombie bool) error { + var b bytes.Buffer if err := writeOutpoint(&b, chanPoint); err != nil { return err @@ -1630,12 +1663,29 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket, } } - // Finally, with the edge data deleted, we can purge the information - // from the two edge indexes. + // With the edge data deleted, we can purge the information from the two + // edge indexes. if err := edgeIndex.Delete(chanID); err != nil { return err } - return chanIndex.Delete(b.Bytes()) + if err := chanIndex.Delete(b.Bytes()); err != nil { + return err + } + + // Finally, we'll mark the edge as a zombie within our index if it's + // being removed due to the channel becoming a zombie. We do this to + // ensure we don't store unnecessary data for spent channels. + if !isZombie { + return nil + } + + var pubKey1, pubKey2 [33]byte + copy(pubKey1[:], nodeKeys[:33]) + copy(pubKey2[:], nodeKeys[33:]) + + return markEdgeZombie( + zombieIndex, byteOrder.Uint64(chanID), pubKey1, pubKey2, + ) } // UpdateEdgePolicy updates the edge routing policy for a single directed edge @@ -2497,7 +2547,8 @@ func (c *ChannelEdgePolicy) Signature() (*btcec.Signature, error) { // found, then ErrEdgeNotFound is returned. A struct which houses the general // information for the channel itself is returned as well as two structs that // contain the routing policies for the channel in either direction. -func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { +func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint, +) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { var ( edgeInfo *ChannelEdgeInfo @@ -2575,7 +2626,12 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelE // ErrEdgeNotFound is returned. A struct which houses the general information // for the channel itself is returned as well as two structs that contain the // routing policies for the channel in either direction. -func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { +// +// ErrZombieEdge an be returned if the edge is currently marked as a zombie +// within the database. In this case, the ChannelEdgePolicy's will be nil, and +// the ChannelEdgeInfo will only include the public keys of each node. +func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64, +) (*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy, error) { var ( edgeInfo *ChannelEdgeInfo @@ -2606,13 +2662,48 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, * byteOrder.PutUint64(channelID[:], chanID) + // Now, attempt to fetch edge. edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:]) + + // If it doesn't exist, we'll quickly check our zombie index to + // see if we've previously marked it as so. + if err == ErrEdgeNotFound { + // If the zombie index doesn't exist, or the edge is not + // marked as a zombie within it, then we'll return the + // original ErrEdgeNotFound error. + zombieIndex := edges.Bucket(zombieBucket) + if zombieIndex == nil { + return ErrEdgeNotFound + } + + isZombie, pubKey1, pubKey2 := isZombieEdge( + zombieIndex, chanID, + ) + if !isZombie { + return ErrEdgeNotFound + } + + // Otherwise, the edge is marked as a zombie, so we'll + // populate the edge info with the public keys of each + // party as this is the only information we have about + // it and return an error signaling so. + edgeInfo = &ChannelEdgeInfo{ + NodeKey1Bytes: pubKey1, + NodeKey2Bytes: pubKey2, + } + return ErrZombieEdge + } + + // Otherwise, we'll just return the error if any. if err != nil { return err } + edgeInfo = &edge edgeInfo.db = c.db + // Then we'll attempt to fetch the accompanying policies of this + // edge. e1, e2, err := fetchChanEdgePolicies( edgeIndex, edges, nodes, channelID[:], c.db, ) @@ -2624,6 +2715,9 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, * policy2 = e2 return nil }) + if err == ErrZombieEdge { + return edgeInfo, nil, nil, err + } if err != nil { return nil, nil, nil, err } @@ -2785,6 +2879,109 @@ func (c *ChannelGraph) NewChannelEdgePolicy() *ChannelEdgePolicy { return &ChannelEdgePolicy{db: c.db} } +// MarkEdgeZombie marks an edge as a zombie within the graph's zombie index. +// The public keys should represent the node public keys of the two parties +// involved in the edge. +func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1, + pubKey2 [33]byte) error { + + return c.db.Batch(func(tx *bbolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } + return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2) + }) +} + +// markEdgeZombie marks an edge as a zombie within our zombie index. The public +// keys should represent the node public keys of the two parties involved in the +// edge. +func markEdgeZombie(zombieIndex *bbolt.Bucket, chanID uint64, pubKey1, + pubKey2 [33]byte) error { + + var k [8]byte + byteOrder.PutUint64(k[:], chanID) + + var v [66]byte + copy(v[:33], pubKey1[:]) + copy(v[33:], pubKey2[:]) + + return zombieIndex.Put(k[:], v[:]) +} + +// MarkEdgeLive clears an edge from our zombie index, deeming it as live. +func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error { + return c.db.Batch(func(tx *bbolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + zombieIndex := edges.Bucket(zombieBucket) + if zombieIndex == nil { + return nil + } + + var k [8]byte + byteOrder.PutUint64(k[:], chanID) + return zombieIndex.Delete(k[:]) + }) +} + +// IsZombieEdge returns whether the edge is considered zombie. If it is a +// zombie, then the two node public keys corresponding to this edge are also +// returned. +func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) { + var ( + isZombie bool + pubKey1, pubKey2 [33]byte + ) + + err := c.db.View(func(tx *bbolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + zombieIndex := edges.Bucket(zombieBucket) + if zombieIndex == nil { + return nil + } + + isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID) + return nil + }) + if err != nil { + return false, [33]byte{}, [33]byte{} + } + + return isZombie, pubKey1, pubKey2 +} + +// isZombieEdge returns whether an entry exists for the given channel in the +// zombie index. If an entry exists, then the two node public keys corresponding +// to this edge are also returned. +func isZombieEdge(zombieIndex *bbolt.Bucket, + chanID uint64) (bool, [33]byte, [33]byte) { + + var k [8]byte + byteOrder.PutUint64(k[:], chanID) + + v := zombieIndex.Get(k[:]) + if v == nil { + return false, [33]byte{}, [33]byte{} + } + + var pubKey1, pubKey2 [33]byte + copy(pubKey1[:], v[:33]) + copy(pubKey2[:], v[33:]) + + return true, pubKey1, pubKey2 +} + func putLightningNode(nodeBucket *bbolt.Bucket, aliasBucket *bbolt.Bucket, updateIndex *bbolt.Bucket, node *LightningNode) error { diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 378391ec6..f8df64487 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -374,6 +374,10 @@ func TestEdgeInsertionDeletion(t *testing.T) { if _, _, _, err := graph.FetchChannelEdgesByID(chanID); err == nil { t.Fatalf("channel edge not deleted") } + isZombie, _, _ := graph.IsZombieEdge(chanID) + if !isZombie { + t.Fatal("channel edge not marked as zombie") + } // Finally, attempt to delete a (now) non-existent edge within the // database, this should result in an error. @@ -522,29 +526,38 @@ func TestDisconnectBlockAtHeight(t *testing.T) { } // The two first edges should be removed from the db. - _, _, has, err := graph.HasChannelEdge(edgeInfo.ChannelID) + _, _, has, isZombie, err := graph.HasChannelEdge(edgeInfo.ChannelID) if err != nil { t.Fatalf("unable to query for edge: %v", err) } if has { t.Fatalf("edge1 was not pruned from the graph") } - _, _, has, err = graph.HasChannelEdge(edgeInfo2.ChannelID) + if isZombie { + t.Fatal("reorged edge1 should not be marked as zombie") + } + _, _, has, isZombie, err = graph.HasChannelEdge(edgeInfo2.ChannelID) if err != nil { t.Fatalf("unable to query for edge: %v", err) } if has { t.Fatalf("edge2 was not pruned from the graph") } + if isZombie { + t.Fatal("reorged edge2 should not be marked as zombie") + } // Edge 3 should not be removed. - _, _, has, err = graph.HasChannelEdge(edgeInfo3.ChannelID) + _, _, has, isZombie, err = graph.HasChannelEdge(edgeInfo3.ChannelID) if err != nil { t.Fatalf("unable to query for edge: %v", err) } if !has { t.Fatalf("edge3 was pruned from the graph") } + if isZombie { + t.Fatal("edge3 was marked as zombie") + } // PruneTip should be set to the blockHash we specified for the block // at height 155. @@ -755,12 +768,16 @@ func TestEdgeInfoUpdates(t *testing.T) { // Check for existence of the edge within the database, it should be // found. - _, _, found, err := graph.HasChannelEdge(chanID) + _, _, found, isZombie, err := graph.HasChannelEdge(chanID) if err != nil { t.Fatalf("unable to query for edge: %v", err) - } else if !found { + } + if !found { t.Fatalf("graph should have of inserted edge") } + if isZombie { + t.Fatal("live edge should not be marked as zombie") + } // We should also be able to retrieve the channelID only knowing the // channel point of the channel. @@ -2786,6 +2803,67 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { assertEdgeInfoEqual(t, dbEdgeInfo, edgeInfo) } +// TestGraphZombieIndex ensures that we can mark edges correctly as zombie/live. +func TestGraphZombieIndex(t *testing.T) { + t.Parallel() + + // We'll start by creating our test graph along with a test edge. + db, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatalf("unable to create test database: %v", err) + } + graph := db.ChannelGraph() + + node1, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test vertex: %v", err) + } + node2, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test vertex: %v", err) + } + edge, _, _ := createChannelEdge(db, node1, node2) + + // If the graph is not aware of the edge, then it should not be a + // zombie. + isZombie, _, _ := graph.IsZombieEdge(edge.ChannelID) + if isZombie { + t.Fatal("expected edge to not be marked as zombie") + } + + // If we mark the edge as a zombie, then we should expect to see it + // within the index. + err = graph.MarkEdgeZombie( + edge.ChannelID, node1.PubKeyBytes, node2.PubKeyBytes, + ) + if err != nil { + t.Fatalf("unable to mark edge as zombie: %v", err) + } + isZombie, pubKey1, pubKey2 := graph.IsZombieEdge(edge.ChannelID) + if !isZombie { + t.Fatal("expected edge to be marked as zombie") + } + if pubKey1 != node1.PubKeyBytes { + t.Fatalf("expected pubKey1 %x, got %x", node1.PubKeyBytes, + pubKey1) + } + if pubKey2 != node2.PubKeyBytes { + t.Fatalf("expected pubKey2 %x, got %x", node2.PubKeyBytes, + pubKey2) + } + + // Similarly, if we mark the same edge as live, we should no longer see + // it within the index. + if err := graph.MarkEdgeLive(edge.ChannelID); err != nil { + t.Fatalf("unable to mark edge as live: %v", err) + } + isZombie, _, _ = graph.IsZombieEdge(edge.ChannelID) + if isZombie { + t.Fatal("expected edge to not be marked as zombie") + } +} + // compareNodes is used to compare two LightningNodes while excluding the // Features struct, which cannot be compared as the semantics for reserializing // the featuresMap have not been defined. diff --git a/discovery/gossiper.go b/discovery/gossiper.go index e35a19299..171b04aba 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1571,8 +1571,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - // At this point, we'll now ask the router if this is a stale - // update. If so we can skip all the processing below. + // At this point, we'll now ask the router if this is a + // zombie/known edge. If so we can skip all the processing + // below. if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { nMsg.err <- nil return nil @@ -1787,13 +1788,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } // Before we perform any of the expensive checks below, we'll - // make sure that the router doesn't already have a fresher - // announcement for this edge. + // check whether this update is stale or is for a zombie + // channel in order to quickly reject it. timestamp := time.Unix(int64(msg.Timestamp), 0) if d.cfg.Router.IsStaleEdgePolicy( msg.ShortChannelID, timestamp, msg.ChannelFlags, ) { - nMsg.err <- nil return nil } @@ -1809,56 +1809,99 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) - if err != nil { - switch err { - case channeldb.ErrGraphNotFound: - fallthrough - case channeldb.ErrGraphNoEdgesFound: - fallthrough - case channeldb.ErrEdgeNotFound: - // If the edge corresponding to this - // ChannelUpdate was not found in the graph, - // this might be a channel in the process of - // being opened, and we haven't processed our - // own ChannelAnnouncement yet, hence it is not - // found in the graph. This usually gets - // resolved after the channel proofs are - // exchanged and the channel is broadcasted to - // the rest of the network, but in case this - // is a private channel this won't ever happen. - // Because of this, we temporarily add it to a - // map, and reprocess it after our own - // ChannelAnnouncement has been processed. - d.pChanUpdMtx.Lock() - d.prematureChannelUpdates[shortChanID] = append( - d.prematureChannelUpdates[shortChanID], - nMsg, - ) - d.pChanUpdMtx.Unlock() + switch err { + // No error, break. + case nil: + break - log.Debugf("Got ChannelUpdate for edge not "+ - "found in graph(shortChanID=%v), "+ - "saving for reprocessing later", - shortChanID) + case channeldb.ErrZombieEdge: + // Since we've deemed the update as not stale above, + // before marking it live, we'll make sure it has been + // signed by the correct party. The least-significant + // bit in the flag on the channel update tells us which + // edge is being updated. + var pubKey *btcec.PublicKey + switch { + case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: + pubKey, _ = chanInfo.NodeKey1() + case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: + pubKey, _ = chanInfo.NodeKey2() + } - // NOTE: We don't return anything on the error - // channel for this message, as we expect that - // will be done when this ChannelUpdate is - // later reprocessed. - return nil - - default: - err := fmt.Errorf("unable to validate "+ - "channel update short_chan_id=%v: %v", - shortChanID, err) + err := routing.VerifyChannelUpdateSignature(msg, pubKey) + if err != nil { + err := fmt.Errorf("unable to verify channel "+ + "update signature: %v", err) log.Error(err) nMsg.err <- err - - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() return nil } + + // With the signature valid, we'll proceed to mark the + // edge as live and wait for the channel announcement to + // come through again. + err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID) + if err != nil { + err := fmt.Errorf("unable to remove edge with "+ + "chan_id=%v from zombie index: %v", + msg.ShortChannelID, err) + log.Error(err) + nMsg.err <- err + return nil + } + + log.Debugf("Removed edge with chan_id=%v from zombie "+ + "index", msg.ShortChannelID) + + // We'll fallthrough to ensure we stash the update until + // we receive its corresponding ChannelAnnouncement. + // This is needed to ensure the edge exists in the graph + // before applying the update. + fallthrough + case channeldb.ErrGraphNotFound: + fallthrough + case channeldb.ErrGraphNoEdgesFound: + fallthrough + case channeldb.ErrEdgeNotFound: + // If the edge corresponding to this ChannelUpdate was + // not found in the graph, this might be a channel in + // the process of being opened, and we haven't processed + // our own ChannelAnnouncement yet, hence it is not + // found in the graph. This usually gets resolved after + // the channel proofs are exchanged and the channel is + // broadcasted to the rest of the network, but in case + // this is a private channel this won't ever happen. + // This can also happen in the case of a zombie channel + // with a fresh update for which we don't have a + // ChannelAnnouncement for since we reject them. Because + // of this, we temporarily add it to a map, and + // reprocess it after our own ChannelAnnouncement has + // been processed. + d.pChanUpdMtx.Lock() + d.prematureChannelUpdates[shortChanID] = append( + d.prematureChannelUpdates[shortChanID], nMsg, + ) + d.pChanUpdMtx.Unlock() + + log.Debugf("Got ChannelUpdate for edge not found in "+ + "graph(shortChanID=%v), saving for "+ + "reprocessing later", shortChanID) + + // NOTE: We don't return anything on the error channel + // for this message, as we expect that will be done when + // this ChannelUpdate is later reprocessed. + return nil + + default: + err := fmt.Errorf("unable to validate channel update "+ + "short_chan_id=%v: %v", shortChanID, err) + log.Error(err) + nMsg.err <- err + + d.rejectMtx.Lock() + d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} + d.rejectMtx.Unlock() + return nil } // The least-significant bit in the flag on the channel update diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 560fdbb1a..8b8a3917e 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -110,10 +110,11 @@ func (n *mockSigner) SignMessage(pubKey *btcec.PublicKey, type mockGraphSource struct { bestHeight uint32 - mu sync.Mutex - nodes []channeldb.LightningNode - infos map[uint64]channeldb.ChannelEdgeInfo - edges map[uint64][]channeldb.ChannelEdgePolicy + mu sync.Mutex + nodes []channeldb.LightningNode + infos map[uint64]channeldb.ChannelEdgeInfo + edges map[uint64][]channeldb.ChannelEdgePolicy + zombies map[uint64][][33]byte } func newMockRouter(height uint32) *mockGraphSource { @@ -121,6 +122,7 @@ func newMockRouter(height uint32) *mockGraphSource { bestHeight: height, infos: make(map[uint64]channeldb.ChannelEdgeInfo), edges: make(map[uint64][]channeldb.ChannelEdgePolicy), + zombies: make(map[uint64][][33]byte), } } @@ -205,9 +207,18 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) ( r.mu.Lock() defer r.mu.Unlock() - chanInfo, ok := r.infos[chanID.ToUint64()] + chanIDInt := chanID.ToUint64() + chanInfo, ok := r.infos[chanIDInt] if !ok { - return nil, nil, nil, channeldb.ErrEdgeNotFound + pubKeys, isZombie := r.zombies[chanIDInt] + if !isZombie { + return nil, nil, nil, channeldb.ErrEdgeNotFound + } + + return &channeldb.ChannelEdgeInfo{ + NodeKey1Bytes: pubKeys[0], + NodeKey2Bytes: pubKeys[1], + }, nil, nil, channeldb.ErrZombieEdge } edges := r.edges[chanID.ToUint64()] @@ -280,13 +291,15 @@ func (r *mockGraphSource) IsPublicNode(node routing.Vertex) (bool, error) { } // IsKnownEdge returns true if the graph source already knows of the passed -// channel ID. +// channel ID either as a live or zombie channel. func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool { r.mu.Lock() defer r.mu.Unlock() - _, ok := r.infos[chanID.ToUint64()] - return ok + chanIDInt := chanID.ToUint64() + _, exists := r.infos[chanIDInt] + _, isZombie := r.zombies[chanIDInt] + return exists || isZombie } // IsStaleEdgePolicy returns true if the graph source has a channel edge for @@ -297,13 +310,23 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, r.mu.Lock() defer r.mu.Unlock() - edges, ok := r.edges[chanID.ToUint64()] + chanIDInt := chanID.ToUint64() + edges, ok := r.edges[chanIDInt] if !ok { - return false + // Since the edge doesn't exist, we'll check our zombie index as + // well. + _, isZombie := r.zombies[chanIDInt] + if !isZombie { + return false + } + + // Since it exists within our zombie index, we'll check that it + // respects the router's live edge horizon to determine whether + // it is stale or not. + return time.Since(timestamp) > routing.DefaultChannelPruneExpiry } switch { - case len(edges) >= 1 && edges[0].ChannelFlags == flags: return !edges[0].LastUpdate.Before(timestamp) @@ -315,6 +338,26 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, } } +// MarkEdgeLive clears an edge from our zombie index, deeming it as live. +// +// NOTE: This method is part of the ChannelGraphSource interface. +func (r *mockGraphSource) MarkEdgeLive(chanID lnwire.ShortChannelID) error { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.zombies, chanID.ToUint64()) + return nil +} + +// MarkEdgeZombie marks an edge as a zombie within our zombie index. +func (r *mockGraphSource) MarkEdgeZombie(chanID lnwire.ShortChannelID, pubKey1, + pubKey2 [33]byte) error { + + r.mu.Lock() + defer r.mu.Unlock() + r.zombies[chanID.ToUint64()] = [][33]byte{pubKey1, pubKey2} + return nil +} + type mockNotifier struct { clientCounter uint32 epochClients map[uint32]chan *chainntnfs.BlockEpoch @@ -2158,6 +2201,259 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { } } +// TestRejectZombieEdge ensures that we properly reject any announcements for +// zombie edges. +func TestRejectZombieEdge(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context with a batch of + // announcements. + ctx, cleanup, err := createTestCtx(0) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("unable to create announcements: %v", err) + } + remotePeer := &mockPeer{pk: nodeKeyPriv2.PubKey()} + + // processAnnouncements is a helper closure we'll use to test that we + // properly process/reject announcements based on whether they're for a + // zombie edge or not. + processAnnouncements := func(isZombie bool) { + t.Helper() + + errChan := ctx.gossiper.ProcessRemoteAnnouncement( + batch.remoteChanAnn, remotePeer, + ) + select { + case err := <-errChan: + if isZombie && err != nil { + t.Fatalf("expected to reject live channel "+ + "announcement with nil error: %v", err) + } + if !isZombie && err != nil { + t.Fatalf("expected to process live channel "+ + "announcement: %v", err) + } + case <-time.After(time.Second): + t.Fatal("expected to process channel announcement") + } + select { + case <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel announcement") + } + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "announcement") + } + } + + errChan = ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, remotePeer, + ) + select { + case err := <-errChan: + if isZombie && err != nil { + t.Fatalf("expected to reject zombie channel "+ + "update with nil error: %v", err) + } + if !isZombie && err != nil { + t.Fatalf("expected to process live channel "+ + "update: %v", err) + } + case <-time.After(time.Second): + t.Fatal("expected to process channel update") + } + select { + case <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel update") + } + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "update") + } + } + } + + // We'll mark the edge for which we'll process announcements for as a + // zombie within the router. This should reject any announcements for + // this edge while it remains as a zombie. + chanID := batch.remoteChanAnn.ShortChannelID + err = ctx.router.MarkEdgeZombie( + chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2, + ) + if err != nil { + t.Fatalf("unable to mark channel %v as zombie: %v", chanID, err) + } + + processAnnouncements(true) + + // If we then mark the edge as live, the edge's zombie status should be + // overridden and the announcements should be processed. + if err := ctx.router.MarkEdgeLive(chanID); err != nil { + t.Fatalf("unable mark channel %v as zombie: %v", chanID, err) + } + + processAnnouncements(false) +} + +// TestProcessZombieEdgeNowLive ensures that we can detect when a zombie edge +// becomes live by receiving a fresh update. +func TestProcessZombieEdgeNowLive(t *testing.T) { + t.Parallel() + + // We'll start by creating our test context with a batch of + // announcements. + ctx, cleanup, err := createTestCtx(0) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("unable to create announcements: %v", err) + } + + localPrivKey := nodeKeyPriv1 + remotePrivKey := nodeKeyPriv2 + + remotePeer := &mockPeer{pk: remotePrivKey.PubKey()} + + // processAnnouncement is a helper closure we'll use to ensure an + // announcement is properly processed/rejected based on whether the edge + // is a zombie or not. The expectsErr boolean can be used to determine + // whether we should expect an error when processing the message, while + // the isZombie boolean can be used to determine whether the + // announcement should be or not be broadcast. + processAnnouncement := func(ann lnwire.Message, isZombie, expectsErr bool) { + t.Helper() + + errChan := ctx.gossiper.ProcessRemoteAnnouncement( + ann, remotePeer, + ) + + var err error + select { + case err = <-errChan: + case <-time.After(time.Second): + t.Fatal("expected to process announcement") + } + if expectsErr && err == nil { + t.Fatal("expected error when processing announcement") + } + if !expectsErr && err != nil { + t.Fatalf("received unexpected error when processing "+ + "announcement: %v", err) + } + + select { + case msgWithSenders := <-ctx.broadcastedMessage: + if isZombie { + t.Fatal("expected to not broadcast zombie " + + "channel message") + } + assertMessage(t, ann, msgWithSenders.msg) + + case <-time.After(2 * trickleDelay): + if !isZombie { + t.Fatal("expected to broadcast live channel " + + "message") + } + } + } + + // We'll generate a channel update with a timestamp far enough in the + // past to consider it a zombie. + zombieTimestamp := time.Now().Add(-routing.DefaultChannelPruneExpiry) + batch.chanUpdAnn2.Timestamp = uint32(zombieTimestamp.Unix()) + if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // We'll also add the edge to our zombie index. + chanID := batch.remoteChanAnn.ShortChannelID + err = ctx.router.MarkEdgeZombie( + chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2, + ) + if err != nil { + t.Fatalf("unable mark channel %v as zombie: %v", chanID, err) + } + + // Attempting to process the current channel update should fail due to + // its edge being considered a zombie and its timestamp not being within + // the live horizon. We should not expect an error here since it is just + // a stale update. + processAnnouncement(batch.chanUpdAnn2, true, false) + + // Now we'll generate a new update with a fresh timestamp. This should + // allow the channel update to be processed even though it is still + // marked as a zombie within the index, since it is a fresh new update. + // This won't work however since we'll sign it with the wrong private + // key (local rather than remote). + batch.chanUpdAnn2.Timestamp = uint32(time.Now().Unix()) + if err := signUpdate(localPrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // We should expect an error due to the signature being invalid. + processAnnouncement(batch.chanUpdAnn2, true, true) + + // Signing it with the correct private key should allow it to be + // processed. + if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil { + t.Fatalf("unable to sign update with new timestamp: %v", err) + } + + // The channel update cannot be successfully processed and broadcast + // until the channel announcement is. Since the channel update indicates + // a fresh new update, the gossiper should stash it until it sees the + // corresponding channel announcement. + updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, remotePeer, + ) + + select { + case <-ctx.broadcastedMessage: + t.Fatal("expected to not broadcast live channel update " + + "without announcement") + case <-time.After(2 * trickleDelay): + } + + // We'll go ahead and process the channel announcement to ensure the + // channel update is processed thereafter. + processAnnouncement(batch.remoteChanAnn, false, false) + + // After successfully processing the announcement, the channel update + // should have been processed and broadcast successfully as well. + select { + case err := <-updateErrChan: + if err != nil { + t.Fatalf("expected to process live channel update: %v", + err) + } + case <-time.After(time.Second): + t.Fatal("expected to process announcement") + } + + select { + case msgWithSenders := <-ctx.broadcastedMessage: + assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg) + case <-time.After(2 * trickleDelay): + t.Fatal("expected to broadcast live channel update") + } +} + // TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate // from the remote before we have processed our own ChannelAnnouncement, it will // be reprocessed later, after our ChannelAnnouncement. diff --git a/routing/ann_validation.go b/routing/ann_validation.go index 4b304b5a9..184d7009c 100644 --- a/routing/ann_validation.go +++ b/routing/ann_validation.go @@ -2,6 +2,7 @@ package routing import ( "bytes" + "fmt" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -132,20 +133,28 @@ func ValidateChannelUpdateAnn(pubKey *btcec.PublicKey, capacity btcutil.Amount, return err } - data, err := a.DataToSign() + return VerifyChannelUpdateSignature(a, pubKey) +} + +// VerifyChannelUpdateSignature verifies that the channel update message was +// signed by the party with the given node public key. +func VerifyChannelUpdateSignature(msg *lnwire.ChannelUpdate, + pubKey *btcec.PublicKey) error { + + data, err := msg.DataToSign() if err != nil { - return errors.Errorf("unable to reconstruct message: %v", err) + return fmt.Errorf("unable to reconstruct message data: %v", err) } dataHash := chainhash.DoubleHashB(data) - nodeSig, err := a.Signature.ToSignature() + nodeSig, err := msg.Signature.ToSignature() if err != nil { return err } if !nodeSig.Verify(dataHash, pubKey) { - return errors.Errorf("invalid signature for channel "+ - "update %v", spew.Sdump(a)) + return fmt.Errorf("invalid signature for channel update %v", + spew.Sdump(msg)) } return nil diff --git a/routing/router.go b/routing/router.go index 7d836a890..1025a4023 100644 --- a/routing/router.go +++ b/routing/router.go @@ -33,6 +33,10 @@ const ( // if we should give up on a payment attempt. This will be used if a // value isn't specified in the LightningNode struct. defaultPayAttemptTimeout = time.Duration(time.Second * 60) + + // DefaultChannelPruneExpiry is the default duration used to determine + // if a channel should be pruned or not. + DefaultChannelPruneExpiry = time.Duration(time.Hour * 24 * 14) ) var ( @@ -76,7 +80,7 @@ type ChannelGraphSource interface { IsPublicNode(node Vertex) (bool, error) // IsKnownEdge returns true if the graph source already knows of the - // passed channel ID. + // passed channel ID either as a live or zombie edge. IsKnownEdge(chanID lnwire.ShortChannelID) bool // IsStaleEdgePolicy returns true if the graph source has a channel @@ -85,6 +89,10 @@ type ChannelGraphSource interface { IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool + // MarkEdgeLive clears an edge from our zombie index, deeming it as + // live. + MarkEdgeLive(chanID lnwire.ShortChannelID) error + // ForAllOutgoingChannels is used to iterate over all channels // emanating from the "source" node which is the center of the // star-graph. @@ -1009,12 +1017,19 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { // Prior to processing the announcement we first check if we // already know of this channel, if so, then we can exit early. - _, _, exists, err := r.cfg.Graph.HasChannelEdge(msg.ChannelID) + _, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge( + msg.ChannelID, + ) if err != nil && err != channeldb.ErrGraphNoEdgesFound { return errors.Errorf("unable to check for edge "+ "existence: %v", err) - } else if exists { - return newErrf(ErrIgnored, "Ignoring msg for known "+ + } + if isZombie { + return newErrf(ErrIgnored, "ignoring msg for zombie "+ + "chan_id=%v", msg.ChannelID) + } + if exists { + return newErrf(ErrIgnored, "ignoring msg for known "+ "chan_id=%v", msg.ChannelID) } @@ -1130,19 +1145,29 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { r.channelEdgeMtx.Lock(msg.ChannelID) defer r.channelEdgeMtx.Unlock(msg.ChannelID) - edge1Timestamp, edge2Timestamp, exists, err := r.cfg.Graph.HasChannelEdge( - msg.ChannelID, - ) + edge1Timestamp, edge2Timestamp, exists, isZombie, err := + r.cfg.Graph.HasChannelEdge(msg.ChannelID) if err != nil && err != channeldb.ErrGraphNoEdgesFound { return errors.Errorf("unable to check for edge "+ "existence: %v", err) } + // If the channel is marked as a zombie in our database, and + // we consider this a stale update, then we should not apply the + // policy. + isStaleUpdate := time.Since(msg.LastUpdate) > r.cfg.ChannelPruneExpiry + if isZombie && isStaleUpdate { + return newErrf(ErrIgnored, "ignoring stale update "+ + "(flags=%v|%v) for zombie chan_id=%v", + msg.MessageFlags, msg.ChannelFlags, + msg.ChannelID) + } + // If the channel doesn't exist in our database, we cannot // apply the updated policy. if !exists { - return newErrf(ErrIgnored, "Ignoring update "+ + return newErrf(ErrIgnored, "ignoring update "+ "(flags=%v|%v) for unknown chan_id=%v", msg.MessageFlags, msg.ChannelFlags, msg.ChannelID) @@ -2241,12 +2266,12 @@ func (r *ChannelRouter) IsPublicNode(node Vertex) (bool, error) { } // IsKnownEdge returns true if the graph source already knows of the passed -// channel ID. +// channel ID either as a live or zombie edge. // // NOTE: This method is part of the ChannelGraphSource interface. func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool { - _, _, exists, _ := r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) - return exists + _, _, exists, isZombie, _ := r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) + return exists || isZombie } // IsStaleEdgePolicy returns true if the graph soruce has a channel edge for @@ -2256,14 +2281,19 @@ func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool { func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool { - edge1Timestamp, edge2Timestamp, exists, err := r.cfg.Graph.HasChannelEdge( - chanID.ToUint64(), - ) + edge1Timestamp, edge2Timestamp, exists, isZombie, err := + r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) if err != nil { return false } + // If we know of the edge as a zombie, then we'll check the timestamp of + // this message to determine whether it's fresh. + if isZombie { + return time.Since(timestamp) > r.cfg.ChannelPruneExpiry + } + // If we don't know of the edge, then it means it's fresh (thus not // stale). if !exists { @@ -2275,7 +2305,6 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, // already have the most up to date information for that edge. If so, // then we can exit early. switch { - // A flag set of 0 indicates this is an announcement for the "first" // node in the channel. case flags&lnwire.ChanUpdateDirection == 0: @@ -2289,3 +2318,10 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, return false } + +// MarkEdgeLive clears an edge from our zombie index, deeming it as live. +// +// NOTE: This method is part of the ChannelGraphSource interface. +func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error { + return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64()) +} diff --git a/routing/router_test.go b/routing/router_test.go index a8249183b..df9ad9e6a 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -1549,21 +1549,27 @@ func TestWakeUpOnStaleBranch(t *testing.T) { } // Check that the fundingTxs are in the graph db. - _, _, has, err := ctx.graph.HasChannelEdge(chanID1) + _, _, has, isZombie, err := ctx.graph.HasChannelEdge(chanID1) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !has { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } - _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2) if err != nil { t.Fatalf("error looking for edge: %v", chanID2) } if !has { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } // Stop the router, so we can reorg the chain while its offline. if err := ctx.router.Stop(); err != nil { @@ -1607,22 +1613,27 @@ func TestWakeUpOnStaleBranch(t *testing.T) { // The channel with chanID2 should not be in the database anymore, // since it is not confirmed on the longest chain. chanID1 should // still be. - _, _, has, err = ctx.graph.HasChannelEdge(chanID1) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID1) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !has { t.Fatalf("did not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } - _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2) if err != nil { t.Fatalf("error looking for edge: %v", chanID2) } if has { t.Fatalf("found edge in graph") } - + if isZombie { + t.Fatal("reorged edge should not be marked as zombie") + } } // TestDisconnectedBlocks checks that the router handles a reorg happening when @@ -1755,21 +1766,27 @@ func TestDisconnectedBlocks(t *testing.T) { } // Check that the fundingTxs are in the graph db. - _, _, has, err := ctx.graph.HasChannelEdge(chanID1) + _, _, has, isZombie, err := ctx.graph.HasChannelEdge(chanID1) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !has { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } - _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2) if err != nil { t.Fatalf("error looking for edge: %v", chanID2) } if !has { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } // Create a 15 block fork. We first let the chainView notify the router // about stale blocks, before sending the now connected blocks. We do @@ -1796,22 +1813,27 @@ func TestDisconnectedBlocks(t *testing.T) { // chanID2 should not be in the database anymore, since it is not // confirmed on the longest chain. chanID1 should still be. - _, _, has, err = ctx.graph.HasChannelEdge(chanID1) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID1) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !has { t.Fatalf("did not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } - _, _, has, err = ctx.graph.HasChannelEdge(chanID2) + _, _, has, isZombie, err = ctx.graph.HasChannelEdge(chanID2) if err != nil { t.Fatalf("error looking for edge: %v", chanID2) } if has { t.Fatalf("found edge in graph") } - + if isZombie { + t.Fatal("reorged edge should not be marked as zombie") + } } // TestChansClosedOfflinePruneGraph tests that if channels we know of are @@ -1876,13 +1898,16 @@ func TestRouterChansClosedOfflinePruneGraph(t *testing.T) { } // The router should now be aware of the channel we created above. - _, _, hasChan, err := ctx.graph.HasChannelEdge(chanID1.ToUint64()) + _, _, hasChan, isZombie, err := ctx.graph.HasChannelEdge(chanID1.ToUint64()) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if !hasChan { t.Fatalf("could not find edge in graph") } + if isZombie { + t.Fatal("edge was marked as zombie") + } // With the transaction included, and the router's database state // updated, we'll now mine 5 additional blocks on top of it. @@ -1957,13 +1982,16 @@ func TestRouterChansClosedOfflinePruneGraph(t *testing.T) { // At this point, the channel that was pruned should no longer be known // by the router. - _, _, hasChan, err = ctx.graph.HasChannelEdge(chanID1.ToUint64()) + _, _, hasChan, isZombie, err = ctx.graph.HasChannelEdge(chanID1.ToUint64()) if err != nil { t.Fatalf("error looking for edge: %v", chanID1) } if hasChan { t.Fatalf("channel was found in graph but shouldn't have been") } + if isZombie { + t.Fatal("closed channel should not be marked as zombie") + } } // TestFindPathFeeWeighting tests that the findPath method will properly prefer diff --git a/server.go b/server.go index 0ac457332..6ffbe0711 100644 --- a/server.go +++ b/server.go @@ -583,7 +583,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, firstHop, htlcAdd, errorDecryptor, ) }, - ChannelPruneExpiry: time.Duration(time.Hour * 24 * 14), + ChannelPruneExpiry: routing.DefaultChannelPruneExpiry, GraphPruneInterval: time.Duration(time.Hour), QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { // If we aren't on either side of this edge, then we'll