htlcswitch+channel: add channel states synchronization

In this commit BOLT№2 retranmission logic for the channel link have
been added. Now if channel link have been initialised with the
'SyncState' field than it will send the lnwire.ChannelReestablish
message and will be waiting for receiving the same message from remote
side. Exchange of this message allow both sides understand which
updates they should exchange with each other in order sync their
states.
This commit is contained in:
Andrey Samokhvalov
2017-07-09 02:30:20 +03:00
committed by Olaoluwa Osuntokun
parent bea9c0b52b
commit d70ffe93e4
8 changed files with 1133 additions and 254 deletions

View File

@@ -140,6 +140,11 @@ type ChannelLinkConfig struct {
// the exit node.
// NOTE: HodlHTLC should be active in conjunction with DebugHTLC.
HodlHTLC bool
// SyncStates is used to indicate that we need send the channel
// reestablishment message to the remote peer. It should be done if our
// clients have been restarted, or remote peer have been reconnected.
SyncStates bool
}
// channelLink is the service which drives a channel's commitment update
@@ -260,8 +265,9 @@ var _ ChannelLink = (*channelLink)(nil)
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
log.Warnf("channel link(%v): already started", l)
return nil
err := errors.Errorf("channel link(%v): already started", l)
log.Warn(err)
return err
}
log.Infof("ChannelLink(%v) is starting", l)
@@ -312,6 +318,29 @@ func (l *channelLink) htlcManager() {
log.Infof("HTLC manager for ChannelPoint(%v) started, "+
"bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth())
// If the link have been recreated, than we need to sync the states by
// sending the channel reestablishment message.
if l.cfg.SyncStates {
log.Infof("Syncing states for channel(%v) via sending the "+
"re-establishment message", l.channel.ChannelPoint())
localCommitmentNumber, remoteRevocationNumber := l.channel.LastCounters()
l.cfg.Peer.SendMessage(&lnwire.ChannelReestablish{
ChanID: l.ChanID(),
NextLocalCommitmentNumber: localCommitmentNumber + 1,
NextRemoteRevocationNumber: remoteRevocationNumber + 1,
})
if err := l.channelInitialization(); err != nil {
err := errors.Errorf("unable to sync the states for channel(%v)"+
"with remote node: %v", l.ChanID(), err)
log.Error(err)
l.cfg.Peer.Disconnect(err)
return
}
}
// TODO(roasbeef): check to see if able to settle any currently pending
// HTLCs
// * also need signals when new invoices are added by the
@@ -469,6 +498,7 @@ out:
l.handleUpstreamMsg(msg)
case cmd := <-l.linkControl:
switch req := cmd.(type) {
case *policyUpdate:
// In order to avoid overriding a valid policy
@@ -681,6 +711,30 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// direct channel with, updating our respective commitment chains.
func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
switch msg := msg.(type) {
case *lnwire.ChannelReestablish:
log.Infof("Received re-establishment message from remote side "+
"for channel(%v)", l.channel.ChannelPoint())
messagesToSyncState, err := l.channel.ReceiveReestablish(msg)
if err != nil {
err := errors.Errorf("unable to handle upstream reestablish "+
"message: %v", err)
log.Error(err)
l.cfg.Peer.Disconnect(err)
return
}
// Send message to the remote side which are needed to synchronize
// the state.
log.Infof("Sending %v updates to synchronize the "+
"state for channel(%v)", len(messagesToSyncState),
l.channel.ChannelPoint())
for _, msg := range messagesToSyncState {
l.cfg.Peer.SendMessage(msg)
}
return
case *lnwire.UpdateAddHTLC:
// We just received an add request from an upstream peer, so we
// add it to our state machine, then add the HTLC to our
@@ -774,7 +828,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
l.cancelReasons[idx] = msg.Reason
case *lnwire.CommitSig:
// We just received a new update to our local commitment chain,
// We just received a new updates to our local commitment chain,
// validate this new commitment, closing the link if invalid.
err := l.channel.ReceiveNewCommitment(msg.CommitSig, msg.HtlcSigs)
if err != nil {
@@ -1513,3 +1567,37 @@ func (l *channelLink) fail(format string, a ...interface{}) {
log.Error(reason)
l.cfg.Peer.Disconnect(reason)
}
// channelInitialization waits for channel synchronization message to
// be received from another side and handled.
func (l *channelLink) channelInitialization() error {
// Before we launch any of the helper goroutines off the channel link
// struct, we'll first ensure proper adherence to the p2p protocol. The
// channel reestablish message MUST be sent before any other message.
expired := time.After(time.Second * 5)
for {
select {
case msg := <-l.upstream:
if msg, ok := msg.(*lnwire.ChannelReestablish); ok {
l.handleUpstreamMsg(msg)
return nil
} else {
return errors.New("very first message between nodes " +
"for channel link should be reestablish message")
}
case pkt := <-l.downstream:
l.overflowQueue.consume(pkt)
case cmd := <-l.linkControl:
l.handleControlCommand(cmd)
// In order to avoid blocking indefinitely, we'll give the other peer
// an upper timeout of 5 seconds to respond before we bail out early.
case <-expired:
return errors.Errorf("peer did not complete handshake for channel " +
"link within 5 seconds")
}
}
}