Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions go/node/client/v1beta3/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type BroadcastOptions struct {
skipConfirm *bool
confirmFn ConfirmFn
broadcastMode *string
priority bool
}

// BroadcastOption is a function that takes as first argument a pointer to BroadcastOptions and returns an error
Expand Down Expand Up @@ -204,6 +205,16 @@ func WithGenerateOnly(val bool) BroadcastOption {
}
}

// WithPriority marks the broadcast as priority. Priority requests are drained
// from a dedicated FIFO queue ahead of the default queue, so time-critical
// messages are not held up by long runs of non-priority work.
func WithPriority() BroadcastOption {
return func(opts *BroadcastOptions) error {
opts.priority = true
return nil
}
}

type broadcastResp struct {
resp interface{}
err error
Expand Down Expand Up @@ -446,16 +457,30 @@ func (c *serialBroadcaster) BroadcastTx(ctx context.Context, tx sdk.Tx, opts ...
func (c *serialBroadcaster) run() {
defer c.lc.ShutdownCompleted()

priority := deque.NewDeque[broadcastReq]()
pending := deque.NewDeque[broadcastReq]()

// broadcastCh acts as a busy gate: non-nil = idle (tryBroadcast may dispatch),
// nil = in-flight (send case disabled, requests accumulate in the deques).
// Flipped to nil on dispatch and back to c.broadcastch on broadcastDoneCh.
broadcastCh := c.broadcastch
broadcastDoneCh := make(chan error, 1)

tryBroadcast := func() {
if pending.Len() == 0 {
var q *deque.Deque[broadcastReq]
var qName string
switch {
case priority.Len() > 0:
q = priority
qName = "priority"
case pending.Len() > 0:
q = pending
qName = "pending"
default:
return
}

req := pending.Peek(0)
req := q.Peek(0)

select {
case broadcastCh <- broadcast{
Expand All @@ -466,7 +491,8 @@ func (c *serialBroadcaster) run() {
opts: req.opts,
}:
broadcastCh = nil
_ = pending.PopFront()
_ = q.PopFront()
c.log.Debug("broadcaster: dispatch", "queue", qName, "priority", priority.Len(), "pending", pending.Len())
default:
}
}
Expand All @@ -478,11 +504,18 @@ loop:
c.lc.ShutdownInitiated(err)
break loop
case req := <-c.reqch:
pending.PushBack(req)
if req.opts != nil && req.opts.priority {
priority.PushBack(req)
c.log.Debug("broadcaster: enqueue", "queue", "priority", "priority", priority.Len(), "pending", pending.Len(), "busy", broadcastCh == nil)
} else {
pending.PushBack(req)
c.log.Debug("broadcaster: enqueue", "queue", "pending", "priority", priority.Len(), "pending", pending.Len(), "busy", broadcastCh == nil)
}

tryBroadcast()
case err := <-broadcastDoneCh:
broadcastCh = c.broadcastch
c.log.Debug("broadcaster: done", "err", err, "priority", priority.Len(), "pending", pending.Len())

if err != nil {
c.log.Error("unable to broadcast messages", "error", err)
Expand Down
224 changes: 224 additions & 0 deletions go/node/client/v1beta3/tx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package v1beta3

import (
"context"
"errors"
"testing"
"time"

"github.com/boz/go-lifecycle"
"github.com/stretchr/testify/require"

"cosmossdk.io/log"
)

// schedulerHarness wires a bare serialBroadcaster and drives its run() loop.
// Tests enqueue broadcastReq values on reqch, observe dispatch order via
// broadcastch, and complete each in-flight broadcast by sending on donech of
// the dispatched struct.
type schedulerHarness struct {
t *testing.T
c *serialBroadcaster
cancel context.CancelFunc
done chan struct{}
}

func newSchedulerHarness(t *testing.T) *schedulerHarness {
t.Helper()

ctx, cancel := context.WithCancel(context.Background())

h := &schedulerHarness{
t: t,
c: &serialBroadcaster{
reqch: make(chan broadcastReq, 16),
broadcastch: make(chan broadcast, 1),
lc: lifecycle.New(),
log: log.NewNopLogger(),
},
cancel: cancel,
done: make(chan struct{}),
}

go h.c.lc.WatchContext(ctx)
go func() {
h.c.run()
close(h.done)
}()

t.Cleanup(h.shutdown)

return h
}

func (h *schedulerHarness) shutdown() {
h.t.Helper()
h.cancel()
select {
case <-h.done:
case <-time.After(time.Second):
h.t.Fatal("scheduler did not shut down")
}
}

func (h *schedulerHarness) enqueue(tag string, priority bool) {
h.t.Helper()
req := broadcastReq{ctx: context.Background(), data: tag}
if priority {
req.opts = &BroadcastOptions{priority: true}
}
select {
case h.c.reqch <- req:
case <-time.After(time.Second):
h.t.Fatalf("timed out enqueueing %q", tag)
}
}

func (h *schedulerHarness) enqueueNilOpts(tag string) {
h.t.Helper()
select {
case h.c.reqch <- broadcastReq{ctx: context.Background(), data: tag}:
case <-time.After(time.Second):
h.t.Fatalf("timed out enqueueing %q", tag)
}
}

// recvDispatch reads one dispatch and returns its data tag. Caller must
// complete() the returned broadcast before expecting another dispatch.
func (h *schedulerHarness) recvDispatch() (string, broadcast) {
h.t.Helper()
select {
case b := <-h.c.broadcastch:
tag, _ := b.data.(string)
return tag, b
case <-time.After(time.Second):
h.t.Fatal("timed out waiting for dispatch")
return "", broadcast{}
}
}

func (h *schedulerHarness) expectNoDispatch(d time.Duration) {
h.t.Helper()
select {
case b := <-h.c.broadcastch:
h.t.Fatalf("unexpected dispatch while gated: %v", b.data)
case <-time.After(d):
}
}

func (h *schedulerHarness) complete(b broadcast, err error) {
h.t.Helper()
select {
case b.donech <- err:
case <-time.After(time.Second):
h.t.Fatal("timed out signalling broadcast done")
}
}

func TestSerialBroadcaster_DefaultFIFOPreserved(t *testing.T) {
h := newSchedulerHarness(t)

for _, tag := range []string{"a", "b", "c"} {
h.enqueue(tag, false)
}

for _, want := range []string{"a", "b", "c"} {
got, b := h.recvDispatch()
require.Equal(t, want, got)
h.complete(b, nil)
}
}

func TestSerialBroadcaster_PriorityPreemptsPending(t *testing.T) {
h := newSchedulerHarness(t)

h.enqueue("n1", false)
_, inFlight := h.recvDispatch()

h.enqueue("n2", false)
h.enqueue("n3", false)
h.enqueue("p1", true)

h.expectNoDispatch(50 * time.Millisecond)

h.complete(inFlight, nil)

for _, want := range []string{"p1", "n2", "n3"} {
got, b := h.recvDispatch()
require.Equal(t, want, got)
h.complete(b, nil)
}
}

func TestSerialBroadcaster_PriorityFIFOAmongItself(t *testing.T) {
h := newSchedulerHarness(t)

h.enqueue("n1", false)
_, inFlight := h.recvDispatch()

h.enqueue("p1", true)
h.enqueue("p2", true)
h.enqueue("p3", true)

h.complete(inFlight, nil)

for _, want := range []string{"p1", "p2", "p3"} {
got, b := h.recvDispatch()
require.Equal(t, want, got)
h.complete(b, nil)
}
}

// Arrival [N1, P1, N2, P2, N3] with N1 held in-flight dispatches as
// [N1, P1, P2, N2, N3]: priority queue drains before pending.
func TestSerialBroadcaster_MixedInterleaving(t *testing.T) {
h := newSchedulerHarness(t)

h.enqueue("n1", false)
_, inFlight := h.recvDispatch()

h.enqueue("p1", true)
h.enqueue("n2", false)
h.enqueue("p2", true)
h.enqueue("n3", false)

h.complete(inFlight, nil)

for _, want := range []string{"p1", "p2", "n2", "n3"} {
got, b := h.recvDispatch()
require.Equal(t, want, got)
h.complete(b, nil)
}
}

func TestSerialBroadcaster_BroadcastErrorDoesNotStall(t *testing.T) {
h := newSchedulerHarness(t)

h.enqueue("a", false)
h.enqueue("b", false)

_, first := h.recvDispatch()
h.complete(first, errors.New("boom"))

got, second := h.recvDispatch()
require.Equal(t, "b", got)
h.complete(second, nil)
}

func TestSerialBroadcaster_NilOptsTreatedAsNonPriority(t *testing.T) {
h := newSchedulerHarness(t)

h.enqueueNilOpts("n1")
_, inFlight := h.recvDispatch()

h.enqueue("p1", true)
h.enqueueNilOpts("n2")

h.complete(inFlight, nil)

for _, want := range []string{"p1", "n2"} {
got, b := h.recvDispatch()
require.Equal(t, want, got)
h.complete(b, nil)
}
}
Loading