net: add have_next_message argument to Transport::GetBytesToSend()

Before this commit, there are only two possibly outcomes for the "more" prediction
in Transport::GetBytesToSend():
* true: the transport itself has more to send, so the answer is certainly yes.
* false: the transport has nothing further to send, but if vSendMsg has more message(s)
         left, that still will result in more wire bytes after the next
         SetMessageToSend().

For the BIP324 v2 transport, there will arguably be a third state:
* definitely not: the transport has nothing further to send, but even if vSendMsg has
                  more messages left, they can't be sent (right now). This happens
                  before the handshake is complete.

To implement this, we move the entire decision logic to the Transport, by adding a
boolean to GetBytesToSend(), called have_next_message, which informs the transport
whether more messages are available. The return values are still true and false, but
they mean "definitely yes" and "definitely no", rather than "yes" and "maybe".
This commit is contained in:
Pieter Wuille
2023-08-16 13:21:35 -04:00
parent 8e0d9796da
commit c3fad1f29d
5 changed files with 85 additions and 37 deletions

View File

@@ -867,20 +867,22 @@ bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
return true;
}
Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept
Transport::BytesToSend V1Transport::GetBytesToSend(bool have_next_message) const noexcept
{
AssertLockNotHeld(m_send_mutex);
LOCK(m_send_mutex);
if (m_sending_header) {
return {Span{m_header_to_send}.subspan(m_bytes_sent),
// We have more to send after the header if the message has payload.
!m_message_to_send.data.empty(),
// We have more to send after the header if the message has payload, or if there
// is a next message after that.
have_next_message || !m_message_to_send.data.empty(),
m_message_to_send.m_type
};
} else {
return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
// We never have more to send after this message's payload.
false,
// We only have more to send after this message's payload if there is another
// message.
have_next_message,
m_message_to_send.m_type
};
}
@@ -916,6 +918,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
bool data_left{false}; //!< second return value (whether unsent data remains)
std::optional<bool> expected_more;
while (true) {
if (it != node.vSendMsg.end()) {
@@ -928,7 +931,12 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
++it;
}
}
const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(it != node.vSendMsg.end());
// We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
// bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
// verify that the previously returned 'more' was correct.
if (expected_more.has_value()) Assume(!data.empty() == *expected_more);
expected_more = more;
data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
int nBytes = 0;
if (!data.empty()) {
@@ -941,9 +949,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
// We have more to send if either the transport itself has more, or if we have more
// messages to send.
if (more || it != node.vSendMsg.end()) {
if (more) {
flags |= MSG_MORE;
}
#endif
@@ -1323,9 +1329,10 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
{
LOCK(pnode->cs_vSend);
// Sending is possible if either there are bytes to send right now, or if there will be
// once a potential message from vSendMsg is handed to the transport.
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
select_send = !to_send.empty() || !pnode->vSendMsg.empty();
// once a potential message from vSendMsg is handed to the transport. GetBytesToSend
// determines both of these in a single call.
const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(!pnode->vSendMsg.empty());
select_send = !to_send.empty() || more;
}
if (!select_recv && !select_send) continue;
@@ -3007,7 +3014,10 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
// Check if the transport still has unsent bytes, and indicate to it that we're about to
// give it a message to send.
const auto& [to_send, more, _msg_type] =
pnode->m_transport->GetBytesToSend(/*have_next_message=*/true);
const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
// Update memory usage of send buffer.
@@ -3016,10 +3026,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
// Move message to vSendMsg queue.
pnode->vSendMsg.push_back(std::move(msg));
// If there was nothing to send before, attempt "optimistic write":
// If there was nothing to send before, and there is now (predicted by the "more" value
// returned by the GetBytesToSend call above), attempt "optimistic write":
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
// doing a send, try sending from the calling thread if the queue was empty before.
if (queue_was_empty) {
// With a V1Transport, more will always be true here, because adding a message always
// results in sendable bytes there.
if (queue_was_empty && more) {
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
}
}