diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9d9e31d3ed8..ac5300990fb 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -817,7 +817,12 @@ void PeerLogicValidation::ReattemptInitialBroadcast(CScheduler& scheduler) const std::set unbroadcast_txids = m_mempool.GetUnbroadcastTxs(); for (const uint256& txid : unbroadcast_txids) { - RelayTransaction(txid, *connman); + // Sanity check: all unbroadcast txns should exist in the mempool + if (m_mempool.exists(txid)) { + RelayTransaction(txid, *connman); + } else { + m_mempool.RemoveUnbroadcastTx(txid, true); + } } // schedule next run for 10-15 minutes in the future diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index 3c5fd565e3e..ac567e16ce6 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -408,6 +408,7 @@ static std::vector MempoolEntryDescription() { return { RPCResult{RPCResult::Type::ARR, "spentby", "unconfirmed transactions spending outputs from this transaction", {RPCResult{RPCResult::Type::STR_HEX, "transactionid", "child transaction id"}}}, RPCResult{RPCResult::Type::BOOL, "bip125-replaceable", "Whether this transaction could be replaced due to BIP125 (replace-by-fee)"}, + RPCResult{RPCResult::Type::BOOL, "unbroadcast", "Whether this transaction is currently unbroadcast (initial broadcast not yet confirmed)"}, };} static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPoolEntry& e) EXCLUSIVE_LOCKS_REQUIRED(pool.cs) @@ -469,6 +470,7 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool } info.pushKV("bip125-replaceable", rbfStatus); + info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash())); } UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose) @@ -1398,7 +1400,7 @@ UniValue MempoolInfoToJSON(const CTxMemPool& pool) ret.pushKV("maxmempool", (int64_t) maxmempool); ret.pushKV("mempoolminfee", ValueFromAmount(std::max(pool.GetMinFee(maxmempool), ::minRelayTxFee).GetFeePerK())); ret.pushKV("minrelaytxfee", ValueFromAmount(::minRelayTxFee.GetFeePerK())); - + ret.pushKV("unbroadcastcount", uint64_t{pool.GetUnbroadcastTxs().size()}); return ret; } @@ -1417,6 +1419,7 @@ static UniValue getmempoolinfo(const JSONRPCRequest& request) {RPCResult::Type::NUM, "maxmempool", "Maximum memory usage for the mempool"}, {RPCResult::Type::STR_AMOUNT, "mempoolminfee", "Minimum fee rate in " + CURRENCY_UNIT + "/kB for tx to be accepted. Is the maximum of minrelaytxfee and minimum mempool fee"}, {RPCResult::Type::STR_AMOUNT, "minrelaytxfee", "Current minimum relay fee for transactions"}, + {RPCResult::Type::NUM, "unbroadcastcount", "Current number of transactions that haven't passed initial broadcast yet"} }}, RPCExamples{ HelpExampleCli("getmempoolinfo", "") diff --git a/src/txmempool.h b/src/txmempool.h index 4bee78b8d67..4568eb928d9 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -704,7 +704,10 @@ public: /** Adds a transaction to the unbroadcast set */ void AddUnbroadcastTx(const uint256& txid) { LOCK(cs); - m_unbroadcast_txids.insert(txid); + /** Sanity Check: the transaction should also be in the mempool */ + if (exists(txid)) { + m_unbroadcast_txids.insert(txid); + } } /** Removes a transaction from the unbroadcast set */ @@ -716,6 +719,12 @@ public: return m_unbroadcast_txids; } + // Returns if a txid is in the unbroadcast set + bool IsUnbroadcastTx(const uint256& txid) const { + LOCK(cs); + return (m_unbroadcast_txids.count(txid) != 0); + } + private: /** UpdateForDescendants is used by UpdateTransactionsFromBlock to update * the descendants for a single transaction that has been added to the diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 2b45c6a536e..e1e8263ee1f 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1982,10 +1982,6 @@ void CWallet::ResendWalletTransactions() nNextResend = GetTime() + (12 * 60 * 60) + GetRand(24 * 60 * 60); if (fFirst) return; - // Only do it if there's been a new block since last time - if (m_best_block_time < nLastResend) return; - nLastResend = GetTime(); - int submitted_tx_count = 0; { // cs_wallet scope diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index a29fa222079..9d312e8ee5d 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -641,7 +641,6 @@ private: int nWalletMaxVersion GUARDED_BY(cs_wallet) = FEATURE_BASE; int64_t nNextResend = 0; - int64_t nLastResend = 0; bool fBroadcastTransactions = false; // Local time that the tip block was received. Used to schedule wallet rebroadcasts. std::atomic m_best_block_time {0}; diff --git a/test/functional/mempool_packages.py b/test/functional/mempool_packages.py index a07dad18d61..5b7216b2539 100755 --- a/test/functional/mempool_packages.py +++ b/test/functional/mempool_packages.py @@ -7,6 +7,7 @@ from decimal import Decimal from test_framework.messages import COIN +from test_framework.mininode import P2PTxInvStore from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( assert_equal, @@ -58,6 +59,7 @@ class MempoolPackagesTest(BitcoinTestFramework): def run_test(self): # Mine some blocks and have them mature. + self.nodes[0].add_p2p_connection(P2PTxInvStore()) # keep track of invs self.nodes[0].generate(101) utxo = self.nodes[0].listunspent(10) txid = utxo[0]['txid'] @@ -72,6 +74,10 @@ class MempoolPackagesTest(BitcoinTestFramework): value = sent_value chain.append(txid) + # Wait until mempool transactions have passed initial broadcast (sent inv and received getdata) + # Otherwise, getrawmempool may be inconsistent with getmempoolentry if unbroadcast changes in between + self.nodes[0].p2p.wait_for_broadcast(chain) + # Check mempool has MAX_ANCESTORS transactions in it, and descendant and ancestor # count and fees should look correct mempool = self.nodes[0].getrawmempool(True) @@ -212,6 +218,10 @@ class MempoolPackagesTest(BitcoinTestFramework): for tx in chain[:MAX_ANCESTORS_CUSTOM]: assert tx in mempool1 # TODO: more detailed check of node1's mempool (fees etc.) + # check transaction unbroadcast info (should be false if in both mempools) + mempool = self.nodes[0].getrawmempool(True) + for tx in mempool: + assert_equal(mempool[tx]['unbroadcast'], False) # TODO: test ancestor size limits diff --git a/test/functional/mempool_unbroadcast.py b/test/functional/mempool_unbroadcast.py index a561f28b917..dedf5b8a476 100755 --- a/test/functional/mempool_unbroadcast.py +++ b/test/functional/mempool_unbroadcast.py @@ -53,6 +53,13 @@ class MempoolUnbroadcastTest(BitcoinTestFramework): txFS = node.signrawtransactionwithwallet(txF["hex"]) rpc_tx_hsh = node.sendrawtransaction(txFS["hex"]) + # check transactions are in unbroadcast using rpc + mempoolinfo = self.nodes[0].getmempoolinfo() + assert_equal(mempoolinfo['unbroadcastcount'], 2) + mempool = self.nodes[0].getrawmempool(True) + for tx in mempool: + assert_equal(mempool[tx]['unbroadcast'], True) + # check that second node doesn't have these two txns mempool = self.nodes[1].getrawmempool() assert rpc_tx_hsh not in mempool @@ -71,6 +78,11 @@ class MempoolUnbroadcastTest(BitcoinTestFramework): assert rpc_tx_hsh in mempool assert wallet_tx_hsh in mempool + # check that transactions are no longer in first node's unbroadcast set + mempool = self.nodes[0].getrawmempool(True) + for tx in mempool: + assert_equal(mempool[tx]['unbroadcast'], False) + self.log.info("Add another connection & ensure transactions aren't broadcast again") conn = node.add_p2p_connection(P2PTxInvStore()) diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index bbd7350bf15..95a63717d60 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -645,6 +645,7 @@ class P2PTxInvStore(P2PInterface): self.tx_invs_received = defaultdict(int) def on_inv(self, message): + super().on_inv(message) # Send getdata in response. # Store how many times invs have been received for each tx. for i in message.inv: if i.type == MSG_TX: @@ -654,3 +655,12 @@ class P2PTxInvStore(P2PInterface): def get_invs(self): with mininode_lock: return list(self.tx_invs_received.keys()) + + def wait_for_broadcast(self, txns, timeout=60): + """Waits for the txns (list of txids) to complete initial broadcast. + The mempool should mark unbroadcast=False for these transactions. + """ + # Wait until invs have been received (and getdatas sent) for each txid. + self.wait_until(lambda: set(self.get_invs()) == set([int(tx, 16) for tx in txns]), timeout) + # Flush messages and wait for the getdatas to be processed + self.sync_with_ping()