P2P: parse network datastream into header/data components in socket thread

Replaces CNode::vRecv buffer with a vector of CNetMessage's.  This simplifies
ProcessMessages() and eliminates several redundant data copies.

Overview:

* socket thread now parses incoming message datastream into
  header/data components, as encapsulated by CNetMessage
* socket thread adds each CNetMessage to a vector inside CNode
* message thread (ProcessMessages) iterates through CNode's CNetMessage vector

Message parsing is made more strict:

* Socket is disconnected, if message larger than MAX_SIZE
  or if CMessageHeader deserialization fails (latter is impossible?).
  Previously, code would simply eat garbage data all day long.
* Socket is disconnected, if we fail to find pchMessageStart.
  We do not search through garbage, to find pchMessageStart.  Each
  message must begin precisely after the last message ends.

ProcessMessages() always processes a complete message, and is more efficient:

* buffer is always precisely sized, using CDataStream::resize(),
  rather than progressively sized in 64k chunks.  More efficient
  for large messages like "block".
* whole-buffer memory copy eliminated (vRecv -> vMsg)
* other buffer-shifting memory copies eliminated (vRecv.insert, vRecv.erase)
This commit is contained in:
Jeff Garzik
2012-11-15 19:41:12 -05:00
committed by Pieter Wuille
parent ea83336f4e
commit 607dbfdeaf
3 changed files with 176 additions and 55 deletions

View File

@@ -3168,7 +3168,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
else if (strCommand == "verack")
{
pfrom->vRecv.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
pfrom->SetRecvVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
}
@@ -3705,13 +3705,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return true;
}
// requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom)
{
CDataStream& vRecv = pfrom->vRecv;
if (vRecv.empty())
if (pfrom->vRecvMsg.empty())
return true;
//if (fDebug)
// printf("ProcessMessages(%u bytes)\n", vRecv.size());
// printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size());
//
// Message format
@@ -3722,32 +3722,32 @@ bool ProcessMessages(CNode* pfrom)
// (x) data
//
loop
unsigned int nMsgPos = 0;
for (; nMsgPos < pfrom->vRecvMsg.size(); nMsgPos++)
{
// Don't bother if send buffer is too full to respond anyway
if (pfrom->vSend.size() >= SendBufferSize())
break;
// Scan for message start
CDataStream::iterator pstart = search(vRecv.begin(), vRecv.end(), BEGIN(pchMessageStart), END(pchMessageStart));
int nHeaderSize = vRecv.GetSerializeSize(CMessageHeader());
if (vRecv.end() - pstart < nHeaderSize)
{
if ((int)vRecv.size() > nHeaderSize)
{
printf("\n\nPROCESSMESSAGE MESSAGESTART NOT FOUND\n\n");
vRecv.erase(vRecv.begin(), vRecv.end() - nHeaderSize);
}
// get next message; end, if an incomplete message is found
CNetMessage& msg = pfrom->vRecvMsg[nMsgPos];
//if (fDebug)
// printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n",
// msg.hdr.nMessageSize, msg.vRecv.size(),
// msg.complete() ? "Y" : "N");
if (!msg.complete())
break;
// Scan for message start
if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) {
printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n");
return false;
}
if (pstart - vRecv.begin() > 0)
printf("\n\nPROCESSMESSAGE SKIPPED %"PRIpdd" BYTES\n\n", pstart - vRecv.begin());
vRecv.erase(vRecv.begin(), pstart);
// Read header
vector<char> vHeaderSave(vRecv.begin(), vRecv.begin() + nHeaderSize);
CMessageHeader hdr;
vRecv >> hdr;
CMessageHeader& hdr = msg.hdr;
if (!hdr.IsValid())
{
printf("\n\nPROCESSMESSAGE: ERRORS IN HEADER %s\n\n\n", hdr.GetCommand().c_str());
@@ -3757,19 +3757,9 @@ bool ProcessMessages(CNode* pfrom)
// Message size
unsigned int nMessageSize = hdr.nMessageSize;
if (nMessageSize > MAX_SIZE)
{
printf("ProcessMessages(%s, %u bytes) : nMessageSize > MAX_SIZE\n", strCommand.c_str(), nMessageSize);
continue;
}
if (nMessageSize > vRecv.size())
{
// Rewind and wait for rest of message
vRecv.insert(vRecv.begin(), vHeaderSave.begin(), vHeaderSave.end());
break;
}
// Checksum
CDataStream& vRecv = msg.vRecv;
uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize);
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
@@ -3780,17 +3770,13 @@ bool ProcessMessages(CNode* pfrom)
continue;
}
// Copy message to its own buffer
CDataStream vMsg(vRecv.begin(), vRecv.begin() + nMessageSize, vRecv.nType, vRecv.nVersion);
vRecv.ignore(nMessageSize);
// Process message
bool fRet = false;
try
{
{
LOCK(cs_main);
fRet = ProcessMessage(pfrom, strCommand, vMsg);
fRet = ProcessMessage(pfrom, strCommand, vRecv);
}
if (fShutdown)
return true;
@@ -3822,7 +3808,10 @@ bool ProcessMessages(CNode* pfrom)
printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize);
}
vRecv.Compact();
// remove processed messages; one incomplete message may remain
if (nMsgPos > 0)
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(),
pfrom->vRecvMsg.begin() + nMsgPos);
return true;
}