hardlink/handlers/db_helpers.go

207 lines
4.8 KiB
Go

package handlers
import (
"bufio"
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"gitea.futuresens.co.uk/futuresens/hardlink/db"
log "github.com/sirupsen/logrus"
)
// preauthSpoolRecord is a single entry of the preauth spool file. Each entry
// is serialized as one JSON object per line (NDJSON).
type preauthSpoolRecord struct {
	// CreatedAt is the moment the record was built for spooling.
	CreatedAt time.Time `json:"createdAt"`

	// CheckoutDate is stored exactly as it was received; it is not parsed
	// or normalized here.
	CheckoutDate string `json:"checkoutDate"`

	// Fields carries the key/value pairs of the ChipDNA result
	// (result.Fields).
	Fields map[string]string `json:"fields"`
}
// getDB returns a database handle that has just answered a ping, reconnecting
// if the cached one has gone stale.
//
// app.dbMu is held for the whole call, so concurrent callers are served one at
// a time. The sequence is:
//
//  1. If a handle is cached, ping it with a 1s timeout and return it on success.
//     On failure the handle is closed and forgotten.
//  2. Otherwise open a new handle via db.InitMSSQL and ping it (1s timeout,
//     nested inside a 3s reconnect context). Only a handle that answers the
//     ping is cached.
//
// Note that db.InitMSSQL is not given a context, so the 3s reconnect context
// only limits the ping that follows it.
//
// On error the returned *sql.DB is nil and app.db is left nil.
func (app *App) getDB(ctx context.Context) (*sql.DB, error) {
	app.dbMu.Lock()
	defer app.dbMu.Unlock()

	// Reuse the cached handle when it still responds.
	if cached := app.db; cached != nil {
		checkCtx, checkCancel := context.WithTimeout(ctx, 1*time.Second)
		defer checkCancel()

		pingErr := cached.PingContext(checkCtx)
		if pingErr == nil {
			return cached, nil
		}

		// The handle no longer responds: discard it and fall through to reconnect.
		_ = cached.Close()
		app.db = nil
	}

	// Single reconnect attempt.
	reconnectCtx, reconnectCancel := context.WithTimeout(ctx, 3*time.Second)
	defer reconnectCancel()

	fresh, openErr := db.InitMSSQL(
		app.cfg.Dbport,
		app.cfg.Dbuser,
		app.cfg.Dbpassword,
		app.cfg.Dbname,
	)
	if openErr != nil {
		return nil, openErr
	}

	verifyCtx, verifyCancel := context.WithTimeout(reconnectCtx, 1*time.Second)
	defer verifyCancel()

	if pingErr := fresh.PingContext(verifyCtx); pingErr != nil {
		_ = fresh.Close()
		return nil, pingErr
	}

	app.db = fresh
	return fresh, nil
}
// spoolPath returns the location of the preauth spool file, which lives in the
// configured log directory. filepath.Join supplies the path separator, so
// LogDir may be configured with or without a trailing one.
func (app *App) spoolPath() string {
	const spoolFileName = "preauth_spool.ndjson"

	dir := app.cfg.LogDir
	return filepath.Join(dir, spoolFileName)
}
// persistPreauth stores a preauth result, preferring the database and falling
// back to the on-disk spool.
//
// The database is tried first (getDB handles reconnecting). If the insert
// succeeds, a background drain of the spool is started and the function
// returns. If the database cannot be reached or the insert fails, the record is
// appended to the spool file instead.
//
// Nothing is returned to the caller: every failure is logged here so the
// calling HTTP flow does not need to deal with persistence errors.
//
// NOTE(review): a drain goroutine is started after every successful insert,
// using context.Background() so it is not tied to the request's lifetime.
func (app *App) persistPreauth(ctx context.Context, fields map[string]string, checkoutDate string) {
	conn, connErr := app.getDB(ctx)

	switch {
	case connErr != nil || conn == nil:
		log.WithError(connErr).Warn("DB unavailable; will spool preauth")

	default:
		insertErr := db.InsertPreauth(ctx, conn, fields, checkoutDate)
		if insertErr == nil {
			// The database is reachable, so take the chance to replay
			// anything spooled earlier.
			go app.drainPreauthSpool(context.Background())
			return
		}
		log.WithError(insertErr).Warn("DB insert failed; will spool preauth")
	}

	// The database path did not work out: keep the record on disk.
	spErr := app.spoolPreauth(preauthSpoolRecord{
		CreatedAt:    time.Now().UTC(),
		CheckoutDate: checkoutDate,
		Fields:       fields,
	})
	if spErr != nil {
		log.WithError(spErr).Error("failed to spool preauth")
	}
}
// spoolPreauth appends rec to the spool file as one JSON object followed by a
// newline (NDJSON), creating the file if it does not exist, and syncs the file
// before returning.
//
// The record is encoded before the file is opened, so a record that cannot be
// encoded never creates or touches the spool. The line is written with a single
// Write call on a file opened with O_APPEND.
//
// Every returned error is wrapped with the step that failed (marshal, open,
// write or sync).
//
// NOTE(review): the file is created with mode 0666 (before umask); consider
// whether the spooled fields warrant tighter permissions.
func (app *App) spoolPreauth(rec preauthSpoolRecord) error {
	line, err := json.Marshal(rec)
	if err != nil {
		return fmt.Errorf("marshal spool record: %w", err)
	}
	line = append(line, '\n')

	f, err := os.OpenFile(app.spoolPath(), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		return fmt.Errorf("open spool file: %w", err)
	}
	// The Close error is not checked: Sync below already reports whether the
	// data reached the disk.
	defer f.Close()

	if _, err := f.Write(line); err != nil {
		return fmt.Errorf("write spool record: %w", err)
	}
	if err := f.Sync(); err != nil {
		return fmt.Errorf("sync spool file: %w", err)
	}
	return nil
}
// preauthDrainGate is a one-slot semaphore that lets at most one
// drainPreauthSpool run at a time. A run claims the slot with a non-blocking
// send and releases it when it returns.
var preauthDrainGate = make(chan struct{}, 1)

// drainPreauthSpool replays the records in the spool file into the database.
//
// Each non-empty line is decoded and inserted. Lines that cannot be decoded or
// inserted are copied to "<spool>.tmp" so they are kept for a later attempt.
// When the whole file has been read:
//
//   - if no line had to be kept, the spool file is removed;
//   - if nothing was inserted, the spool is left as it is (it already holds
//     exactly the kept records);
//   - otherwise the temp file is flushed, synced and renamed over the spool.
//
// The spool is only replaced after the temp file was written completely; any
// read, write, sync or close failure aborts the drain, leaves the spool
// untouched and removes the temp file. Records inserted before such an abort
// remain in the spool and will be inserted again by the next drain.
//
// Only one drain runs at a time: a call made while another is in progress
// returns immediately. This matters because persistPreauth starts a drain after
// every successful insert, and concurrent runs would share the same temp file
// and insert the same records twice.
//
// The function returns silently when the database is unavailable or the spool
// cannot be opened (normally because there is none).
//
// NOTE(review): spoolPreauth appends to the same file without coordinating with
// this function, so a record appended between the end of the scan and the
// replace/remove below can be lost. Closing that window needs a lock shared
// with spoolPreauth.
func (app *App) drainPreauthSpool(ctx context.Context) {
	select {
	case preauthDrainGate <- struct{}{}:
		defer func() { <-preauthDrainGate }()
	default:
		return // another drain is already running
	}

	dbConn, err := app.getDB(ctx)
	if err != nil {
		return // still down, nothing to do
	}

	spool := app.spoolPath()
	in, err := os.Open(spool)
	if err != nil {
		return // no spool is fine
	}
	defer in.Close()

	tmp := spool + ".tmp"
	out, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
	if err != nil {
		log.WithError(err).Warn("drain spool: open tmp failed")
		return
	}
	// Unless the temp file ends up replacing the spool, it is discarded on
	// every way out of this function.
	committed := false
	defer func() {
		_ = out.Close() // may already be closed; the error is irrelevant here
		if !committed {
			_ = os.Remove(tmp)
		}
	}()

	// Buffered writer: copies each kept line out of the scanner's buffer and
	// avoids one syscall per line.
	w := bufio.NewWriter(out)
	keep := func(line []byte) error {
		if _, err := w.Write(line); err != nil {
			return err
		}
		return w.WriteByte('\n')
	}

	sc := bufio.NewScanner(in)
	// Accept lines of up to 2 MiB in case a record is unexpectedly large.
	sc.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)

	var okCount, failCount int
	for sc.Scan() {
		line := sc.Bytes()
		if len(line) == 0 {
			continue
		}

		var rec preauthSpoolRecord
		if err := json.Unmarshal(line, &rec); err == nil {
			if err := db.InsertPreauth(ctx, dbConn, rec.Fields, rec.CheckoutDate); err == nil {
				okCount++
				continue
			}
		}

		// Malformed line or failed insert: keep the raw line so nothing is
		// lost and it can be retried or inspected later.
		if err := keep(line); err != nil {
			log.WithError(err).Warn("drain spool: write tmp failed")
			return
		}
		failCount++
	}
	if err := sc.Err(); err != nil {
		log.WithError(err).Warn("drain spool: scanner error")
		return // best effort; do not replace spool
	}

	// The spool must be closed before it can be replaced or removed on Windows.
	_ = in.Close()

	switch {
	case failCount == 0:
		// Nothing left to retry: remove the spool instead of leaving an empty
		// file that every later drain would rewrite.
		if err := os.Remove(spool); err != nil {
			log.WithError(err).Warn("drain spool: remove failed")
			return
		}

	case okCount == 0:
		// Nothing was inserted, so the spool already contains exactly the
		// records that were kept; there is nothing to replace.

	default:
		if err := w.Flush(); err != nil {
			log.WithError(err).Warn("drain spool: write tmp failed")
			return
		}
		if err := out.Sync(); err != nil {
			log.WithError(err).Warn("drain spool: sync tmp failed")
			return
		}
		if err := out.Close(); err != nil {
			log.WithError(err).Warn("drain spool: close tmp failed")
			return
		}
		if err := os.Rename(tmp, spool); err != nil {
			log.WithError(err).Warn("drain spool: rename failed")
			return
		}
		committed = true
	}

	if okCount > 0 || failCount > 0 {
		log.WithFields(log.Fields{
			"inserted":  okCount,
			"remaining": failCount,
		}).Info("preauth spool drained")
	}
}