sqlite: Ensure that only one SQLiteBatch is writing to db at a time

A SQLiteBatch need to wait for any other batch to finish writing before
it can begin writing, otherwise db txn state may be incorrectly
modified. To enforce this, each SQLiteDatabase has a semaphore which
acts as a lock and is acquired by a batch when it begins a write, erase,
or a transaction, and is released by it when it is done.

To avoid deadlocking on itself for writing during a transaction,
SQLiteBatch also keeps track of whether it has begun a transaction.
This commit is contained in:
Ava Chow
2023-12-18 17:06:04 -05:00
parent 6f7395b3ff
commit 395bcd2454
2 changed files with 48 additions and 5 deletions

View File

@@ -110,7 +110,7 @@ Mutex SQLiteDatabase::g_sqlite_mutex;
int SQLiteDatabase::g_sqlite_count = 0;
SQLiteDatabase::SQLiteDatabase(const fs::path& dir_path, const fs::path& file_path, const DatabaseOptions& options, bool mock)
: WalletDatabase(), m_mock(mock), m_dir_path(fs::PathToString(dir_path)), m_file_path(fs::PathToString(file_path)), m_use_unsafe_sync(options.use_unsafe_sync)
: WalletDatabase(), m_mock(mock), m_dir_path(fs::PathToString(dir_path)), m_file_path(fs::PathToString(file_path)), m_write_semaphore(1), m_use_unsafe_sync(options.use_unsafe_sync)
{
{
LOCK(g_sqlite_mutex);
@@ -408,7 +408,7 @@ void SQLiteBatch::Close()
bool force_conn_refresh = false;
// If we began a transaction, and it wasn't committed, abort the transaction in progress
if (m_database.HasActiveTxn()) {
if (m_txn) {
if (TxnAbort()) {
LogPrintf("SQLiteBatch: Batch closed unexpectedly without the transaction being explicitly committed or aborted\n");
} else {
@@ -442,6 +442,8 @@ void SQLiteBatch::Close()
m_database.Close();
try {
m_database.Open();
// If TxnAbort failed and we refreshed the connection, the semaphore was not released, so release it here to avoid deadlocks on future writes.
m_database.m_write_semaphore.post();
} catch (const std::runtime_error&) {
// If open fails, cleanup this object and rethrow the exception
m_database.Close();
@@ -493,6 +495,9 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite)
if (!BindBlobToStatement(stmt, 1, key, "key")) return false;
if (!BindBlobToStatement(stmt, 2, value, "value")) return false;
// Acquire semaphore if not previously acquired when creating a transaction.
if (!m_txn) m_database.m_write_semaphore.wait();
// Execute
int res = sqlite3_step(stmt);
sqlite3_clear_bindings(stmt);
@@ -500,6 +505,9 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite)
if (res != SQLITE_DONE) {
LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res));
}
if (!m_txn) m_database.m_write_semaphore.post();
return res == SQLITE_DONE;
}
@@ -511,6 +519,9 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob)
// Bind: leftmost parameter in statement is index 1
if (!BindBlobToStatement(stmt, 1, blob, "key")) return false;
// Acquire semaphore if not previously acquired when creating a transaction.
if (!m_txn) m_database.m_write_semaphore.wait();
// Execute
int res = sqlite3_step(stmt);
sqlite3_clear_bindings(stmt);
@@ -518,6 +529,9 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob)
if (res != SQLITE_DONE) {
LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res));
}
if (!m_txn) m_database.m_write_semaphore.post();
return res == SQLITE_DONE;
}
@@ -634,30 +648,43 @@ std::unique_ptr<DatabaseCursor> SQLiteBatch::GetNewPrefixCursor(Span<const std::
bool SQLiteBatch::TxnBegin()
{
if (!m_database.m_db || m_database.HasActiveTxn()) return false;
if (!m_database.m_db || m_txn) return false;
m_database.m_write_semaphore.wait();
Assert(!m_database.HasActiveTxn());
int res = Assert(m_exec_handler)->Exec(m_database, "BEGIN TRANSACTION");
if (res != SQLITE_OK) {
LogPrintf("SQLiteBatch: Failed to begin the transaction\n");
m_database.m_write_semaphore.post();
} else {
m_txn = true;
}
return res == SQLITE_OK;
}
bool SQLiteBatch::TxnCommit()
{
if (!m_database.HasActiveTxn()) return false;
if (!m_database.m_db || !m_txn) return false;
Assert(m_database.HasActiveTxn());
int res = Assert(m_exec_handler)->Exec(m_database, "COMMIT TRANSACTION");
if (res != SQLITE_OK) {
LogPrintf("SQLiteBatch: Failed to commit the transaction\n");
} else {
m_txn = false;
m_database.m_write_semaphore.post();
}
return res == SQLITE_OK;
}
bool SQLiteBatch::TxnAbort()
{
if (!m_database.HasActiveTxn()) return false;
if (!m_database.m_db || !m_txn) return false;
Assert(m_database.HasActiveTxn());
int res = Assert(m_exec_handler)->Exec(m_database, "ROLLBACK TRANSACTION");
if (res != SQLITE_OK) {
LogPrintf("SQLiteBatch: Failed to abort the transaction\n");
} else {
m_txn = false;
m_database.m_write_semaphore.post();
}
return res == SQLITE_OK;
}

View File

@@ -58,6 +58,18 @@ private:
sqlite3_stmt* m_delete_stmt{nullptr};
sqlite3_stmt* m_delete_prefix_stmt{nullptr};
/** Whether this batch has started a database transaction and whether it owns SQLiteDatabase::m_write_semaphore.
* If the batch starts a db tx, it acquires the semaphore and sets this to true, keeping the semaphore
* until the transaction ends to prevent other batch objects from writing to the database.
*
* If this batch did not start a transaction, the semaphore is acquired transiently when writing and m_txn
* is not set.
*
* m_txn is different from HasActiveTxn() as it is only true when this batch has started the transaction,
* not just when any batch has started a transaction.
*/
bool m_txn{false};
void SetupSQLStatements();
bool ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob);
@@ -115,6 +127,10 @@ public:
~SQLiteDatabase();
// Batches must acquire this semaphore on writing, and release when done writing.
// This ensures that only one batch is modifying the database at a time.
CSemaphore m_write_semaphore;
bool Verify(bilingual_str& error);
/** Open the database if it is not already opened */