diff --git a/src/net.cpp b/src/net.cpp index 4addca09822..eaa99e66017 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -867,20 +867,22 @@ bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept return true; } -Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept +Transport::BytesToSend V1Transport::GetBytesToSend(bool have_next_message) const noexcept { AssertLockNotHeld(m_send_mutex); LOCK(m_send_mutex); if (m_sending_header) { return {Span{m_header_to_send}.subspan(m_bytes_sent), - // We have more to send after the header if the message has payload. - !m_message_to_send.data.empty(), + // We have more to send after the header if the message has payload, or if there + // is a next message after that. + have_next_message || !m_message_to_send.data.empty(), m_message_to_send.m_type }; } else { return {Span{m_message_to_send.data}.subspan(m_bytes_sent), - // We never have more to send after this message's payload. - false, + // We only have more to send after this message's payload if there is another + // message. + have_next_message, m_message_to_send.m_type }; } @@ -916,6 +918,7 @@ std::pair CConnman::SocketSendData(CNode& node) const auto it = node.vSendMsg.begin(); size_t nSentSize = 0; bool data_left{false}; //!< second return value (whether unsent data remains) + std::optional expected_more; while (true) { if (it != node.vSendMsg.end()) { @@ -928,7 +931,12 @@ std::pair CConnman::SocketSendData(CNode& node) const ++it; } } - const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(); + const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(it != node.vSendMsg.end()); + // We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more + // bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check, + // verify that the previously returned 'more' was correct. + if (expected_more.has_value()) Assume(!data.empty() == *expected_more); + expected_more = more; data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent int nBytes = 0; if (!data.empty()) { @@ -941,9 +949,7 @@ std::pair CConnman::SocketSendData(CNode& node) const } int flags = MSG_NOSIGNAL | MSG_DONTWAIT; #ifdef MSG_MORE - // We have more to send if either the transport itself has more, or if we have more - // messages to send. - if (more || it != node.vSendMsg.end()) { + if (more) { flags |= MSG_MORE; } #endif @@ -1323,9 +1329,10 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) { LOCK(pnode->cs_vSend); // Sending is possible if either there are bytes to send right now, or if there will be - // once a potential message from vSendMsg is handed to the transport. - const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); - select_send = !to_send.empty() || !pnode->vSendMsg.empty(); + // once a potential message from vSendMsg is handed to the transport. GetBytesToSend + // determines both of these in a single call. + const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(!pnode->vSendMsg.empty()); + select_send = !to_send.empty() || more; } if (!select_recv && !select_send) continue; @@ -3007,7 +3014,10 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); - const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend(); + // Check if the transport still has unsent bytes, and indicate to it that we're about to + // give it a message to send. + const auto& [to_send, more, _msg_type] = + pnode->m_transport->GetBytesToSend(/*have_next_message=*/true); const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()}; // Update memory usage of send buffer. @@ -3016,10 +3026,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) // Move message to vSendMsg queue. pnode->vSendMsg.push_back(std::move(msg)); - // If there was nothing to send before, attempt "optimistic write": + // If there was nothing to send before, and there is now (predicted by the "more" value + // returned by the GetBytesToSend call above), attempt "optimistic write": // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually // doing a send, try sending from the calling thread if the queue was empty before. - if (queue_was_empty) { + // With a V1Transport, more will always be true here, because adding a message always + // results in sendable bytes there. + if (queue_was_empty && more) { std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode); } } diff --git a/src/net.h b/src/net.h index 60a15fea556..1507ff7384e 100644 --- a/src/net.h +++ b/src/net.h @@ -308,19 +308,40 @@ public: const std::string& /*m_type*/ >; - /** Get bytes to send on the wire. + /** Get bytes to send on the wire, if any, along with other information about it. * * As a const function, it does not modify the transport's observable state, and is thus safe * to be called multiple times. * - * The bytes returned by this function act as a stream which can only be appended to. This - * means that with the exception of MarkBytesSent, operations on the transport can only append - * to what is being returned. + * @param[in] have_next_message If true, the "more" return value reports whether more will + * be sendable after a SetMessageToSend call. It is set by the caller when they know + * they have another message ready to send, and only care about what happens + * after that. The have_next_message argument only affects this "more" return value + * and nothing else. * - * Note that m_type and to_send refer to data that is internal to the transport, and calling - * any non-const function on this object may invalidate them. + * Effectively, there are three possible outcomes about whether there are more bytes + * to send: + * - Yes: the transport itself has more bytes to send later. For example, for + * V1Transport this happens during the sending of the header of a + * message, when there is a non-empty payload that follows. + * - No: the transport itself has no more bytes to send, but will have bytes to + * send if handed a message through SetMessageToSend. In V1Transport this + * happens when sending the payload of a message. + * - Blocked: the transport itself has no more bytes to send, and is also incapable + * of sending anything more at all now, if it were handed another + * message to send. + * + * The boolean 'more' is true for Yes, false for Blocked, and have_next_message + * controls what is returned for No. + * + * @return a BytesToSend object. The to_send member returned acts as a stream which is only + * ever appended to. This means that with the exception of MarkBytesSent (which pops + * bytes off the front of later to_sends), operations on the transport can only append + * to what is being returned. Also note that m_type and to_send refer to data that is + * internal to the transport, and calling any non-const function on this object may + * invalidate them. */ - virtual BytesToSend GetBytesToSend() const noexcept = 0; + virtual BytesToSend GetBytesToSend(bool have_next_message) const noexcept = 0; /** Report how many bytes returned by the last GetBytesToSend() have been sent. * @@ -416,7 +437,7 @@ public: CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex); bool SetMessageToSend(CSerializedNetMsg& msg) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); - BytesToSend GetBytesToSend() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); + BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex); }; diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index 7f5d587cf6f..8c1182b5e13 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -86,7 +86,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) { LOCK(dummyNode1.cs_vSend); - const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(); + const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(false); BOOST_CHECK(!to_send.empty()); } connman.FlushSendBuffer(dummyNode1); @@ -97,7 +97,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders { LOCK(dummyNode1.cs_vSend); - const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(); + const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(false); BOOST_CHECK(!to_send.empty()); } // Wait 3 more minutes diff --git a/src/test/fuzz/p2p_transport_serialization.cpp b/src/test/fuzz/p2p_transport_serialization.cpp index 2fa5de50088..468bb789ed9 100644 --- a/src/test/fuzz/p2p_transport_serialization.cpp +++ b/src/test/fuzz/p2p_transport_serialization.cpp @@ -92,7 +92,7 @@ FUZZ_TARGET(p2p_transport_serialization, .init = initialize_p2p_transport_serial assert(queued); std::optional known_more; while (true) { - const auto& [to_send, more, _msg_type] = send_transport.GetBytesToSend(); + const auto& [to_send, more, _msg_type] = send_transport.GetBytesToSend(false); if (known_more) assert(!to_send.empty() == *known_more); if (to_send.empty()) break; send_transport.MarkBytesSent(to_send.size()); @@ -124,11 +124,13 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa // Vectors with bytes last returned by GetBytesToSend() on transport[i]. std::array, 2> to_send; - // Last returned 'more' values (if still relevant) by transport[i]->GetBytesToSend(). - std::array, 2> last_more; + // Last returned 'more' values (if still relevant) by transport[i]->GetBytesToSend(), for + // both have_next_message false and true. + std::array, 2> last_more, last_more_next; - // Whether more bytes to be sent are expected on transport[i]. - std::array, 2> expect_more; + // Whether more bytes to be sent are expected on transport[i], before and after + // SetMessageToSend(). + std::array, 2> expect_more, expect_more_next; // Function to consume a message type. auto msg_type_fn = [&]() { @@ -177,18 +179,27 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa // Wrapper around transport[i]->GetBytesToSend() that performs sanity checks. auto bytes_to_send_fn = [&](int side) -> Transport::BytesToSend { - const auto& [bytes, more, msg_type] = transports[side]->GetBytesToSend(); + // Invoke GetBytesToSend twice (for have_next_message = {false, true}). This function does + // not modify state (it's const), and only the "more" return value should differ between + // the calls. + const auto& [bytes, more_nonext, msg_type] = transports[side]->GetBytesToSend(false); + const auto& [bytes_next, more_next, msg_type_next] = transports[side]->GetBytesToSend(true); // Compare with expected more. if (expect_more[side].has_value()) assert(!bytes.empty() == *expect_more[side]); + // Verify consistency between the two results. + assert(bytes == bytes_next); + assert(msg_type == msg_type_next); + if (more_nonext) assert(more_next); // Compare with previously reported output. assert(to_send[side].size() <= bytes.size()); assert(to_send[side] == Span{bytes}.first(to_send[side].size())); to_send[side].resize(bytes.size()); std::copy(bytes.begin(), bytes.end(), to_send[side].begin()); - // Remember 'more' result. - last_more[side] = {more}; + // Remember 'more' results. + last_more[side] = {more_nonext}; + last_more_next[side] = {more_next}; // Return. - return {bytes, more, msg_type}; + return {bytes, more_nonext, msg_type}; }; // Function to make side send a new message. @@ -199,7 +210,8 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa CSerializedNetMsg msg = next_msg[side].Copy(); bool queued = transports[side]->SetMessageToSend(msg); // Update expected more data. - expect_more[side] = std::nullopt; + expect_more[side] = expect_more_next[side]; + expect_more_next[side] = std::nullopt; // Verify consistency of GetBytesToSend after SetMessageToSend bytes_to_send_fn(/*side=*/side); if (queued) { @@ -223,6 +235,7 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa // If all to-be-sent bytes were sent, move last_more data to expect_more data. if (send_now == bytes.size()) { expect_more[side] = last_more[side]; + expect_more_next[side] = last_more_next[side]; } // Remove the bytes from the last reported to-be-sent vector. assert(to_send[side].size() >= send_now); @@ -251,6 +264,7 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa // Clear cached expected 'more' information: if certainly no more data was to be sent // before, receiving bytes makes this uncertain. if (expect_more[!side] == false) expect_more[!side] = std::nullopt; + if (expect_more_next[!side] == false) expect_more_next[!side] = std::nullopt; // Verify consistency of GetBytesToSend after ReceivedBytes bytes_to_send_fn(/*side=*/!side); bool progress = to_recv.size() < old_len; diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index 5696f8d13c2..dc64c0b4c1b 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -78,7 +78,7 @@ void ConnmanTestMsg::FlushSendBuffer(CNode& node) const node.vSendMsg.clear(); node.m_send_memusage = 0; while (true) { - const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(); + const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false); if (to_send.empty()) break; node.m_transport->MarkBytesSent(to_send.size()); } @@ -90,7 +90,7 @@ bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) co assert(queued); bool complete{false}; while (true) { - const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(); + const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false); if (to_send.empty()) break; NodeReceiveMsgBytes(node, to_send, complete); node.m_transport->MarkBytesSent(to_send.size());