Capabilities handling, database.
This commit is contained in:
94
database/connection.go
Normal file
94
database/connection.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
// stdlib
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
// local
|
||||
"develop.pztrn.name/gonews/gonews/configuration"
|
||||
"develop.pztrn.name/gonews/gonews/database/migrations"
|
||||
|
||||
// other
|
||||
"github.com/jmoiron/sqlx"
|
||||
// PostgreSQL driver.
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func startConnectionWatcher() {
|
||||
log.Println("Initializing database connection watcher...")
|
||||
|
||||
migrations.Initialize()
|
||||
|
||||
// Checking configuration validity.
|
||||
// Parameters should not be specified in DSN.
|
||||
if strings.Contains(configuration.Cfg.Database.DSN, "?") {
|
||||
log.Fatalln("Database DSN should not contain parameters, specify them in DATABASE_PARAMS environment variable!")
|
||||
}
|
||||
|
||||
// DSN should be defined.
|
||||
if configuration.Cfg.Database.DSN == "" {
|
||||
log.Fatalln("Database DSN should be defined!")
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(time.Second * time.Duration(configuration.Cfg.Database.Timeout))
|
||||
|
||||
// First start - manually.
|
||||
_ = watcher()
|
||||
|
||||
// Then - every ticker tick.
|
||||
for range ticker.C {
|
||||
doBreak := watcher()
|
||||
if doBreak {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
ticker.Stop()
|
||||
log.Println("Connection watcher stopped and connection to database was shutted down")
|
||||
connWatcherStopped = true
|
||||
}
|
||||
|
||||
// Actual connection watcher. Returns true if we should stop watching
|
||||
// (e.g. due to shutdown) and false if everything is okay.
|
||||
func watcher() bool {
|
||||
// If we're shutting down - stop connection watcher.
|
||||
if weAreShuttingDown {
|
||||
log.Println("Closing database connection...")
|
||||
err := Conn.Close()
|
||||
if err != nil {
|
||||
log.Println("Failed to close database connection")
|
||||
}
|
||||
Conn = nil
|
||||
return true
|
||||
}
|
||||
|
||||
// If connection is nil - try to establish (or reestablish)
|
||||
// connection.
|
||||
if Conn == nil {
|
||||
log.Println("(Re)Establishing connection to PostgreSQL...")
|
||||
|
||||
// Compose DSN.
|
||||
dsn := configuration.Cfg.Database.DSN
|
||||
if configuration.Cfg.Database.Params != "" {
|
||||
dsn += "?" + configuration.Cfg.Database.Params
|
||||
}
|
||||
|
||||
// Connect to database.
|
||||
dbConn, err := sqlx.Connect("postgres", dsn)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to PostgreSQL database, will try to reconnect after %d seconds\n", configuration.Cfg.Database.Timeout)
|
||||
return false
|
||||
}
|
||||
|
||||
log.Println("Database connection (re)established")
|
||||
|
||||
// Migrate database.
|
||||
migrations.Migrate(dbConn.DB)
|
||||
|
||||
Conn = dbConn
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
45
database/exported.go
Normal file
45
database/exported.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
// stdlib
|
||||
"log"
|
||||
"time"
|
||||
|
||||
// other
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
var (
|
||||
// Conn is database connection.
|
||||
Conn *sqlx.DB
|
||||
|
||||
// Shutdown flags.
|
||||
// Sets to true when Shutdown() is called to indicate other subsystes
|
||||
// that we're shutting down.
|
||||
weAreShuttingDown bool
|
||||
// Sets to true when connection watcher will be stopped.
|
||||
connWatcherStopped bool
|
||||
)
|
||||
|
||||
func Initialize() {
|
||||
log.Println("Initializing database handler...")
|
||||
|
||||
// Reset variables to their default startup state because they
|
||||
// can be set to other values while executing tests.
|
||||
Conn = nil
|
||||
weAreShuttingDown = false
|
||||
connWatcherStopped = false
|
||||
|
||||
go startConnectionWatcher()
|
||||
}
|
||||
|
||||
func Shutdown() {
|
||||
weAreShuttingDown = true
|
||||
for {
|
||||
if connWatcherStopped {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
}
|
44
database/migrations/1_create_users_table.go
Normal file
44
database/migrations/1_create_users_table.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
// stdlib
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
// CreateUsersTableUp creates local users table used for authentication.
|
||||
func CreateUsersTableUp(tx *sql.Tx) error {
|
||||
if _, err := tx.Exec(`
|
||||
CREATE TABLE users (
|
||||
uuid UUID NOT NULL,
|
||||
login TEXT NOT NULL,
|
||||
password_hash TEXT NOT NULL,
|
||||
password_salt TEXT NOT NULL,
|
||||
active BOOLEAN NOT NULL DEFAULT false,
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL
|
||||
);
|
||||
|
||||
COMMENT ON COLUMN users.uuid IS 'User UUID';
|
||||
COMMENT ON COLUMN users.login IS 'User login';
|
||||
COMMENT ON COLUMN users.password_hash IS 'Hashed user password';
|
||||
COMMENT ON COLUMN users.password_salt IS 'Salt for user password';
|
||||
COMMENT ON COLUMN users.active IS 'Active user flag. 0 - banned';
|
||||
COMMENT ON COLUMN users.created_at IS 'User registration timestamp';
|
||||
|
||||
CREATE INDEX users_uuid_idx ON users(uuid);
|
||||
CREATE INDEX users_login_idx ON users(login);
|
||||
CREATE INDEX users_created_at_idx ON users(created_at);
|
||||
`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateUsersTableDown deletes local users table used for authentication.
|
||||
func CreateUsersTableDown(tx *sql.Tx) error {
|
||||
if _, err := tx.Exec(`DROP TABLE users;`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
81
database/migrations/exported.go
Normal file
81
database/migrations/exported.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
// stdlib
|
||||
"database/sql"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
// other
|
||||
"github.com/pressly/goose"
|
||||
)
|
||||
|
||||
func Initialize() {
|
||||
log.Println("Initializing database migrations...")
|
||||
|
||||
_ = goose.SetDialect("postgres")
|
||||
|
||||
goose.AddNamedMigration("1_create_users_table.go", CreateUsersTableUp, CreateUsersTableDown)
|
||||
|
||||
// Migrations should be registered here.
|
||||
}
|
||||
|
||||
func Migrate(db *sql.DB) {
|
||||
log.Println("Starting database migration procedure...")
|
||||
|
||||
// Prepare migrations configuration.
|
||||
var action = "UP"
|
||||
actionFromEnv, actionFound := os.LookupEnv("DATABASE_ACTION")
|
||||
if actionFound {
|
||||
log.Println("Migration action override: " + actionFromEnv)
|
||||
action = actionFromEnv
|
||||
} else {
|
||||
log.Println("Executing default migration action (UP)")
|
||||
}
|
||||
|
||||
var count int64
|
||||
countFromEnv, countFound := os.LookupEnv("DATABASE_COUNT")
|
||||
if countFound {
|
||||
log.Println("Migration count override: " + countFromEnv)
|
||||
countAsInt, err := strconv.ParseInt(countFromEnv, 10, 64)
|
||||
if err != nil {
|
||||
log.Fatalln("Failed to convert count gathered from DATABASE_COUNT to integer")
|
||||
}
|
||||
count = countAsInt
|
||||
} else {
|
||||
log.Println("Applying or rollback this count of migrations: " + countFromEnv + ". 0 - all.")
|
||||
}
|
||||
|
||||
// Execute migrations.
|
||||
var err error
|
||||
currentDBVersion, gooseerr := goose.GetDBVersion(db)
|
||||
if gooseerr != nil {
|
||||
log.Fatalln("Failed to get database version: " + gooseerr.Error())
|
||||
}
|
||||
log.Println("Current database version obtained: " + strconv.Itoa(int(currentDBVersion)))
|
||||
if action == "UP" && count == 0 {
|
||||
log.Println("Applying all unapplied migrations...")
|
||||
err = goose.Up(db, ".")
|
||||
} else if action == "UP" && count != 0 {
|
||||
newVersion := currentDBVersion + count
|
||||
log.Println("Migrating database to specific version: " + strconv.Itoa(int(newVersion)))
|
||||
err = goose.UpTo(db, ".", newVersion)
|
||||
} else if action == "DOWN" && count == 0 {
|
||||
log.Println("Downgrading database to zero state, you'll need to re-apply migrations!")
|
||||
err = goose.Down(db, ".")
|
||||
log.Fatalln("Database downgraded to zero state. You have to re-apply migrations")
|
||||
} else if action == "DOWN" && count != 0 {
|
||||
newVersion := currentDBVersion - count
|
||||
log.Println("Downgrading database to specific version: " + strconv.Itoa(int(newVersion)))
|
||||
err = goose.DownTo(db, ".", newVersion)
|
||||
} else {
|
||||
log.Fatalln("Unsupported set of migration parameters, cannot continue: " + action + "/" + countFromEnv)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Fatalln("Failed to execute migration sequence: " + err.Error())
|
||||
}
|
||||
|
||||
log.Println("Database migrated successfully")
|
||||
}
|
Reference in New Issue
Block a user