diff --git a/go/node/client/v1beta3/tx.go b/go/node/client/v1beta3/tx.go
index 3eebdb90..c13efe79 100644
--- a/go/node/client/v1beta3/tx.go
+++ b/go/node/client/v1beta3/tx.go
@@ -91,6 +91,7 @@ type BroadcastOptions struct {
 	skipConfirm   *bool
 	confirmFn     ConfirmFn
 	broadcastMode *string
+	priority      bool
 }
 
 // BroadcastOption is a function that takes as first argument a pointer to BroadcastOptions and returns an error
@@ -204,6 +205,16 @@ func WithGenerateOnly(val bool) BroadcastOption {
 	}
 }
 
+// WithPriority marks the broadcast as priority. Priority requests are drained
+// from a dedicated FIFO queue ahead of the default queue, so time-critical
+// messages are not held up by long runs of non-priority work.
+func WithPriority() BroadcastOption {
+	return func(opts *BroadcastOptions) error {
+		opts.priority = true
+		return nil
+	}
+}
+
 type broadcastResp struct {
 	resp interface{}
 	err  error
@@ -446,16 +457,30 @@ func (c *serialBroadcaster) BroadcastTx(ctx context.Context, tx sdk.Tx, opts ...
 func (c *serialBroadcaster) run() {
 	defer c.lc.ShutdownCompleted()
 
+	priority := deque.NewDeque[broadcastReq]()
 	pending := deque.NewDeque[broadcastReq]()
+
+	// broadcastCh acts as a busy gate: non-nil = idle (tryBroadcast may dispatch),
+	// nil = in-flight (send case disabled, requests accumulate in the deques).
+	// Flipped to nil on dispatch and back to c.broadcastch on broadcastDoneCh.
 	broadcastCh := c.broadcastch
 	broadcastDoneCh := make(chan error, 1)
 
 	tryBroadcast := func() {
-		if pending.Len() == 0 {
+		var q *deque.Deque[broadcastReq]
+		var qName string
+		switch {
+		case priority.Len() > 0:
+			q = priority
+			qName = "priority"
+		case pending.Len() > 0:
+			q = pending
+			qName = "pending"
+		default:
 			return
 		}
 
-		req := pending.Peek(0)
+		req := q.Peek(0)
 
 		select {
 		case broadcastCh <- broadcast{
@@ -466,7 +491,8 @@ func (c *serialBroadcaster) run() {
 			opts:   req.opts,
 		}:
 			broadcastCh = nil
-			_ = pending.PopFront()
+			_ = q.PopFront()
+			c.log.Debug("broadcaster: dispatch", "queue", qName, "priority", priority.Len(), "pending", pending.Len())
 		default:
 		}
 	}
@@ -478,11 +504,18 @@ loop:
 			c.lc.ShutdownInitiated(err)
 			break loop
 		case req := <-c.reqch:
-			pending.PushBack(req)
+			if req.opts != nil && req.opts.priority {
+				priority.PushBack(req)
+				c.log.Debug("broadcaster: enqueue", "queue", "priority", "priority", priority.Len(), "pending", pending.Len(), "busy", broadcastCh == nil)
+			} else {
+				pending.PushBack(req)
+				c.log.Debug("broadcaster: enqueue", "queue", "pending", "priority", priority.Len(), "pending", pending.Len(), "busy", broadcastCh == nil)
+			}
 
 			tryBroadcast()
 		case err := <-broadcastDoneCh:
 			broadcastCh = c.broadcastch
+			c.log.Debug("broadcaster: done", "err", err, "priority", priority.Len(), "pending", pending.Len())
 
 			if err != nil {
 				c.log.Error("unable to broadcast messages", "error", err)
diff --git a/go/node/client/v1beta3/tx_test.go b/go/node/client/v1beta3/tx_test.go
new file mode 100644
index 00000000..37eadf59
--- /dev/null
+++ b/go/node/client/v1beta3/tx_test.go
@@ -0,0 +1,224 @@
+package v1beta3
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/boz/go-lifecycle"
+	"github.com/stretchr/testify/require"
+
+	"cosmossdk.io/log"
+)
+
+// schedulerHarness wires a bare serialBroadcaster and drives its run() loop.
+// Tests enqueue broadcastReq values on reqch, observe dispatch order via
+// broadcastch, and complete each in-flight broadcast by sending on donech of
+// the dispatched struct.
+type schedulerHarness struct {
+	t      *testing.T
+	c      *serialBroadcaster
+	cancel context.CancelFunc
+	done   chan struct{}
+}
+
+func newSchedulerHarness(t *testing.T) *schedulerHarness {
+	t.Helper()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	h := &schedulerHarness{
+		t: t,
+		c: &serialBroadcaster{
+			reqch:       make(chan broadcastReq, 16),
+			broadcastch: make(chan broadcast, 1),
+			lc:          lifecycle.New(),
+			log:         log.NewNopLogger(),
+		},
+		cancel: cancel,
+		done:   make(chan struct{}),
+	}
+
+	go h.c.lc.WatchContext(ctx)
+	go func() {
+		h.c.run()
+		close(h.done)
+	}()
+
+	t.Cleanup(h.shutdown)
+
+	return h
+}
+
+func (h *schedulerHarness) shutdown() {
+	h.t.Helper()
+	h.cancel()
+	select {
+	case <-h.done:
+	case <-time.After(time.Second):
+		h.t.Fatal("scheduler did not shut down")
+	}
+}
+
+func (h *schedulerHarness) enqueue(tag string, priority bool) {
+	h.t.Helper()
+	// Always attach non-nil opts so priority=false exercises the non-nil,
+	// non-priority branch; enqueueNilOpts covers the nil-opts branch.
+	opts := &BroadcastOptions{priority: priority}
+	req := broadcastReq{ctx: context.Background(), data: tag, opts: opts}
+	select {
+	case h.c.reqch <- req:
+	case <-time.After(time.Second):
+		h.t.Fatalf("timed out enqueueing %q", tag)
+	}
+}
+
+func (h *schedulerHarness) enqueueNilOpts(tag string) {
+	h.t.Helper()
+	select {
+	case h.c.reqch <- broadcastReq{ctx: context.Background(), data: tag}:
+	case <-time.After(time.Second):
+		h.t.Fatalf("timed out enqueueing %q", tag)
+	}
+}
+
+// recvDispatch reads one dispatch and returns its data tag. Caller must
+// complete() the returned broadcast before expecting another dispatch.
+func (h *schedulerHarness) recvDispatch() (string, broadcast) {
+	h.t.Helper()
+	select {
+	case b := <-h.c.broadcastch:
+		tag, _ := b.data.(string)
+		return tag, b
+	case <-time.After(time.Second):
+		h.t.Fatal("timed out waiting for dispatch")
+		return "", broadcast{}
+	}
+}
+
+func (h *schedulerHarness) expectNoDispatch(d time.Duration) {
+	h.t.Helper()
+	select {
+	case b := <-h.c.broadcastch:
+		h.t.Fatalf("unexpected dispatch while gated: %v", b.data)
+	case <-time.After(d):
+	}
+}
+
+func (h *schedulerHarness) complete(b broadcast, err error) {
+	h.t.Helper()
+	select {
+	case b.donech <- err:
+	case <-time.After(time.Second):
+		h.t.Fatal("timed out signalling broadcast done")
+	}
+}
+
+func TestSerialBroadcaster_DefaultFIFOPreserved(t *testing.T) {
+	h := newSchedulerHarness(t)
+
+	for _, tag := range []string{"a", "b", "c"} {
+		h.enqueue(tag, false)
+	}
+
+	for _, want := range []string{"a", "b", "c"} {
+		got, b := h.recvDispatch()
+		require.Equal(t, want, got)
+		h.complete(b, nil)
+	}
+}
+
+func TestSerialBroadcaster_PriorityPreemptsPending(t *testing.T) {
+	h := newSchedulerHarness(t)
+
+	h.enqueue("n1", false)
+	_, inFlight := h.recvDispatch()
+
+	h.enqueue("n2", false)
+	h.enqueue("n3", false)
+	h.enqueue("p1", true)
+
+	h.expectNoDispatch(50 * time.Millisecond)
+
+	h.complete(inFlight, nil)
+
+	for _, want := range []string{"p1", "n2", "n3"} {
+		got, b := h.recvDispatch()
+		require.Equal(t, want, got)
+		h.complete(b, nil)
+	}
+}
+
+func TestSerialBroadcaster_PriorityFIFOAmongItself(t *testing.T) {
+	h := newSchedulerHarness(t)
+
+	h.enqueue("n1", false)
+	_, inFlight := h.recvDispatch()
+
+	h.enqueue("p1", true)
+	h.enqueue("p2", true)
+	h.enqueue("p3", true)
+
+	h.complete(inFlight, nil)
+
+	for _, want := range []string{"p1", "p2", "p3"} {
+		got, b := h.recvDispatch()
+		require.Equal(t, want, got)
+		h.complete(b, nil)
+	}
+}
+
+// Arrival [N1, P1, N2, P2, N3] with N1 held in-flight dispatches as
+// [N1, P1, P2, N2, N3]: priority queue drains before pending.
+func TestSerialBroadcaster_MixedInterleaving(t *testing.T) {
+	h := newSchedulerHarness(t)
+
+	h.enqueue("n1", false)
+	_, inFlight := h.recvDispatch()
+
+	h.enqueue("p1", true)
+	h.enqueue("n2", false)
+	h.enqueue("p2", true)
+	h.enqueue("n3", false)
+
+	h.complete(inFlight, nil)
+
+	for _, want := range []string{"p1", "p2", "n2", "n3"} {
+		got, b := h.recvDispatch()
+		require.Equal(t, want, got)
+		h.complete(b, nil)
+	}
+}
+
+func TestSerialBroadcaster_BroadcastErrorDoesNotStall(t *testing.T) {
+	h := newSchedulerHarness(t)
+
+	h.enqueue("a", false)
+	h.enqueue("b", false)
+
+	_, first := h.recvDispatch()
+	h.complete(first, errors.New("boom"))
+
+	got, second := h.recvDispatch()
+	require.Equal(t, "b", got)
+	h.complete(second, nil)
+}
+
+func TestSerialBroadcaster_NilOptsTreatedAsNonPriority(t *testing.T) {
+	h := newSchedulerHarness(t)
+
+	h.enqueueNilOpts("n1")
+	_, inFlight := h.recvDispatch()
+
+	h.enqueue("p1", true)
+	h.enqueueNilOpts("n2")
+
+	h.complete(inFlight, nil)
+
+	for _, want := range []string{"p1", "n2"} {
+		got, b := h.recvDispatch()
+		require.Equal(t, want, got)
+		h.complete(b, nil)
+	}
+}