connman is in charge of pushing messages

The changes here are dense and subtle, but hopefully all is more explicit
than before.

- CConnman is now in charge of sending data rather than the nodes themselves.
  This is necessary because many decisions need to be made with all nodes in
  mind, and a model that requires the nodes calling up to their manager quickly
  turns to spaghetti.

- The per-node-serializer (ssSend) has been replaced with a (quasi-)const
  send-version. Since the send version for serialization can only change once
  per connection, we now explicitly tag messages with INIT_PROTO_VERSION if
  they are sent before the handshake. With this done, there's no need to lock
  for access to nSendVersion.

  Also, a new stream is used for each message, so there's no need to lock
  during the serialization process.

- This takes care of accounting for optimistic sends, so the
  nOptimisticBytesWritten hack can be removed.

- -dropmessagestest and -fuzzmessagestest have not been preserved, as I suspect
  they haven't been used in years.
This commit is contained in:
Cory Fields
2016-09-12 20:00:33 -04:00
committed by Pieter Wuille
parent b98c14c4e3
commit 3e32cd09f6
4 changed files with 135 additions and 32 deletions

View File

@@ -394,6 +394,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
PushVersion(pnode, GetTime());
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
pnode->AddRef();
@@ -415,6 +418,24 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
return NULL;
}
void CConnman::PushVersion(CNode* pnode, int64_t nTime)
{
ServiceFlags nLocalNodeServices = pnode->GetLocalServices();
CAddress addrYou = (pnode->addr.IsRoutable() && !IsProxy(pnode->addr) ? pnode->addr : CAddress(CService(), pnode->addr.nServices));
CAddress addrMe = CAddress(CService(), nLocalNodeServices);
uint64_t nonce = pnode->GetLocalNonce();
int nNodeStartingHeight = pnode->nMyStartingHeight;
NodeId id = pnode->GetId();
PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes);
if (fLogIPs)
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
else
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), id);
}
void CConnman::DumpBanlist()
{
SweepBanned(); // clean unused entries (if bantime has expired)
@@ -450,23 +471,6 @@ void CNode::CloseSocketDisconnect()
vRecvMsg.clear();
}
void CNode::PushVersion()
{
int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime());
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
CAddress addrMe = CAddress(CService(), nLocalServices);
if (fLogIPs)
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
else
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), id);
PushMessage(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalServices, nTime, addrYou, addrMe,
nLocalHostNonce, strSubVersion, nMyStartingHeight, ::fRelayTxes);
}
void CConnman::ClearBanned()
{
{
@@ -2530,7 +2534,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
filterInventoryKnown(50000, 0.000001),
nLocalHostNonce(nLocalHostNonceIn),
nLocalServices(nLocalServicesIn),
nMyStartingHeight(nMyStartingHeightIn)
nMyStartingHeight(nMyStartingHeightIn),
nSendVersion(0)
{
nServices = NODE_NONE;
nServicesExpected = NODE_NONE;
@@ -2587,10 +2592,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
LogPrint("net", "Added connection to %s peer=%d\n", addrName, id);
else
LogPrint("net", "Added connection peer=%d\n", id);
// Be shy and don't send version until we hear
if (hSocket != INVALID_SOCKET && !fInbound)
PushVersion();
}
CNode::~CNode()
@@ -2696,6 +2697,52 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
LEAVE_CRITICAL_SECTION(cs_vSend);
}
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
{
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
}
void CConnman::EndMessage(CDataStream& strm)
{
// Set the size
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
// Set the checksum
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
}
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
{
if(strm.empty())
return;
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
if(pnode->hSocket == INVALID_SOCKET) {
return;
}
bool optimisticSend(pnode->vSendMsg.empty());
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
pnode->nSendSize += strm.size();
// If write queue empty, attempt "optimistic write"
if (optimisticSend == true)
nBytesSent = SocketSendData(pnode);
}
if (nBytesSent)
RecordBytesSent(nBytesSent);
}
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;