txgraph: Add ability to trim oversized clusters (feature)

During reorganisations, it is possible that dependencies get add which
result in clusters that violate limits (count, size), when linking the
new from-block transactions to the old from-mempool transactions.

Unlike RBF scenarios, we cannot simply reject these policy violations
when they are due to received blocks. To accomodate this, add a Trim()
function to TxGraph, which removes transactions (including descendants)
in order to make all resulting clusters satisfy the limits.

In the initial version of the function added here, the following approach
is used:
- Lazily compute a naive linearization for the to-be-merged cluster (using
  an O(n log n) algorithm, optimized for far larger groups of transactions
  than the normal linearization code).
- Initialize a set of accepted transactions to {}
- Iterate over the transactions in this cluster one by one:
  - If adding the transaction to the set makes it exceed the max cluster size
    or count limit, stop.
  - Add the transaction to the set.
- Remove all transactions from the cluster that were not included in the set
  (note that this necessarily includes all descendants too, because they
  appear later in the naive linearization).

Co-authored-by: Greg Sanders <gsanders87@gmail.com>
This commit is contained in:
Pieter Wuille
2024-12-16 17:57:57 -05:00
parent eabcd0eb6f
commit a04e205ab0
3 changed files with 300 additions and 0 deletions

View File

@@ -247,6 +247,31 @@ struct SimTxGraph
}
}
}
/** Verify that set contains transactions from every oversized cluster, and nothing from
* non-oversized ones. */
bool MatchesOversizedClusters(const SetType& set)
{
if (set.Any() && !IsOversized()) return false;
auto todo = graph.Positions();
if (!set.IsSubsetOf(todo)) return false;
// Walk all clusters, and make sure all of set doesn't come from non-oversized clusters
while (todo.Any()) {
auto component = graph.FindConnectedComponent(todo);
// Determine whether component is oversized, due to either the size or count limit.
bool is_oversized = component.Count() > max_cluster_count;
uint64_t component_size{0};
for (auto i : component) component_size += graph.FeeRate(i).size;
is_oversized |= component_size > max_cluster_size;
// Check whether overlap with set matches is_oversized.
if (is_oversized != set.Overlaps(component)) return false;
todo -= component;
}
return true;
}
};
} // namespace
@@ -789,6 +814,30 @@ FUZZ_TARGET(txgraph)
assert(sum == worst_chunk_feerate);
}
break;
} else if ((block_builders.empty() || sims.size() > 1) && command-- == 0) {
// Trim.
bool was_oversized = top_sim.IsOversized();
auto removed = real->Trim();
// Verify that something was removed if and only if there was an oversized cluster.
assert(was_oversized == !removed.empty());
if (!was_oversized) break;
auto removed_set = top_sim.MakeSet(removed);
// The removed set must contain all its own descendants.
for (auto simpos : removed_set) {
assert(top_sim.graph.Descendants(simpos).IsSubsetOf(removed_set));
}
// Something from every oversized cluster should have been removed, and nothing
// else.
assert(top_sim.MatchesOversizedClusters(removed_set));
// Apply all removals to the simulation, and verify the result is no longer
// oversized. Don't query the real graph for oversizedness; it is compared
// against the simulation anyway later.
for (auto simpos : removed_set) {
top_sim.RemoveTransaction(top_sim.GetRef(simpos));
}
assert(!top_sim.IsOversized());
break;
}
}
}

View File

@@ -53,6 +53,27 @@ enum class QualityLevel
NONE,
};
/** Information about a transaction inside TxGraphImpl::Trim. */
struct TrimTxData
{
// Fields populated by Cluster::AppendTrimData(). These are immutable after TrimTxData
// construction.
/** Chunk feerate for this transaction. */
FeePerWeight m_chunk_feerate;
/** GraphIndex of the transaction. */
TxGraph::GraphIndex m_index;
/** Size of the transaction. */
uint32_t m_tx_size;
// Fields only used internally by TxGraphImpl::Trim():
/** Number of unmet dependencies this transaction has. -1 if the transaction is included. */
uint32_t m_deps_left;
/** Number of dependencies that apply to this transaction as parent. */
uint32_t m_children_count;
/** Where in deps those dependencies begin. */
uint32_t m_children_offset;
};
/** A grouping of connected transactions inside a TxGraphImpl::ClusterSet. */
class Cluster
{
@@ -152,6 +173,10 @@ public:
void Relinearize(TxGraphImpl& graph, uint64_t max_iters) noexcept;
/** For every chunk in the cluster, append its FeeFrac to ret. */
void AppendChunkFeerates(std::vector<FeeFrac>& ret) const noexcept;
/** Add a TrimTxData entry (filling m_chunk_feerate, m_index, m_tx_size) for every
* transaction in the Cluster to ret. Implicit dependencies between consecutive transactions
* in the linearization are added to deps. Return the Cluster's total transaction size. */
uint64_t AppendTrimData(std::vector<TrimTxData>& ret, std::vector<std::pair<GraphIndex, GraphIndex>>& deps) const noexcept;
// Functions that implement the Cluster-specific side of public TxGraph functions.
@@ -563,6 +588,7 @@ public:
std::strong_ordering CompareMainOrder(const Ref& a, const Ref& b) noexcept final;
GraphIndex CountDistinctClusters(std::span<const Ref* const> refs, bool main_only = false) noexcept final;
std::pair<std::vector<FeeFrac>, std::vector<FeeFrac>> GetMainStagingDiagrams() noexcept final;
std::vector<Ref*> Trim() noexcept final;
std::unique_ptr<BlockBuilder> GetBlockBuilder() noexcept final;
std::pair<std::vector<Ref*>, FeePerWeight> GetWorstMainChunk() noexcept final;
@@ -875,6 +901,37 @@ void Cluster::AppendChunkFeerates(std::vector<FeeFrac>& ret) const noexcept
ret.insert(ret.end(), chunk_feerates.begin(), chunk_feerates.end());
}
uint64_t Cluster::AppendTrimData(std::vector<TrimTxData>& ret, std::vector<std::pair<GraphIndex, GraphIndex>>& deps) const noexcept
{
const LinearizationChunking linchunking(m_depgraph, m_linearization);
LinearizationIndex pos{0};
uint64_t size{0};
auto prev_index = GraphIndex(-1);
// Iterate over the chunks of this cluster's linearization.
for (unsigned i = 0; i < linchunking.NumChunksLeft(); ++i) {
const auto& [chunk, chunk_feerate] = linchunking.GetChunk(i);
// Iterate over the transactions of that chunk, in linearization order.
auto chunk_tx_count = chunk.Count();
for (unsigned j = 0; j < chunk_tx_count; ++j) {
auto cluster_idx = m_linearization[pos];
// The transaction must appear in the chunk.
Assume(chunk[cluster_idx]);
// Construct a new element in ret.
auto& entry = ret.emplace_back();
entry.m_chunk_feerate = FeePerWeight::FromFeeFrac(chunk_feerate);
entry.m_index = m_mapping[cluster_idx];
// If this is not the first transaction of the cluster linearization, it has an
// implicit dependency on its predecessor.
if (pos != 0) deps.emplace_back(prev_index, entry.m_index);
prev_index = entry.m_index;
entry.m_tx_size = m_depgraph.FeeRate(cluster_idx).size;
size += entry.m_tx_size;
++pos;
}
}
return size;
}
bool Cluster::Split(TxGraphImpl& graph) noexcept
{
// This function can only be called when the Cluster needs splitting.
@@ -2525,6 +2582,195 @@ std::pair<std::vector<TxGraph::Ref*>, FeePerWeight> TxGraphImpl::GetWorstMainChu
return ret;
}
std::vector<TxGraph::Ref*> TxGraphImpl::Trim() noexcept
{
int level = GetTopLevel();
Assume(m_main_chunkindex_observers == 0 || level != 0);
std::vector<TxGraph::Ref*> ret;
// Compute the groups of to-be-merged Clusters (which also applies all removals, and splits).
auto& clusterset = GetClusterSet(level);
if (clusterset.m_oversized == false) return ret;
GroupClusters(level);
Assume(clusterset.m_group_data.has_value());
// Nothing to do if not oversized.
Assume(clusterset.m_oversized.has_value());
if (clusterset.m_oversized == false) return ret;
// In this function, would-be clusters (as precomputed in m_group_data by GroupClusters) are
// trimmed by removing transactions in them such that the resulting clusters satisfy the size
// and count limits.
//
// It works by defining for each would-be cluster a rudimentary linearization: at every point
// the highest-chunk-feerate remaining transaction is picked among those with no unmet
// dependencies. "Dependency" here means either a to-be-added dependency (m_deps_to_add), or
// an implicit dependency added between any two consecutive transaction in their current
// cluster linearization. So it can be seen as a "merge sort" of the chunks of the clusters,
// but respecting the dependencies being added.
//
// This rudimentary linearization is computed lazily, by putting all eligible (no unmet
// dependencies) transactions in a heap, and popping the highest-feerate one from it. This
// continues as long as the number or size of all picked transactions together does not exceed
// the graph's configured cluster limits. All remaining transactions are then marked as
// removed.
//
// A next invocation of GroupClusters (after applying the removals) will compute the new
// resulting clusters, and none of them will violate the limits.
/** All dependencies (both to be added ones, and implicit ones between consecutive transactions
* in existing cluster linearizations). */
std::vector<std::pair<GraphIndex, GraphIndex>> deps;
/** Information about all transactions involved in a Cluster group to be trimmed, sorted by
* GraphIndex. It contains entries both for transactions that have already been included,
* and ones that have not yet been. */
std::vector<TrimTxData> trim_data;
/** Iterators into trim_data, treated as a max heap according to cmp_fn below. Each entry is
* a transaction that has not yet been included yet, but all its ancestors have. */
std::vector<std::vector<TrimTxData>::iterator> trim_heap;
/** Function to define the ordering of trim_heap. */
static constexpr auto cmp_fn = [](auto a, auto b) noexcept {
// Sort by increasing chunk feerate, and then by decreasing size.
// We do not need to sort by cluster or within clusters, because due to the implicit
// dependency between consecutive linearization elements, no two transactions from the
// same Cluster will ever simultaneously be in the heap.
return a->m_chunk_feerate < b->m_chunk_feerate;
};
/** Get iterator to TrimTxData entry for a given index. */
auto locate_fn = [&](GraphIndex index) noexcept {
auto it = std::lower_bound(trim_data.begin(), trim_data.end(), index, [](TrimTxData& elem, GraphIndex idx) noexcept {
return elem.m_index < idx;
});
Assume(it != trim_data.end() && it->m_index == index);
return it;
};
// For each group of to-be-merged Clusters.
for (const auto& group_data : clusterset.m_group_data->m_groups) {
trim_data.clear();
trim_heap.clear();
deps.clear();
// Gather trim data and implicit dependency data from all involved Clusters.
auto cluster_span = std::span{clusterset.m_group_data->m_group_clusters}
.subspan(group_data.m_cluster_offset, group_data.m_cluster_count);
uint64_t size{0};
for (Cluster* cluster : cluster_span) {
size += cluster->AppendTrimData(trim_data, deps);
}
// If this group of Clusters does not violate any limits, continue to the next group.
if (trim_data.size() <= m_max_cluster_count && size <= m_max_cluster_size) continue;
// Sort the trim data by GraphIndex. In what follows, we will treat this sorted vector as
// a map from GraphIndex to TrimTxData via locate_fn, and its ordering will not change
// anymore.
std::sort(trim_data.begin(), trim_data.end(), [](auto& a, auto& b) noexcept { return a.m_index < b.m_index; });
// Add the explicitly added dependencies to deps.
deps.insert(deps.end(),
clusterset.m_deps_to_add.begin() + group_data.m_deps_offset,
clusterset.m_deps_to_add.begin() + group_data.m_deps_offset + group_data.m_deps_count);
// Sort deps by child transaction GraphIndex.
std::sort(deps.begin(), deps.end(), [](auto& a, auto& b) noexcept { return a.second < b.second; });
// Fill m_deps_left in trim_data, and initially populate trim_heap. Because of the sort
// above, all dependencies involving the same child are grouped together, so a single
// linear scan suffices.
auto deps_it = deps.begin();
for (auto trim_it = trim_data.begin(); trim_it != trim_data.end(); ++trim_it) {
trim_it->m_deps_left = 0;
while (deps_it != deps.end() && deps_it->second == trim_it->m_index) {
++trim_it->m_deps_left;
++deps_it;
}
// If this transaction has no unmet dependencies, and is not oversized, add it to the
// heap (just append for now, the heapification happens below).
if (trim_it->m_deps_left == 0 && trim_it->m_tx_size <= m_max_cluster_size) {
trim_heap.push_back(trim_it);
}
}
Assume(deps_it == deps.end());
// Sort deps by parent transaction GraphIndex. The order will not be changed anymore after
// this.
std::sort(deps.begin(), deps.end(), [](auto& a, auto& b) noexcept { return a.first < b.first; });
// Fill m_children_offset and m_children_count in trim_data. Because of the sort above, all
// dependencies involving the same parent are grouped together, so a single linear scan
// suffices.
deps_it = deps.begin();
for (auto& trim_entry : trim_data) {
trim_entry.m_children_count = 0;
trim_entry.m_children_offset = deps_it - deps.begin();
while (deps_it != deps.end() && deps_it->first == trim_entry.m_index) {
++trim_entry.m_children_count;
++deps_it;
}
}
Assume(deps_it == deps.end());
// Build a heap of all transactions with 0 unmet dependencies.
std::make_heap(trim_heap.begin(), trim_heap.end(), cmp_fn);
// Iterate over to-be-included transactions, and convert them to included transactions, or
// decide to stop if doing so would violate resource limits.
//
// It is possible that the heap empties without ever hitting either cluster limit, in case
// the implied graph (to be added dependencies plus implicit dependency between each
// original transaction and its predecessor in the linearization it came from) contains
// cycles. Such cycles will be removed entirely, because each of the transactions in the
// cycle permanently have unmet dependencies. However, this cannot occur in real scenarios
// where Trim() is called to deal with reorganizations that would violate cluster limits,
// as all added dependencies are in the same direction (from old mempool transactions to
// new from-block transactions); cycles require dependencies in both directions to be
// added.
uint32_t total_count{0};
uint64_t total_size{0};
while (!trim_heap.empty()) {
// Move the best remaining transaction to the end of trim_heap.
std::pop_heap(trim_heap.begin(), trim_heap.end(), cmp_fn);
// Pop it, and find its TrimTxData.
auto& entry = *trim_heap.back();
trim_heap.pop_back();
// Compute resource counts.
total_count += 1;
total_size += entry.m_tx_size;
// Stop if this would violate any limit.
if (total_count > m_max_cluster_count || total_size > m_max_cluster_size) break;
// Mark the entry as included (so the loop below will not remove the transaction).
entry.m_deps_left = uint32_t(-1);
// Mark each to-be-added dependency involving this transaction as parent satisfied.
for (auto& [par, chl] : std::span{deps}.subspan(entry.m_children_offset, entry.m_children_count)) {
Assume(par == entry.m_index);
auto chl_it = locate_fn(chl);
// Reduce the number of unmet dependencies of chl_it, and if that brings the number
// to zero, add it to the heap of includable transactions.
Assume(chl_it->m_deps_left > 0);
if (--chl_it->m_deps_left == 0) {
trim_heap.push_back(chl_it);
std::push_heap(trim_heap.begin(), trim_heap.end(), cmp_fn);
}
}
}
// Remove all the transactions that were not processed above. Because nothing gets
// processed until/unless all its dependencies are met, this automatically guarantees
// that if a transaction is removed, all its descendants, or would-be descendants, are
// removed as well.
for (const auto& trim_entry : trim_data) {
if (trim_entry.m_deps_left != uint32_t(-1)) {
ret.push_back(m_entries[trim_entry.m_index].m_ref);
clusterset.m_to_remove.push_back(trim_entry.m_index);
}
}
}
clusterset.m_group_data.reset();
clusterset.m_oversized = false;
Assume(!ret.empty());
return ret;
}
} // namespace
TxGraph::Ref::~Ref()

View File

@@ -169,6 +169,11 @@ public:
* that appear identically in both. Use FeeFrac rather than FeePerWeight so CompareChunks is
* usable without type-conversion. */
virtual std::pair<std::vector<FeeFrac>, std::vector<FeeFrac>> GetMainStagingDiagrams() noexcept = 0;
/** Remove transactions (including their own descendants) according to a fast but best-effort
* strategy such that the TxGraph's cluster and size limits are respected. Applies to staging
* if it exists, and to main otherwise. Returns the list of all removed transactions in
* unspecified order. This has no effect unless the relevant graph is oversized. */
virtual std::vector<Ref*> Trim() noexcept = 0;
/** Interface returned by GetBlockBuilder. */
class BlockBuilder