net: abstract sending side of transport serialization further

This makes the sending side of P2P transports mirror the receiver side: caller provides
message (consisting of type and payload) to be sent, and then asks what bytes must be
sent. Once the message has been fully sent, a new message can be provided.

This removes the assumption that P2P serialization of messages follows a strict structure
of header (a function of type and payload), followed by (unmodified) payload, and instead
lets transports decide the structure themselves.

It also removes the assumption that a message must always be sent at once, or that no
bytes are even sent on the wire when there is no message. This opens the door for
supporting traffic shaping mechanisms in the future.
This commit is contained in:
Pieter Wuille
2023-07-21 16:31:59 -04:00
parent 649a83c7f7
commit 0de48fe858
6 changed files with 161 additions and 36 deletions

View File

@@ -824,8 +824,13 @@ CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time
return msg;
}
void V1Transport::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const
bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
{
AssertLockNotHeld(m_send_mutex);
// Determine whether a new message can be set.
LOCK(m_send_mutex);
if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false;
// create dbl-sha256 checksum
uint256 hash = Hash(msg.data);
@@ -834,8 +839,50 @@ void V1Transport::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsign
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
// serialize header
header.reserve(CMessageHeader::HEADER_SIZE);
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
m_header_to_send.clear();
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
// update state
m_message_to_send = std::move(msg);
m_sending_header = true;
m_bytes_sent = 0;
return true;
}
Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept
{
AssertLockNotHeld(m_send_mutex);
LOCK(m_send_mutex);
if (m_sending_header) {
return {Span{m_header_to_send}.subspan(m_bytes_sent),
// We have more to send after the header if the message has payload.
!m_message_to_send.data.empty(),
m_message_to_send.m_type
};
} else {
return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
// We never have more to send after this message's payload.
false,
m_message_to_send.m_type
};
}
}
void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept
{
AssertLockNotHeld(m_send_mutex);
LOCK(m_send_mutex);
m_bytes_sent += bytes_sent;
if (m_sending_header && m_bytes_sent == m_header_to_send.size()) {
// We're done sending a message's header. Switch to sending its data bytes.
m_sending_header = false;
m_bytes_sent = 0;
} else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) {
// We're done sending a message's data. Wipe the data vector to reduce memory consumption.
m_message_to_send.data.clear();
m_message_to_send.data.shrink_to_fit();
m_bytes_sent = 0;
}
}
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
@@ -2910,27 +2957,40 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
msg.data.data()
);
// make sure we use the appropriate network transport format
std::vector<unsigned char> serializedHeader;
pnode->m_transport->prepareForTransport(msg, serializedHeader);
size_t nTotalSize = nMessageSize + serializedHeader.size();
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
bool optimisticSend(pnode->vSendMsg.empty());
const bool queue_was_empty{pnode->vSendMsg.empty()};
//log total amount of bytes per message type
pnode->AccountForSentBytes(msg.m_type, nTotalSize);
pnode->nSendSize += nTotalSize;
// Give the message to the transport, and add all bytes it wants us to send out as byte
// vectors to vSendMsg. This is temporary code that exists to support the new transport
// sending interface using the old way of queueing data. In a future commit vSendMsg will
// be replaced with a queue of CSerializedNetMsg objects to be sent instead, and this code
// will disappear.
bool queued = pnode->m_transport->SetMessageToSend(msg);
assert(queued);
// In the current transport (V1Transport), GetBytesToSend first returns a header to send,
// and then the payload data (if any), necessitating a loop.
while (true) {
const auto& [bytes, _more, msg_type] = pnode->m_transport->GetBytesToSend();
if (bytes.empty()) break;
// Update statistics per message type.
pnode->AccountForSentBytes(msg_type, bytes.size());
// Update number of bytes in the send buffer.
pnode->nSendSize += bytes.size();
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
pnode->vSendMsg.push_back({bytes.begin(), bytes.end()});
// Notify transport that bytes have been processed (they're not actually sent yet,
// but pushed onto the vSendMsg queue of bytes to send).
pnode->m_transport->MarkBytesSent(bytes.size());
}
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
// If write queue empty, attempt "optimistic write"
bool data_left;
if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode);
// If the write queue was empty before and isn't now, attempt "optimistic write":
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
// doing a send, try sending from the calling thread if the queue was empty before.
if (queue_was_empty && !pnode->vSendMsg.empty()) {
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
}
}
if (nBytesSent) RecordBytesSent(nBytesSent);
}