postgres: add connection limit

This commit is contained in:
Joost Jager
2021-11-17 09:45:45 +01:00
parent b0fa19e8b0
commit 274faff980
8 changed files with 130 additions and 7 deletions

View File

@@ -4,6 +4,7 @@ import "time"
// Config holds postgres configuration data.
type Config struct {
Dsn string `long:"dsn" description:"Database connection string."`
Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."`
Dsn string `long:"dsn" description:"Database connection string."`
Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."`
MaxConnections int `long:"maxconnections" description:"The maximum number of open connections to the database. Set to zero for unlimited."`
}

View File

@@ -13,7 +13,6 @@ import (
"time"
"github.com/btcsuite/btcwallet/walletdb"
_ "github.com/jackc/pgx/v4/stdlib"
)
const (
@@ -58,6 +57,14 @@ type db struct {
// Enforce db implements the walletdb.DB interface.
var _ walletdb.DB = (*db)(nil)
// Global set of database connections.
var dbConns *dbConnSet
// Init initializes the global set of database connections.
func Init(maxConnections int) {
dbConns = newDbConnSet(maxConnections)
}
// newPostgresBackend returns a db object initialized with the passed backend
// config. If postgres connection cannot be estabished, then returns error.
func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
@@ -67,7 +74,11 @@ func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
return nil, errors.New("empty postgres prefix")
}
dbConn, err := sql.Open("pgx", config.Dsn)
if dbConns == nil {
return nil, errors.New("db connection set not initialized")
}
dbConn, err := dbConns.Open(config.Dsn)
if err != nil {
return nil, err
}
@@ -245,5 +256,5 @@ func (db *db) Copy(w io.Writer) error {
// Close cleanly shuts down the database and syncs all data.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Close() error {
return db.db.Close()
return dbConns.Close(db.cfg.Dsn)
}

View File

@@ -0,0 +1,89 @@
package postgres
import (
"database/sql"
"fmt"
"sync"
_ "github.com/jackc/pgx/v4/stdlib"
)
// dbConn stores the actual connection and a user count.
type dbConn struct {
db *sql.DB
count int
}
// dbConnSet stores a set of connections.
type dbConnSet struct {
dbConn map[string]*dbConn
maxConnections int
sync.Mutex
}
// newDbConnSet initializes a new set of connections.
func newDbConnSet(maxConnections int) *dbConnSet {
return &dbConnSet{
dbConn: make(map[string]*dbConn),
maxConnections: maxConnections,
}
}
// Open opens a new database connection. If a connection already exists for the
// given dsn, the existing connection is returned.
func (d *dbConnSet) Open(dsn string) (*sql.DB, error) {
d.Lock()
defer d.Unlock()
if dbConn, ok := d.dbConn[dsn]; ok {
dbConn.count++
return dbConn.db, nil
}
db, err := sql.Open("pgx", dsn)
if err != nil {
return nil, err
}
// Limit maximum number of open connections. This is useful to prevent
// the server from running out of connections and returning an error.
// With this client-side limit in place, lnd will wait for a connection
// to become available.
if d.maxConnections != 0 {
db.SetMaxOpenConns(d.maxConnections)
}
d.dbConn[dsn] = &dbConn{
db: db,
count: 1,
}
return db, nil
}
// Close closes the connection with the given dsn. If there are still other
// users of the same connection, this function does nothing.
func (d *dbConnSet) Close(dsn string) error {
d.Lock()
defer d.Unlock()
dbConn, ok := d.dbConn[dsn]
if !ok {
return fmt.Errorf("connection not found: %v", dsn)
}
// Reduce user count.
dbConn.count--
// Do not close if there are other users.
if dbConn.count > 0 {
return nil
}
// Close connection.
delete(d.dbConn, dsn)
return dbConn.db.Close()
}

6
kvdb/postgres/no_db.go Normal file
View File

@@ -0,0 +1,6 @@
//go:build !kvdb_postgres
// +build !kvdb_postgres
package postgres
func Init(maxConnections int) {}