net: add a flag to indicate when a node's send buffer is full

Similar to the recv flag, but this one indicates whether or not the net's send
buffer is full.

The socket handler checks the send queue when a new message is added and pauses
if necessary, and possibly unpauses after each message is drained from its buffer.
This commit is contained in:
Cory Fields
2016-12-31 02:05:32 -05:00
parent c6e8a9bcff
commit 991955ee81
3 changed files with 12 additions and 8 deletions

View File

@@ -761,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend)
size_t SocketSendData(CNode *pnode)
size_t CConnman::SocketSendData(CNode *pnode)
{
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
@@ -778,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
@@ -1286,8 +1287,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
size_t nBytes = SocketSendData(pnode);
if (nBytes)
if (nBytes) {
RecordBytesSent(nBytes);
}
}
}
@@ -1868,7 +1870,7 @@ void CConnman::ThreadMessageHandler()
if (lockRecv)
{
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize());
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
}
}
if (flagInterruptMsgProc)
@@ -2595,6 +2597,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
fPauseRecv = false;
fPauseSend = false;
nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
@@ -2675,6 +2678,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;
if (pnode->nSendSize > nSendBufferMaxSize)
pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));