Basic server app with WS connection.
Some checks failed
Linting and tests / Linting (push) Failing after 37s

This commit is contained in:
2025-09-15 09:33:25 +05:00
parent 466b58b41d
commit 2c13e3f380
25 changed files with 928 additions and 27 deletions

View File

@@ -0,0 +1,83 @@
package database
import (
"errors"
"fmt"
"net/url"
"os"
"strconv"
"strings"
"github.com/jmoiron/sqlx"
// postgresql driver.
_ "github.com/jackc/pgx/v5/stdlib"
)
const (
databaseDSNEnvVar = "BUNKERD_DATABASE_DSN"
databaseMaxIdleConnsEnvVar = "BUNKERD_DATABASE_MAX_IDLE_CONNS"
databaseMaxOpenedConnsEnvVar = "BUNKERD_DATABASE_MAX_OPENED_CONNS"
)
var (
errDSNInvalid = errors.New("BUNKERD_DATABASE_DSN environment variable is empty or invalid")
errNoMaxIdleConns = errors.New("no BUNKERD_DATABASE_MAX_IDLE_CONNS defined")
errNoMaxOpenedConns = errors.New("no BUNKERD_DATABASE_MAX_OPENED_CONNS defined")
errPostgresOnlySupported = errors.New("only PostgreSQL database is currently supported")
)
func (d *database) initializeConnection() error {
// Getting database DSN from environment as well as other required settings.
dsn, found := os.LookupEnv(databaseDSNEnvVar)
if !found {
return fmt.Errorf("initialize connection: getting database DSN: %w", errDSNInvalid)
}
maxOpenedConnsRaw, found := os.LookupEnv(databaseMaxOpenedConnsEnvVar)
if !found {
return fmt.Errorf("initialize connection: getting maximum number of opened conections: %w", errNoMaxOpenedConns)
}
maxOpenedConns, err := strconv.ParseInt(maxOpenedConnsRaw, 10, 64)
if err != nil {
return fmt.Errorf("initialize connection: parsing maximum number of opened conections: %w", err)
}
maxIdleConnsRaw, found := os.LookupEnv(databaseMaxIdleConnsEnvVar)
if !found {
return fmt.Errorf("initialize connection: getting maximum number of idle conections: %w", errNoMaxIdleConns)
}
maxIdleConns, err := strconv.ParseInt(maxIdleConnsRaw, 10, 64)
if err != nil {
return fmt.Errorf("initialize connection: parsing maximum number of opened conections: %w", err)
}
// While database/sql (and sqlx) supports all possible DSN formats, we will force user to use DSN in form
// "proto://user:passowrd@host:port/dbname" as it is easier to parse.
if _, err := url.Parse(dsn); err != nil {
return fmt.Errorf("initialize connection: validate DSN: %w", err)
}
// Currently we're support only postgresql, but this may change in future.
if !strings.HasPrefix(dsn, "postgres://") {
return fmt.Errorf("initialize connection: validate DSN: %w", errPostgresOnlySupported)
}
proto := strings.Split(dsn, ":")[0]
if proto == "postgres" {
proto = "pgx"
}
db, err := sqlx.Open(proto, dsn)
if err != nil {
return fmt.Errorf("initialize connection: open database: %w", err)
}
d.db = db
d.db.SetMaxOpenConns(int(maxOpenedConns))
d.db.SetMaxIdleConns(int(maxIdleConns))
return nil
}

View File

@@ -0,0 +1,73 @@
package database
import (
"fmt"
"io/fs"
"log/slog"
"bunker/server/internal/application"
"bunker/server/internal/services/core"
"github.com/jmoiron/sqlx"
)
var _ = core.Database(&database{})
type database struct {
app *application.Application
db *sqlx.DB
logger *slog.Logger
migrations map[string]fs.FS
version int64
}
// Initialize initializes service.
func Initialize(app *application.Application) error {
db := &database{
app: app,
}
if err := app.RegisterService(db); err != nil {
return fmt.Errorf("%w: %w", core.ErrDatabase, err)
}
return nil
}
func (d *database) Configure() error {
if err := d.initializeConnection(); err != nil {
return fmt.Errorf("configure: %w", err)
}
return nil
}
func (d *database) ConnectDependencies() error {
return nil
}
func (d *database) Initialize() error {
d.logger = d.app.NewLogger("service", core.ServiceNameDatabase)
d.logger.Info("Initializing...")
d.migrations = make(map[string]fs.FS, 0)
return nil
}
func (d *database) Name() string {
return core.ServiceNameDatabase
}
func (d *database) LaunchStartupTasks() error {
if err := d.applyMigrations(); err != nil {
return fmt.Errorf("launch startup tasks: %w", err)
}
return nil
}
func (d *database) Shutdown() error {
return nil
}

View File

@@ -0,0 +1,82 @@
package database
import (
"errors"
"fmt"
"io/fs"
"log/slog"
"sort"
"strings"
"bunker/commons"
"bunker/server/internal/services/core"
"github.com/pressly/goose/v3"
)
var errMigrationsAlreadyRegistered = errors.New("migrations already registered")
func (d *database) applyMigrations() error {
d.logger.Info("Migrating database...")
modules := make([]string, 0)
for module := range d.migrations {
modules = append(modules, module)
}
sort.Strings(modules)
_ = goose.SetDialect(string(goose.DialectPostgres))
gooseLogger := commons.NewGooseLogger(d.logger)
goose.SetLogger(gooseLogger)
for _, module := range modules {
d.logger.Info("Migrating database for module...", "module", module)
goose.SetBaseFS(d.migrations[module])
goose.SetTableName(strings.ReplaceAll(module, "/", "_") + "_migrations")
if err := goose.Up(d.db.DB, "migrations"); err != nil {
return fmt.Errorf("%w: applying migrations for module '%s': %w", core.ErrDatabase, module, err)
}
moduleDBVersion, err := goose.GetDBVersion(d.db.DB)
if err != nil {
return fmt.Errorf("%w: get database version for module '%s': %w", core.ErrDatabase, module, err)
}
d.version += moduleDBVersion
d.logger.Info(
"Database for module migrated to latest version",
"module", module,
"module_db_version", moduleDBVersion,
"db_version", d.version,
)
}
d.logger.Info("Database migrated.", "version", d.version)
return nil
}
func (d *database) RegisterMigrations(moduleName string, fSys fs.FS) error {
slog.Debug("Registering migrations for service.", "service", moduleName)
if _, found := d.migrations[moduleName]; found {
return fmt.Errorf(
"%w: RegisterMigrations: module '%s': %w",
core.ErrDatabase,
moduleName,
errMigrationsAlreadyRegistered,
)
}
d.migrations[moduleName] = fSys
slog.Debug("Migrations for service successfully registered.", "service", moduleName)
return nil
}

View File

@@ -0,0 +1,69 @@
package database
import (
"context"
"fmt"
"strings"
"bunker/server/internal/services/core"
)
// Exec is a proxy for ExecContext from sqlx.
func (d *database) Exec(ctx context.Context, query string, params ...interface{}) error {
if strings.Contains(query, "?") {
query = d.db.Rebind(query)
}
d.logger.Debug("Executing query.", "query", query, "params", fmt.Sprintf("%+v", params))
if _, err := d.db.ExecContext(ctx, query, params...); err != nil {
return fmt.Errorf("%w: failed to Exec(): %w", core.ErrDatabase, err)
}
return nil
}
// Get is a proxy for GetContext from sqlx.
func (d *database) Get(ctx context.Context, target interface{}, query string, params ...interface{}) error {
if strings.Contains(query, "?") {
query = d.db.Rebind(query)
}
d.logger.Debug("Getting single data from database with query.", "query", query, "params", fmt.Sprintf("%+v", params))
if err := d.db.GetContext(ctx, target, query, params...); err != nil {
return fmt.Errorf("%w: failed to Get(): %w", core.ErrDatabase, err)
}
return nil
}
// NamedExec is a proxy for NamedExecContext from sqlx.
func (d *database) NamedExec(ctx context.Context, query string, param interface{}) error {
if strings.Contains(query, "?") {
query = d.db.Rebind(query)
}
d.logger.Debug("Executing named query.", "query", query, "params", fmt.Sprintf("%+v", param))
if _, err := d.db.NamedExecContext(ctx, query, param); err != nil {
return fmt.Errorf("%w: failed to NamedExec(): %w", core.ErrDatabase, err)
}
return nil
}
// Select is a proxy for SelectContext from sqlx.
func (d *database) Select(ctx context.Context, target interface{}, query string, params ...interface{}) error {
if strings.Contains(query, "?") {
query = d.db.Rebind(query)
}
d.logger.Debug("Selecting from database with query.", "query", query, "params", fmt.Sprintf("%+v", params))
if err := d.db.SelectContext(ctx, target, query, params...); err != nil {
return fmt.Errorf("%w: failed to Select(): %w", core.ErrDatabase, err)
}
return nil
}

View File

@@ -0,0 +1,81 @@
package database
import (
"context"
"fmt"
"log/slog"
"bunker/server/internal/services/core"
"github.com/jmoiron/sqlx"
)
type transaction struct {
transaction *sqlx.Tx
logger *slog.Logger
}
func (d *database) Transaction(ctx context.Context) (core.DatabaseTransaction, error) {
txn, err := d.db.BeginTxx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("%w: starting transaction: %w", core.ErrDatabase, err)
}
txHandler := &transaction{
transaction: txn,
logger: d.logger.With("module", "transactioner"),
}
return txHandler, nil
}
func (t *transaction) Apply(steps ...core.TransactionFunc) error {
for stepNumber, stepFunc := range steps {
if err := stepFunc(t.transaction); err != nil {
t.logger.Error(
"Error occurred.",
"step", stepNumber,
"error", err.Error(),
"module", "core/database",
"subsystem", "transaction",
)
if rollbackErr := t.transaction.Rollback(); rollbackErr != nil {
t.logger.Error(
"Transaction rollback failed.",
"error", err.Error(),
"module", "core/database",
"subsystem", "transaction",
)
return fmt.Errorf("%w: transaction rollback: %w", core.ErrDatabase, rollbackErr)
}
return err
}
}
if err := t.transaction.Commit(); err != nil {
t.logger.Error(
"Transaction commit failed.",
"error", err.Error(),
"module", "core/database",
"subsystem", "transaction",
)
if rollbackErr := t.transaction.Rollback(); rollbackErr != nil {
t.logger.Error(
"Transaction rollback failed.",
"error", err.Error(),
"module", "core/database",
"subsystem", "transaction",
)
return fmt.Errorf("%w: transaction rollback: %w", core.ErrDatabase, rollbackErr)
}
return fmt.Errorf("%w: transaction commit: %w", core.ErrDatabase, err)
}
return nil
}