diff --git a/kvdb/postgres/db.go b/kvdb/postgres/db.go
index aa517e652..2c31d46e0 100644
--- a/kvdb/postgres/db.go
+++ b/kvdb/postgres/db.go
@@ -20,10 +20,44 @@ const (
kvTableName = "kv"
)
+// SqlConfig holds a set of configuration options of a sql database connection.
+type SqlConfig struct {
+ // DriverName is the string that defines the registered sql driver that
+ // is to be used.
+ DriverName string
+
+ // Dsn is the database connection string that will be used to connect
+ // to the db.
+ Dsn string
+
+ // Timeout is the time after which a query to the db will be canceled if
+ // it has not yet completed.
+ Timeout time.Duration
+
+ // Schema is the name of the schema under which the sql tables should be
+ // created. It should be left empty for backends like sqlite that do not
+ // support having more than one schema.
+ Schema string
+
+ // TableNamePrefix is the name that should be used as a table name
+ // prefix when constructing the KV style table.
+ TableNamePrefix string
+
+ // SQLiteCmdReplacements define a one-to-one string mapping of sql
+ // keywords to the strings that should replace those keywords in any
+ // commands. Note that the sqlite keywords to be replaced are
+ // case-sensitive.
+ SQLiteCmdReplacements SQLiteCmdReplacements
+
+ // WithTxLevelLock when set will ensure that there is a transaction
+ // level lock.
+ WithTxLevelLock bool
+}
+
// db holds a reference to the postgres connection.
type db struct {
- // cfg is the postgres connection config.
- cfg *Config
+ // cfg is the sql db connection config.
+ cfg *SqlConfig
// prefix is the table name prefix that is used to simulate namespaces.
// We don't use schemas because at least sqlite does not support that.
@@ -38,7 +72,8 @@ type db struct {
// db is the underlying database connection instance.
db *sql.DB
- // lock is the global write lock that ensures single writer.
+ // lock is the global write lock that ensures single writer. This is
+ // only used if cfg.WithTxLevelLock is set.
lock sync.RWMutex
// table is the name of the table that contains the data for all
@@ -68,86 +103,45 @@ func Init(maxConnections int) {
dbConns = newDbConnSet(maxConnections)
}
-// newPostgresBackend returns a db object initialized with the passed backend
-// config. If postgres connection cannot be established, then returns error.
-func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
- *db, error) {
-
+// NewSqlBackend returns a db object initialized with the passed backend
+// config. If database connection cannot be established, then returns error.
+func NewSqlBackend(ctx context.Context, cfg *SqlConfig) (*db, error) {
dbConnsMu.Lock()
defer dbConnsMu.Unlock()
- if prefix == "" {
- return nil, errors.New("empty postgres prefix")
- }
-
if dbConns == nil {
return nil, errors.New("db connection set not initialized")
}
- dbConn, err := dbConns.Open(config.Dsn)
+ if cfg.TableNamePrefix == "" {
+ return nil, errors.New("empty table name prefix")
+ }
+
+ table := fmt.Sprintf("%s_%s", cfg.TableNamePrefix, kvTableName)
+
+ query := newKVSchemaCreationCmd(
+ table, cfg.Schema, cfg.SQLiteCmdReplacements,
+ )
+
+ dbConn, err := dbConns.Open(cfg.DriverName, cfg.Dsn)
if err != nil {
return nil, err
}
- // Compose system table names.
- table := fmt.Sprintf(
- "%s_%s", prefix, kvTableName,
- )
-
- // Execute the create statements to set up a kv table in postgres. Every
- // row points to the bucket that it is one via its parent_id field. A
- // NULL parent_id means that the key belongs to the upper-most bucket in
- // this table. A constraint on parent_id is enforcing referential
- // integrity.
- //
- // Furthermore there is a
_p index on parent_id that is required
- // for the foreign key constraint.
- //
- // Finally there are unique indices on (parent_id, key) to prevent the
- // same key being present in a bucket more than once (_up and
- // _unp). In postgres, a single index wouldn't enforce the unique
- // constraint on rows with a NULL parent_id. Therefore two indices are
- // defined.
- _, err = dbConn.ExecContext(ctx, `
-CREATE SCHEMA IF NOT EXISTS public;
-CREATE TABLE IF NOT EXISTS public.`+table+`
-(
- key bytea NOT NULL,
- value bytea,
- parent_id bigint,
- id bigserial PRIMARY KEY,
- sequence bigint,
- CONSTRAINT `+table+`_parent FOREIGN KEY (parent_id)
- REFERENCES public.`+table+` (id)
- ON UPDATE NO ACTION
- ON DELETE CASCADE
-);
-
-CREATE INDEX IF NOT EXISTS `+table+`_p
- ON public.`+table+` (parent_id);
-
-CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_up
- ON public.`+table+`
- (parent_id, key) WHERE parent_id IS NOT NULL;
-
-CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_unp
- ON public.`+table+` (key) WHERE parent_id IS NULL;
-`)
+ _, err = dbConn.ExecContext(ctx, query)
if err != nil {
_ = dbConn.Close()
return nil, err
}
- backend := &db{
- cfg: config,
- prefix: prefix,
+ return &db{
+ cfg: cfg,
ctx: ctx,
db: dbConn,
table: table,
- }
-
- return backend, nil
+ prefix: cfg.TableNamePrefix,
+ }, nil
}
// getTimeoutCtx gets a timeout context for database requests.
@@ -269,3 +263,28 @@ func (db *db) Close() error {
return dbConns.Close(db.cfg.Dsn)
}
+
+// sqliteCmdReplacements defines a mapping from some SQLite keywords and phrases
+// to their postgres counterparts.
+var sqliteCmdReplacements = SQLiteCmdReplacements{
+ "BLOB": "BYTEA",
+ "INTEGER PRIMARY KEY": "BIGSERIAL PRIMARY KEY",
+}
+
+// newPostgresBackend returns a db object initialized with the passed backend
+// config. If postgres connection cannot be established, then returns error.
+func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
+ walletdb.DB, error) {
+
+ cfg := &SqlConfig{
+ DriverName: "pgx",
+ Dsn: config.Dsn,
+ Timeout: config.Timeout,
+ Schema: "public",
+ TableNamePrefix: prefix,
+ SQLiteCmdReplacements: sqliteCmdReplacements,
+ WithTxLevelLock: true,
+ }
+
+ return NewSqlBackend(ctx, cfg)
+}
diff --git a/kvdb/postgres/db_conn_set.go b/kvdb/postgres/db_conn_set.go
index ced065969..736f8516b 100644
--- a/kvdb/postgres/db_conn_set.go
+++ b/kvdb/postgres/db_conn_set.go
@@ -33,7 +33,7 @@ func newDbConnSet(maxConnections int) *dbConnSet {
// Open opens a new database connection. If a connection already exists for the
// given dsn, the existing connection is returned.
-func (d *dbConnSet) Open(dsn string) (*sql.DB, error) {
+func (d *dbConnSet) Open(driver, dsn string) (*sql.DB, error) {
d.mu.Lock()
defer d.mu.Unlock()
@@ -43,7 +43,7 @@ func (d *dbConnSet) Open(dsn string) (*sql.DB, error) {
return dbConn.db, nil
}
- db, err := sql.Open("pgx", dsn)
+ db, err := sql.Open(driver, dsn)
if err != nil {
return nil, err
}
diff --git a/kvdb/postgres/db_test.go b/kvdb/postgres/db_test.go
index d397223d1..0378b4dbb 100644
--- a/kvdb/postgres/db_test.go
+++ b/kvdb/postgres/db_test.go
@@ -41,7 +41,7 @@ func TestPanic(t *testing.T) {
f, err := NewFixture("")
require.NoError(t, err)
- err = f.Db.(*db).Update(func(tx walletdb.ReadWriteTx) error {
+ err = f.Db.Update(func(tx walletdb.ReadWriteTx) error {
bucket, err := tx.CreateTopLevelBucket([]byte("test"))
require.NoError(t, err)
diff --git a/kvdb/postgres/readwrite_bucket.go b/kvdb/postgres/readwrite_bucket.go
index f71db50bd..71856b213 100644
--- a/kvdb/postgres/readwrite_bucket.go
+++ b/kvdb/postgres/readwrite_bucket.go
@@ -89,6 +89,15 @@ func (b *readWriteBucket) Get(key []byte) []byte {
panic(err)
}
+ // When an empty byte array is stored as the value, Sqlite will decode
+ // that into nil whereas postgres will decode that as an empty byte
+ // array. Since returning nil is taken to mean that no value has ever
+ // been written, we ensure here that we at least return an empty array
+ // so that nil checks will fail.
+ if len(*value) == 0 {
+ return []byte{}
+ }
+
return *value
}
diff --git a/kvdb/postgres/readwrite_tx.go b/kvdb/postgres/readwrite_tx.go
index 592128ad6..c31b05603 100644
--- a/kvdb/postgres/readwrite_tx.go
+++ b/kvdb/postgres/readwrite_tx.go
@@ -28,14 +28,17 @@ type readWriteTx struct {
// newReadWriteTx creates an rw transaction using a connection from the
// specified pool.
func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) {
- // Obtain the global lock instance. An alternative here is to obtain a
- // database lock from Postgres. Unfortunately there is no database-level
- // lock in Postgres, meaning that each table would need to be locked
- // individually. Perhaps an advisory lock could perform this function
- // too.
- var locker sync.Locker = &db.lock
- if readOnly {
- locker = db.lock.RLocker()
+ locker := newNoopLocker()
+ if db.cfg.WithTxLevelLock {
+ // Obtain the global lock instance. An alternative here is to
+ // obtain a database lock from Postgres. Unfortunately there is
+ // no database-level lock in Postgres, meaning that each table
+ // would need to be locked individually. Perhaps an advisory
+ // lock could perform this function too.
+ locker = &db.lock
+ if readOnly {
+ locker = db.lock.RLocker()
+ }
}
locker.Lock()
@@ -198,3 +201,25 @@ func (tx *readWriteTx) Exec(query string, args ...interface{}) (sql.Result,
return tx.tx.ExecContext(ctx, query, args...)
}
+
+// noopLocker is an implementation of a no-op sync.Locker.
+type noopLocker struct{}
+
+// newNoopLocker creates a new noopLocker.
+func newNoopLocker() sync.Locker {
+ return &noopLocker{}
+}
+
+// Lock is a noop.
+//
+// NOTE: this is part of the sync.Locker interface.
+func (n *noopLocker) Lock() {
+}
+
+// Unlock is a noop.
+//
+// NOTE: this is part of the sync.Locker interface.
+func (n *noopLocker) Unlock() {
+}
+
+var _ sync.Locker = (*noopLocker)(nil)
diff --git a/kvdb/postgres/schema.go b/kvdb/postgres/schema.go
new file mode 100644
index 000000000..9276326e3
--- /dev/null
+++ b/kvdb/postgres/schema.go
@@ -0,0 +1,74 @@
+//go:build kvdb_postgres
+
+package postgres
+
+import (
+ "fmt"
+ "strings"
+)
+
+// SQLiteCmdReplacements is a one to one mapping of sqlite keywords that should
+// be replaced by the mapped strings in any command. Note that the sqlite
+// keywords to be replaced are case-sensitive.
+type SQLiteCmdReplacements map[string]string
+
+func newKVSchemaCreationCmd(table, schema string,
+ replacements SQLiteCmdReplacements) string {
+
+ var (
+ tableInSchema = table
+ finalCmd string
+ )
+ if schema != "" {
+ finalCmd = fmt.Sprintf(
+ `CREATE SCHEMA IF NOT EXISTS ` + schema + `;`,
+ )
+
+ tableInSchema = fmt.Sprintf("%s.%s", schema, table)
+ }
+
+ // Construct the sql statements to set up a kv table in postgres. Every
+ // row points to the bucket that it is one via its parent_id field. A
+ // NULL parent_id means that the key belongs to the uppermost bucket in
+ // this table. A constraint on parent_id is enforcing referential
+ // integrity.
+ //
+ // Furthermore, there is a _p index on parent_id that is required
+ // for the foreign key constraint.
+ //
+ // Finally, there are unique indices on (parent_id, key) to prevent the
+ // same key being present in a bucket more than once (_up and
+ // _unp). In postgres, a single index wouldn't enforce the unique
+ // constraint on rows with a NULL parent_id. Therefore, two indices are
+ // defined.
+ //
+ // The replacements map can be used to replace any sqlite keywords.
+ // Callers should note that the sqlite keywords are case-sensitive.
+ finalCmd += fmt.Sprintf(`
+CREATE TABLE IF NOT EXISTS ` + tableInSchema + `
+(
+ key BLOB NOT NULL,
+ value BLOB,
+ parent_id BIGINT,
+ id INTEGER PRIMARY KEY,
+ sequence BIGINT,
+ CONSTRAINT ` + table + `_parent FOREIGN KEY (parent_id)
+ REFERENCES ` + tableInSchema + ` (id)
+ ON UPDATE NO ACTION
+ ON DELETE CASCADE
+);
+CREATE INDEX IF NOT EXISTS ` + table + `_p
+ ON ` + tableInSchema + ` (parent_id);
+CREATE UNIQUE INDEX IF NOT EXISTS ` + table + `_up
+ ON ` + tableInSchema + `
+ (parent_id, key) WHERE parent_id IS NOT NULL;
+CREATE UNIQUE INDEX IF NOT EXISTS ` + table + `_unp
+ ON ` + tableInSchema + ` (key) WHERE parent_id IS NULL;
+`)
+
+ for from, to := range replacements {
+ finalCmd = strings.Replace(finalCmd, from, to, -1)
+ }
+
+ return finalCmd
+}