Skip to content

Commit

Permalink
added more db resiliency
Browse files Browse the repository at this point in the history
  • Loading branch information
joshghent committed Oct 8, 2024
1 parent 32e14dc commit 8e89de8
Showing 1 changed file with 60 additions and 7 deletions.
67 changes: 60 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"encoding/csv"
"fmt"
"log"
"net/http"
_ "net/http/pprof" // Register pprof handlers

// "net/http"
// _ "net/http/pprof" // Register pprof handlers
"os"
"strings"
"time"
Expand All @@ -20,11 +21,10 @@ var db *pgxpool.Pool

func main() {
log.SetOutput(os.Stdout)
// Start pprof for profiling in a separate goroutine
go func() {
log.Println("Starting pprof on :6060")
http.ListenAndServe(":6060", nil)
}()
// go func() {
// log.Println("Starting pprof on :6060")
// http.ListenAndServe(":6060", nil)
// }()

var err error
db, err = connectToDB()
Expand All @@ -34,6 +34,8 @@ func main() {
defer db.Close()
log.Println("Connected to the database successfully.")

go monitorDBConnections(db)

r := gin.Default()

r.POST("/api/v1/code/redeem", getCodeHandler)
Expand Down Expand Up @@ -73,6 +75,8 @@ func connectToDB() (*pgxpool.Pool, error) {
config.MaxConns = 20 // Adjust based on expected workload
config.MaxConnIdleTime = 30 * time.Minute
config.MaxConnLifetime = 2 * time.Hour
config.HealthCheckPeriod = 1 * time.Minute // Add health check period
config.ConnConfig.ConnectTimeout = 5 * time.Second // Add connection timeout

db, err = pgxpool.ConnectConfig(context.Background(), config)
if err == nil {
Expand All @@ -81,6 +85,12 @@ func connectToDB() (*pgxpool.Pool, error) {
if err == nil {
break
}
// Test the connection by querying the database
var testResult int
err = db.QueryRow(context.Background(), "SELECT 1").Scan(&testResult)
if err != nil {
return nil, fmt.Errorf("error testing database connection: %v", err)
}
}
log.Printf("Error connecting to database (attempt %d/%d): %v\nDatabase URL: %s", i+1, maxRetries, err, databaseURL)
time.Sleep(5 * time.Second)
Expand All @@ -89,6 +99,49 @@ func connectToDB() (*pgxpool.Pool, error) {
return db, err
}

func monitorDBConnections(pool *pgxpool.Pool) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

for range ticker.C {
stats := pool.Stat()
log.Printf("DB Pool Stats - Total: %d, Idle: %d, In Use: %d, Max: %d",
stats.TotalConns(), stats.IdleConns(), stats.AcquiredConns(), stats.MaxConns())

// Close idle connections
pool.AcquireAllIdle(context.Background())

// Check for stalled connections and reset them
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := pool.AcquireFunc(ctx, func(conn *pgxpool.Conn) error {
_, err := conn.Exec(ctx, "SELECT 1")
if err != nil {
log.Printf("Resetting stalled connection: %v", err)
conn.Hijack()
}
return nil
})
cancel()
if err != nil {
log.Printf("Error checking for stalled connections: %v", err)
}

// Check for long-running transactions
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
_, err = pool.Exec(ctx, `
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE state = 'active'
AND state_change < NOW() - INTERVAL '30 seconds'
AND query NOT LIKE '%pg_terminate_backend%'
`)
cancel()
if err != nil {
log.Printf("Error terminating long-running transactions: %v", err)
}
}
}

func testDBConnection(db *pgxpool.Pool) error {
// Check if the required tables exist
tables := []string{"batches", "codes"}
Expand Down

0 comments on commit 8e89de8

Please sign in to comment.