// -------------------- // Queue-based client (single owner of port) // -------------------- package dispenser import ( "context" "fmt" "sync" "time" log "github.com/sirupsen/logrus" "github.com/tarm/serial" ) type cmdType int const ( cmdStatus cmdType = iota cmdToEncoder cmdOutOfMouth ) type cmdReq struct { typ cmdType ctx context.Context respCh chan cmdResp } type cmdResp struct { status []byte err error } type Client struct { port *serial.Port reqCh chan cmdReq done chan struct{} // status cache mu sync.RWMutex lastStatus []byte lastStatusT time.Time statusTTL time.Duration // published "stock/cardwell" cache + callback lastStockMu sync.RWMutex lastStock string onStock func(string) } // NewClient starts the worker that owns the serial port. func NewClient(port *serial.Port, queueSize int) *Client { if queueSize <= 0 { queueSize = 16 } c := &Client{ port: port, reqCh: make(chan cmdReq, queueSize), done: make(chan struct{}), statusTTL: defaultStatusTTL, } go c.loop() return c } func (c *Client) Close() { select { case <-c.done: return default: close(c.done) } } // Optional: tune cache TTL (how "fresh" cached status must be) func (c *Client) SetStatusTTL(d time.Duration) { c.mu.Lock() c.statusTTL = d c.mu.Unlock() } // OnStockUpdate registers a callback called whenever polling (or status reads) produce a stock status string. func (c *Client) OnStockUpdate(fn func(string)) { c.lastStockMu.Lock() c.onStock = fn c.lastStockMu.Unlock() } // LastStock returns the most recently computed stock/card-well status string. func (c *Client) LastStock() string { c.lastStockMu.RLock() defer c.lastStockMu.RUnlock() return c.lastStock } func (c *Client) setStock(statusBytes []byte) { stock := stockTake(statusBytes) c.lastStockMu.Lock() c.lastStock = stock fn := c.onStock c.lastStockMu.Unlock() // call outside lock if fn != nil { fn(stock) } } // StartPolling performs a periodic status refresh. // It will NOT interrupt commands: it enqueues only when queue is idle. func (c *Client) StartPolling(interval time.Duration) { if interval <= 0 { return } go func() { t := time.NewTicker(interval) defer t.Stop() for { select { case <-c.done: return case <-t.C: // enqueue only if idle to avoid delaying real commands if len(c.reqCh) != 0 { continue } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) _, err := c.CheckStatus(ctx) if err != nil { log.Debugf("dispenser polling: %v", err) } cancel() } } }() } func (c *Client) loop() { for { select { case <-c.done: return case req := <-c.reqCh: c.handle(req) } } } func (c *Client) handle(req cmdReq) { select { case <-req.ctx.Done(): req.respCh <- cmdResp{err: req.ctx.Err()} return default: } switch req.typ { case cmdStatus: st, err := checkDispenserStatus(c.port) if err == nil && len(st) == 4 { c.mu.Lock() c.lastStatus = append([]byte(nil), st...) c.lastStatusT = time.Now() c.mu.Unlock() // publish stock/cardwell c.setStock(st) } req.respCh <- cmdResp{status: st, err: err} case cmdToEncoder: err := cardToEncoderPosition(c.port) req.respCh <- cmdResp{err: err} case cmdOutOfMouth: err := cardOutOfMouth(c.port) req.respCh <- cmdResp{err: err} default: req.respCh <- cmdResp{err: fmt.Errorf("unknown command")} } } func (c *Client) do(ctx context.Context, typ cmdType) ([]byte, error) { rch := make(chan cmdResp, 1) req := cmdReq{typ: typ, ctx: ctx, respCh: rch} select { case c.reqCh <- req: case <-ctx.Done(): return nil, ctx.Err() } select { case r := <-rch: return r.status, r.err case <-ctx.Done(): return nil, ctx.Err() } } // CheckStatus returns cached status if fresh, otherwise enqueues a device status read. func (c *Client) CheckStatus(ctx context.Context) ([]byte, error) { c.mu.RLock() ttl := c.statusTTL st := append([]byte(nil), c.lastStatus...) ts := c.lastStatusT c.mu.RUnlock() if len(st) == 4 && time.Since(ts) <= ttl { // even when returning cached, keep stock in sync c.setStock(st) return st, nil } return c.do(ctx, cmdStatus) } func (c *Client) ToEncoder(ctx context.Context) error { _, err := c.do(ctx, cmdToEncoder) return err } func (c *Client) OutOfMouth(ctx context.Context) error { _, err := c.do(ctx, cmdOutOfMouth) return err } // -------------------- // Public sequences updated to use Client (queue) // -------------------- // DispenserPrepare: check status; if empty => ok; else ensure at encoder. func (c *Client) DispenserPrepare(ctx context.Context) (string, error) { const funcName = "DispenserPrepare" stockStatus := "" status, err := c.CheckStatus(ctx) if err != nil { return stockStatus, fmt.Errorf("[%s] check status: %w", funcName, err) } logStatus(status) stockStatus = stockTake(status) c.setStock(status) if isCardWellEmpty(status) { return stockStatus, nil } if isAtEncoderPosition(status) { return stockStatus, nil } if err := c.ToEncoder(ctx); err != nil { return stockStatus, fmt.Errorf("[%s] to encoder: %w", funcName, err) } time.Sleep(delay) status, err = c.CheckStatus(ctx) if err != nil { return stockStatus, fmt.Errorf("[%s] re-check status: %w", funcName, err) } logStatus(status) stockStatus = stockTake(status) c.setStock(status) return stockStatus, nil } func (c *Client) DispenserStart(ctx context.Context) (string, error) { const funcName = "DispenserStart" stockStatus := "" status, err := c.CheckStatus(ctx) if err != nil { return stockStatus, fmt.Errorf("[%s] check status: %w", funcName, err) } logStatus(status) stockStatus = stockTake(status) c.setStock(status) if isCardWellEmpty(status) { return stockStatus, fmt.Errorf(stockStatus) } if isAtEncoderPosition(status) { return stockStatus, nil } if err := c.ToEncoder(ctx); err != nil { return stockStatus, fmt.Errorf("[%s] to encoder: %w", funcName, err) } time.Sleep(delay) status, err = c.CheckStatus(ctx) if err != nil { return stockStatus, fmt.Errorf("[%s] re-check status: %w", funcName, err) } logStatus(status) stockStatus = stockTake(status) c.setStock(status) return stockStatus, nil } func (c *Client) DispenserFinal(ctx context.Context) (string, error) { const funcName = "DispenserFinal" stockStatus := "" if err := c.OutOfMouth(ctx); err != nil { return stockStatus, fmt.Errorf("[%s] out of mouth: %w", funcName, err) } time.Sleep(delay) status, err := c.CheckStatus(ctx) if err != nil { return stockStatus, fmt.Errorf("[%s] check status: %w", funcName, err) } logStatus(status) stockStatus = stockTake(status) c.setStock(status) time.Sleep(delay) if err := c.ToEncoder(ctx); err != nil { return stockStatus, fmt.Errorf("[%s] to encoder: %w", funcName, err) } time.Sleep(delay) status, err = c.CheckStatus(ctx) if err != nil { return stockStatus, fmt.Errorf("[%s] re-check status: %w", funcName, err) } logStatus(status) stockStatus = stockTake(status) c.setStock(status) return stockStatus, nil }