diff --git a/backend/daemon/daemon.go b/backend/daemon/daemon.go
index e3b95f5a3..72f973e59 100644
--- a/backend/daemon/daemon.go
+++ b/backend/daemon/daemon.go
@@ -227,7 +227,7 @@ func Load(ctx context.Context, cfg config.Config, r *storage.Store, oo ...Option
}
a.HTTPServer, a.HTTPListener, err = initHTTP(cfg.Base, cfg.HTTP.Port, a.GRPCServer, &a.clean, a.g, a.Index,
- fm, a.Net, a.RPC.Daemon, opts.extraHTTPHandlers...)
+ fm, a.Net, a.Syncing.SchedulerSnapshot, a.RPC.Daemon, opts.extraHTTPHandlers...)
if err != nil {
return nil, err
}
diff --git a/backend/daemon/http.go b/backend/daemon/http.go
index 5a820dbe2..eb6f96e69 100644
--- a/backend/daemon/http.go
+++ b/backend/daemon/http.go
@@ -13,6 +13,7 @@ import (
"seed/backend/blob"
"seed/backend/config"
"seed/backend/hmnet"
+ "seed/backend/hmnet/syncing"
"seed/backend/logging"
"seed/backend/util/cleanup"
"seed/backend/util/trcstats"
@@ -138,6 +139,7 @@ func initHTTP(
blobs blockstore.Blockstore,
ipfsHandler *hmnet.FileManager,
p2pnet *hmnet.Node,
+ schedulerSnap func() syncing.SchedulerSnapshot,
daemonServer *daemonapi.Server,
extraHandlers ...func(*Router),
) (srv *http.Server, lis net.Listener, err error) {
@@ -178,7 +180,7 @@ func initHTTP(
router.Handle("/debug/requests", http.DefaultServeMux, RouteNav)
router.Handle("/debug/events", http.DefaultServeMux, RouteNav)
router.Handle("/debug/p2p", p2pnet.DebugHandler(), RouteNav)
- router.Handle("/debug/network", p2pnet.NetworkDebugHandler(), RouteNav)
+ router.Handle("/debug/network", p2pnet.NetworkDebugHandler(schedulerSnap), RouteNav)
router.Handle("/hm/api/config", p2pnet.HMAPIConfigHandler(), RouteNav)
router.Handle(vaultConnectionHandoffPath, corsMiddleware(http.HandlerFunc(daemonServer.HandleVaultHandoff)), 0)
diff --git a/backend/hmnet/http_debug_network.go b/backend/hmnet/http_debug_network.go
index 5ce841382..e7706d366 100644
--- a/backend/hmnet/http_debug_network.go
+++ b/backend/hmnet/http_debug_network.go
@@ -9,6 +9,7 @@ import (
"sort"
"time"
+ "seed/backend/hmnet/syncing"
"seed/backend/util/bwcounter"
"github.com/libp2p/go-libp2p/core/network"
@@ -21,11 +22,16 @@ import (
// sync/discovery latency, peer-table state, and reachability — modeled on
// /debug/traces. Latency cells are color-coded against fixed thresholds
// (p10>50ms, p50>100ms, p90>1s, p99>5s warn).
-func (n *Node) NetworkDebugHandler() http.Handler {
+//
+// schedulerSnap is an optional callback returning the syncing scheduler's
+// counters. Pass nil if no syncing service is wired up; the page omits the
+// scheduler section in that case.
+func (n *Node) NetworkDebugHandler(schedulerSnap func() syncing.SchedulerSnapshot) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
page := n.buildPage(r.Context())
+ page.Scheduler = buildScheduler(schedulerSnap)
if err := pageTpl.Execute(w, page); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -44,6 +50,21 @@ type networkPage struct {
Sections []section
Bandwidth bandwidthSection
Reachability reachSection
+ Scheduler *schedulerDebugSection
+}
+
+// schedulerDebugSection holds the pre-formatted syncing-scheduler values
+// shown on /debug/network: instantaneous queue/in-progress sizes plus
+// cumulative preemption counters, so a human looking at the page can confirm
+// whether the preemption machinery is firing at all in production.
+type schedulerDebugSection struct {
+ TasksTotal string // tasks tracked by the scheduler
+ QueueLen string // tasks waiting in the queue
+ InProgress string // tasks currently running (total)
+ InProgressHot string // running ephemeral hot tasks
+ InProgressSubscription string // running subscription tasks
+ PreemptHotCount string // older hot tasks cancelled for a newer hot task
+ PreemptSubsCount string // subscriptions cancelled for a hot task
}
// bandwidthSection holds all bandwidth-related tables for the page. Each table
@@ -374,6 +395,24 @@ func (n *Node) buildPage(ctx context.Context) networkPage {
return page
}
+// buildScheduler turns the snapshot returned by snap into the pre-formatted
+// section rendered on the page. It returns nil when snap is nil.
+func buildScheduler(snap func() syncing.SchedulerSnapshot) *schedulerDebugSection {
+	if snap == nil {
+		return nil
+	}
+	cur := snap()
+	sec := new(schedulerDebugSection)
+	sec.TasksTotal = fmt.Sprint(cur.TasksTotal)
+	sec.QueueLen = fmt.Sprint(cur.QueueLen)
+	sec.InProgress = fmt.Sprint(cur.InProgress)
+	sec.InProgressHot = fmt.Sprint(cur.InProgressHot)
+	sec.InProgressSubscription = fmt.Sprint(cur.InProgressSubscription)
+	sec.PreemptHotCount = fmt.Sprint(cur.PreemptHotCount)
+	sec.PreemptSubsCount = fmt.Sprint(cur.PreemptSubsCount)
+	return sec
+}
+
// buildBandwidth assembles the bandwidth section from the libp2p metrics and
// the two HTTP counters owned by the Node. Numbers are pre-formatted into
// human-readable strings so the template stays simple.
@@ -1557,6 +1596,26 @@ details.howto>summary{color:#0a58ca;font-weight:600}
{{.Bandwidth.Help}}
+{{if .Scheduler}}
+{{with .Scheduler}}
+
Discovery scheduler
+queue/in-progress sizes are instantaneous; preempt counts are cumulative since startup
+
+| tasks tracked | {{.TasksTotal}} |
+| queued | {{.QueueLen}} |
+| in progress (total) | {{.InProgress}} |
+| hot (ephemeral) | {{.InProgressHot}} |
+| subscription | {{.InProgressSubscription}} |
+| preempt total — subscription cancelled for hot | {{.PreemptSubsCount}} |
+| preempt total — older hot cancelled for newer hot | {{.PreemptHotCount}} |
+
+
+What does this mean?
+Lifetime of a discovery task: queued → in progress (subscription or hot/ephemeral) → done. The two preempt counters track the only paths that cancel an in-flight task to make room for a higher-priority one. If both stay at zero across a busy session, the preemption machinery isn't actually load-bearing and could be removed.
+
+{{end}}
+{{end}}
+
Reachability snapshot
peerstore peers, connected first; showing top {{len .Reachability.Rows}} of {{.Reachability.Total}}
diff --git a/backend/hmnet/http_debug_network_test.go b/backend/hmnet/http_debug_network_test.go
index da2462482..b71b73822 100644
--- a/backend/hmnet/http_debug_network_test.go
+++ b/backend/hmnet/http_debug_network_test.go
@@ -2,8 +2,11 @@ package hmnet
import (
"math"
+ "strings"
"testing"
+ "seed/backend/hmnet/syncing"
+
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)
@@ -63,6 +66,60 @@ func TestHistStatsPercentileFallsBackForInfBucket(t *testing.T) {
require.Equal(t, 2.0, h.percentile(0.99))
}
+func TestBuildScheduler_NilSnap(t *testing.T) {
+ // With no snapshot callback buildScheduler returns nil, which makes the page template skip the section.
+ require.Nil(t, buildScheduler(nil))
+}
+
+func TestBuildScheduler_WithSnap(t *testing.T) {
+	// Feed a fixed snapshot through buildScheduler and compare the formatted result.
+	fixture := syncing.SchedulerSnapshot{
+		TasksTotal:             5,
+		QueueLen:               2,
+		InProgress:             3,
+		InProgressHot:          2,
+		InProgressSubscription: 1,
+		PreemptHotCount:        7,
+		PreemptSubsCount:       11,
+	}
+	got := buildScheduler(func() syncing.SchedulerSnapshot { return fixture })
+	want := &schedulerDebugSection{
+		TasksTotal:             "5",
+		QueueLen:               "2",
+		InProgress:             "3",
+		InProgressHot:          "2",
+		InProgressSubscription: "1",
+		PreemptHotCount:        "7",
+		PreemptSubsCount:       "11",
+	}
+	require.Equal(t, want, got)
+}
+
+// TestPageTpl_RendersSchedulerSection runs the actual page template with
+// a populated Scheduler section to make sure the new block renders
+// without template errors and that every scheduler field reaches the page.
+func TestPageTpl_RendersSchedulerSection(t *testing.T) {
+	// Unique sentinels: short numbers such as "42" could match unrelated page content.
+	sched := &schedulerDebugSection{
+		TasksTotal:             "sentinel-tasks-total",
+		QueueLen:               "sentinel-queue-len",
+		InProgress:             "sentinel-in-progress",
+		InProgressHot:          "sentinel-hot",
+		InProgressSubscription: "sentinel-subscription",
+		PreemptHotCount:        "sentinel-preempt-hot",
+		PreemptSubsCount:       "sentinel-preempt-subs",
+	}
+	page := networkPage{PeerID: "peer-1", ProtocolID: "/test/0.0.1", Scheduler: sched}
+	var sb strings.Builder
+	require.NoError(t, pageTpl.Execute(&sb, page))
+	html := sb.String()
+	require.Contains(t, html, "Discovery scheduler")
+	require.Contains(t, html, "preempt total — older hot cancelled for newer hot")
+	for _, want := range []string{sched.TasksTotal, sched.QueueLen, sched.InProgress, sched.InProgressHot, sched.InProgressSubscription, sched.PreemptHotCount, sched.PreemptSubsCount} {
+		require.Contains(t, html, want)
+	}
+}
+
func TestHistStatsPercentileSkipsEmptyEarlyBuckets(t *testing.T) {
// Buckets 0 and 1 are empty; bucket 2 ((10ms, 100ms]) holds all
// observations. p=0.5 → target=5. prevUpper after bucket 1 is 0.01.
diff --git a/backend/hmnet/syncing/discovery.go b/backend/hmnet/syncing/discovery.go
index 4cd109848..6e15f522a 100644
--- a/backend/hmnet/syncing/discovery.go
+++ b/backend/hmnet/syncing/discovery.go
@@ -20,7 +20,6 @@ import (
"time"
"github.com/ipfs/go-cid"
- "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multicodec"
@@ -115,34 +114,49 @@ func (s *Service) DiscoverObjectWithProgress(ctx context.Context, entityID blob.
// state. We use them as a fallback before the DHT when no peer is connected,
// since syncWithPeer will dial on demand.
const maxKnownPeerFallback = 20
- var knownPeers []peer.ID
- if err = s.db.WithSave(ctxLocalPeers, func(conn *sqlite.Conn) error {
- return sqlitex.Exec(conn, qListPeersWithPid(), func(stmt *sqlite.Stmt) error {
- addresStr := stmt.ColumnText(0)
- pid := stmt.ColumnText(1)
- addrList := strings.Split(addresStr, ",")
- info, err := netutil.AddrInfoFromStrings(addrList...)
- if err != nil {
- s.log.Warn("Can't discover from peer since it has malformed addresses", zap.String("PID", pid), zap.Error(err))
- return nil
- }
- if s.host.Network().Connectedness(info.ID) == network.Connected {
- allPeers = append(allPeers, info.ID)
- } else if len(knownPeers) < maxKnownPeerFallback {
- s.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL)
- knownPeers = append(knownPeers, info.ID)
- }
- return nil
- })
- }); err != nil {
- MDiscoverPhaseSeconds.WithLabelValues("peer_select").Observe(time.Since(peerSelectStart).Seconds())
- return "", err
+ // Fast path: ask the host directly for the currently-connected peers
+ // instead of scanning the entire peers table and parsing every
+ // multiaddr just to filter on Connectedness. Note the set is not the
+ // same as before: the old loop only kept connected peers that had a row
+ // in the peers table, while Network().Peers() is not limited to those.
+ self := s.host.ID()
+ for _, pid := range s.host.Network().Peers() {
+ if pid == self {
+ continue
+ }
+ allPeers = append(allPeers, pid)
}
- MDiscoverPhaseSeconds.WithLabelValues("peer_select").Observe(time.Since(peerSelectStart).Seconds())
- if len(allPeers) == 0 && len(knownPeers) > 0 {
- allPeers = knownPeers
+
+ // Slow path: only when no peers are currently connected do we hit the
+ // DB to find a small fallback set we can dial. The query is bounded
+ // (LIMIT) so the multiaddr parse cost is bounded too — the prior
+ // unbounded scan was the source of the per-call CPU regression.
+ if len(allPeers) == 0 {
+ if err = s.db.WithSave(ctxLocalPeers, func(conn *sqlite.Conn) error {
+ return sqlitex.Exec(conn, qListPeersWithPidLimit(), func(stmt *sqlite.Stmt) error {
+ addresStr := stmt.ColumnText(0)
+ pid := stmt.ColumnText(1)
+ info, err := netutil.AddrInfoFromStrings(strings.Split(addresStr, ",")...)
+ if err != nil {
+ s.log.Warn("Can't discover from peer since it has malformed addresses", zap.String("PID", pid), zap.Error(err))
+ return nil
+ }
+ // peerstore.RecentlyConnectedAddrTTL is the right TTL for
+ // "we last saw this peer at this address" semantics — the
+ // dial path may run many seconds later as syncs queue up,
+ // and TempAddrTTL (the previous choice) is short enough
+ // that addresses can expire before we get to dial.
+ s.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.RecentlyConnectedAddrTTL)
+ allPeers = append(allPeers, info.ID)
+ return nil
+ }, maxKnownPeerFallback)
+ }); err != nil {
+ MDiscoverPhaseSeconds.WithLabelValues("peer_select").Observe(time.Since(peerSelectStart).Seconds())
+ return "", err
+ }
}
+ MDiscoverPhaseSeconds.WithLabelValues("peer_select").Observe(time.Since(peerSelectStart).Seconds())
// Create RBSR store once for reuse across all peers.
dkeys := colx.HashSet[DiscoveryKey]{
@@ -860,3 +874,17 @@ var qListPeersWithPid = dqb.Str(`
pid
FROM peers;
`)
+
+// qListPeersWithPidLimit is the bounded variant of qListPeersWithPid used by
+// the no-connected-peers fallback in DiscoverObjectWithProgress. It returns
+// at most the bound LIMIT rows, most recently updated first, so the SQLite
+// work and the multiaddr parsing are capped at the fallback budget instead
+// of reading every row of the peers table.
+var qListPeersWithPidLimit = dqb.Str(`
+ SELECT
+ addresses,
+ pid
+ FROM peers
+ ORDER BY updated_at DESC
+ LIMIT ?;
+`)
diff --git a/backend/hmnet/syncing/scheduler.go b/backend/hmnet/syncing/scheduler.go
index a57ef6898..f973c6dac 100644
--- a/backend/hmnet/syncing/scheduler.go
+++ b/backend/hmnet/syncing/scheduler.go
@@ -12,6 +12,7 @@ import (
"seed/backend/util/maybe"
"strings"
"sync"
+ "sync/atomic"
"time"
"golang.org/x/sync/errgroup"
@@ -60,6 +61,11 @@ type taskHandle struct {
hotDeadline time.Time
runCount uint64 // Number of times task has been executed.
+ // hotIndex tracks the position of this task inside scheduler.inProgressHot
+ // while it is running as an ephemeral hot task. Maintained by the heap's
+ // OnIndexChange callback. Unset for queued, completed, or subscription tasks.
+ hotIndex maybe.Value[int]
+
// Cancellation. Set by dispatchReadyTasks under s.mu before dispatching, cleared in executeTask's unwind.
cancelFunc context.CancelFunc
runCtx context.Context
@@ -110,11 +116,28 @@ type scheduler struct {
tasks map[DiscoveryKey]*taskHandle
queue *heap.Heap[*taskHandle]
+ // inProgressHot indexes ephemeral hot tasks currently running, ordered
+ // by hotDeadline ascending so the head is always the oldest (the most
+ // likely preemption victim and the soonest heartbeat to expire). Lets
+ // preempt/cleanup/wake decisions stay O(1)/O(log n) instead of walking
+ // the full s.tasks map every dispatch tick.
+ inProgressHot *heap.Heap[*taskHandle]
+
+ // inProgressSubs indexes subscription tasks currently running. Keyed by
+ // DiscoveryKey for O(1) "any one" picks during subscription preemption.
+ inProgressSubs map[DiscoveryKey]*taskHandle
+
// inProgress counts tasks that are currently being executed by a worker
// goroutine. Bounded by cfg.MaxWorkers. Used to detect saturation for
// preemption decisions.
inProgress int
+ // Observability counters surfaced via /debug/network so we can confirm
+ // whether preemption ever fires in production. Atomics so they can be read
+ // without s.mu, although snapshot currently loads them while holding it.
+ preemptSubsCount atomic.Uint64
+ preemptHotCount atomic.Uint64
+
// Heartbeat TTL for hot tasks. Installed from defaultHotTTL in newScheduler.
// Tests may assign a smaller value before calling run().
hotTTL time.Duration
@@ -127,11 +150,12 @@ func newScheduler(disc discoverer, cfg config.Syncing) *scheduler {
}
s := &scheduler{
- disc: disc,
- cfg: cfg,
- timer: time.NewTimer(0),
- tasks: make(map[DiscoveryKey]*taskHandle),
- hotTTL: defaultHotTTL,
+ disc: disc,
+ cfg: cfg,
+ timer: time.NewTimer(0),
+ tasks: make(map[DiscoveryKey]*taskHandle),
+ inProgressSubs: make(map[DiscoveryKey]*taskHandle),
+ hotTTL: defaultHotTTL,
// Two-tier priority: hot tasks always before cold. Within hot tier,
// LIFO by hotDeadline desc (most recently touched wins); within cold
// tier, FIFO by nextRunTime asc (earliest due wins). nextRunTime is
@@ -148,6 +172,11 @@ func newScheduler(disc discoverer, cfg config.Syncing) *scheduler {
}
return a.nextRunTime.Before(b.nextRunTime)
}),
+ // Ascending by hotDeadline: head = oldest = next-to-expire = first
+ // preemption victim.
+ inProgressHot: heap.New(func(a, b *taskHandle) bool {
+ return a.hotDeadline.Before(b.hotDeadline)
+ }),
}
s.workers.SetLimit(cfg.MaxWorkers)
@@ -160,6 +189,13 @@ func newScheduler(disc discoverer, cfg config.Syncing) *scheduler {
task.queueIndex.Set(newIndex)
}
}
+ s.inProgressHot.OnIndexChange = func(task *taskHandle, newIndex int) {
+ if newIndex < 0 {
+ task.hotIndex.Clear()
+ } else {
+ task.hotIndex.Set(newIndex)
+ }
+ }
return s
}
@@ -183,8 +219,11 @@ func (s *scheduler) run(ctx context.Context) (err error) {
case <-s.timer.C:
s.mu.Lock()
nextWake := s.dispatchReadyTasks(ctx)
+ // Re-arm the timer while still holding s.mu. resetTimer also re-arms
+ // s.timer under the lock; a Reset issued after Unlock could overwrite a
+ // shorter wake armed concurrently and delay a freshly scheduled task.
s.timer.Reset(nextWake)
s.mu.Unlock()
}
}
}
@@ -192,12 +231,12 @@ func (s *scheduler) run(ctx context.Context) (err error) {
// executeTask runs discovery and updates task state directly. It runs in a
// goroutine spawned by the dispatch loop via errgroup.TryGo and owns a single
// MaxWorkers slot for its lifetime.
-func (s *scheduler) executeTask(task *taskHandle) {
- s.mu.Lock()
- taskCtx := task.runCtx
- prog := task.progress
- s.mu.Unlock()
-
+//
+// taskCtx and prog are passed by value so executeTask never has to grab s.mu
+// just to read them off task — the dispatch path captures them locally
+// before assigning the task fields, so the closure already holds the
+// correct values.
+func (s *scheduler) executeTask(taskCtx context.Context, task *taskHandle, prog *Progress) {
var blobTypes []string
if task.key.BlobTypes != "" {
blobTypes = strings.Split(task.key.BlobTypes, ",")
@@ -223,6 +262,7 @@ func (s *scheduler) executeTask(task *taskHandle) {
}
task.runCtx = nil
s.inProgress--
+ s.markFinished(task)
now := time.Now()
@@ -302,6 +342,11 @@ func (s *scheduler) scheduleTaskLocked(key DiscoveryKey, now time.Time, opts sch
if task.state == TaskStateInProgress {
// Re-touching a running task only updates the heartbeat deadline.
// Re-running decisions happen when the current run completes.
+ // If the task is in inProgressHot, its order key (hotDeadline)
+ // just changed, so fix the in-progress hot heap too.
+ if task.hotIndex.IsSet() {
+ s.inProgressHot.Fix(task.hotIndex.Value())
+ }
return task.Info()
}
@@ -328,6 +373,13 @@ func (s *scheduler) removeSubscriptions(keys ...DiscoveryKey) {
continue
}
+ // If the task is currently running as a subscription, drop the
+ // in-progress subscription tracking now that it isn't one anymore.
+ // We don't migrate it into inProgressHot — it'll finish normally
+ // and exit through markFinished, which is idempotent.
+ if existing, ok := s.inProgressSubs[key]; ok && existing == task {
+ delete(s.inProgressSubs, key)
+ }
task.subscription = false
now := time.Now()
@@ -381,23 +433,38 @@ func (s *scheduler) dispatchReadyTasks(ctx context.Context) (nextWake time.Durat
return s.boundedWake(wake, time.Now())
}
- // Task is ready. Pop it from the queue and prepare its run context.
+ // Task is ready. Pop it from the queue.
s.queue.Pop()
- task.state = TaskStateInProgress
- task.progress = &Progress{}
- taskCtx, cancel := context.WithCancel(ctx)
- task.runCtx = taskCtx
- task.cancelFunc = cancel
-
- // Dispatch via errgroup.TryGo. TryGo respects the MaxWorkers limit
- // set in newScheduler, so saturation is authoritative here — either
- // we get a goroutine slot or we don't.
- if s.inProgress < s.cfg.MaxWorkers && s.workers.TryGo(func() error {
- s.executeTask(task)
- return nil
- }) {
- s.inProgress++
- continue
+
+ // Capacity check first — only allocate the per-task context when
+ // we're committed to dispatching. Under saturation the prior code
+ // allocated a context.WithCancel + Progress per peeked task and
+ // immediately discarded them, which showed up directly in the
+ // allocation profile.
+ if s.inProgress < s.cfg.MaxWorkers {
+ taskCtx, cancel := context.WithCancel(ctx)
+ prog := &Progress{}
+ task.state = TaskStateInProgress
+ task.progress = prog
+ task.runCtx = taskCtx
+ task.cancelFunc = cancel
+ // TryGo respects the MaxWorkers limit; on the rare losing race
+ // we unwind state and re-enqueue.
+ if s.workers.TryGo(func() error {
+ s.executeTask(taskCtx, task, prog)
+ return nil
+ }) {
+ s.inProgress++
+ s.markInProgress(task)
+ continue
+ }
+ // Race lost: TryGo refused despite the capacity check. Reset
+ // state and fall through to the saturation branch.
+ cancel()
+ task.runCtx = nil
+ task.cancelFunc = nil
+ task.state = TaskStateIdle
+ task.progress = nil
}
// Worker pool is full. For hot tasks, try to make room by preempting
@@ -413,12 +480,6 @@ func (s *scheduler) dispatchReadyTasks(ctx context.Context) (nextWake time.Durat
}
}
- // Release the unused context and reset the task.
- cancel()
- task.runCtx = nil
- task.cancelFunc = nil
- task.state = TaskStateIdle
- task.progress = nil
s.enqueueLocked(task, now)
if preempted {
@@ -449,6 +510,34 @@ func (s *scheduler) dispatchReadyTasks(ctx context.Context) (nextWake time.Durat
return s.boundedWake(wake, now)
}
+// markInProgress records a just-dispatched task in the in-progress index that
+// matches its kind. Caller must hold s.mu.
+func (s *scheduler) markInProgress(task *taskHandle) {
+	switch {
+	case task.subscription:
+		s.inProgressSubs[task.key] = task
+	case task.hotDeadline.IsZero():
+		// Non-subscription, non-hot tasks aren't expected to land here —
+		// scheduleNext deletes them — so defensively track nothing.
+	default:
+		s.inProgressHot.Push(task)
+	}
+}
+
+
+// markFinished drops task from whichever in-progress index still holds it.
+// It is idempotent, so it is safe to call after a preemption or cleanup path
+// has already removed the task to keep it from being picked again.
+// Caller must hold s.mu.
+func (s *scheduler) markFinished(task *taskHandle) {
+	if task.hotIndex.IsSet() {
+		s.inProgressHot.Remove(task.hotIndex.Value())
+	}
+	if s.inProgressSubs[task.key] == task {
+		delete(s.inProgressSubs, task.key)
+	}
+}
+
// enqueueLocked inserts or reorders the task in the queue. Caller must hold
// s.mu. Must not be called while task.state == TaskStateInProgress.
func (s *scheduler) enqueueLocked(task *taskHandle, now time.Time) {
@@ -467,18 +556,17 @@ func (s *scheduler) enqueueLocked(task *taskHandle, now time.Time) {
// preemptSubscriptionLocked cancels an in-flight subscription so its worker
// slot can be reused by a higher-priority hot task. Returns true if a
// subscription was cancelled. Caller must hold s.mu.
+//
+// The victim is deleted from inProgressSubs before it is cancelled, so a
+// follow-up preemption in the same dispatch sweep can't pick the same task.
func (s *scheduler) preemptSubscriptionLocked() bool {
- for _, t := range s.tasks {
- if t.state != TaskStateInProgress {
- continue
- }
- if !t.subscription {
- continue
- }
+ for key, t := range s.inProgressSubs {
if t.cancelFunc == nil {
continue
}
+ delete(s.inProgressSubs, key)
t.cancelFunc()
+ s.preemptSubsCount.Add(1)
return true
}
return false
@@ -490,84 +578,64 @@ func (s *scheduler) preemptSubscriptionLocked() bool {
// cancellation branch. Returns true if a task was preempted. Caller must
// hold s.mu.
func (s *scheduler) preemptOldestHotLocked(incoming *taskHandle, now time.Time) bool {
- var victim *taskHandle
- for _, t := range s.tasks {
- if t.state != TaskStateInProgress {
- continue
- }
- if t.subscription {
- continue
- }
- if !t.IsHot(now) {
- // Heartbeat already expired; cleanupExpiredHotTasksLocked will reap it.
- continue
- }
- if t.cancelFunc == nil {
- continue
- }
- if t.key == incoming.key {
- continue
- }
- // Victim must have a strictly older heartbeat than the incoming task;
- // otherwise preempting just delays the user's most recent request.
- if !t.hotDeadline.Before(incoming.hotDeadline) {
- continue
- }
- if victim == nil || t.hotDeadline.Before(victim.hotDeadline) {
- victim = t
- }
+ if s.inProgressHot.Len() == 0 {
+ return false
+ }
+ victim := s.inProgressHot.Peek() // head = earliest hotDeadline; only the head is ever considered
+ if victim.key == incoming.key {
+ return false
}
- if victim == nil {
+ if !victim.IsHot(now) {
+ // Heartbeat already expired; cleanupExpiredHotTasksLocked will reap it.
return false
}
+ // Victim must have a strictly older heartbeat than the incoming task;
+ // otherwise preempting just delays the user's most recent request.
+ if !victim.hotDeadline.Before(incoming.hotDeadline) {
+ return false
+ }
+ if victim.cancelFunc == nil {
+ return false
+ }
+ s.inProgressHot.Remove(victim.hotIndex.Value()) // untrack before cancelling so it can't be picked again
victim.cancelFunc()
+ s.preemptHotCount.Add(1)
return true
}
// cleanupExpiredHotTasksLocked cancels in-progress ephemeral hot tasks whose
// heartbeat deadline has expired (the frontend stopped polling, meaning the
-// user is no longer viewing that document) and drops any queued ephemeral
-// hot tasks in the same state. Caller must hold s.mu.
+// user is no longer viewing that document). Caller must hold s.mu.
+//
+// Only inProgressHot is walked, head first, stopping at the first unexpired
+// task. Queued ephemeral hot tasks are no longer swept here; presumably the
+// dispatch-loop tier-migration check reaps them lazily (TODO confirm).
func (s *scheduler) cleanupExpiredHotTasksLocked(now time.Time) {
- var toDelete []*taskHandle
- for _, t := range s.tasks {
- if t.subscription {
- continue
- }
- if t.IsHot(now) {
- continue
+ for s.inProgressHot.Len() > 0 {
+ head := s.inProgressHot.Peek()
+ if !head.hotDeadline.Before(now) { // NOTE(review): preemptOldestHotLocked uses IsHot(now); confirm the boundaries agree
+ return
}
- switch t.state {
- case TaskStateInProgress:
- if t.cancelFunc != nil {
- t.cancelFunc()
- }
- default:
- if t.queueIndex.IsSet() {
- toDelete = append(toDelete, t)
- }
+ s.inProgressHot.Remove(head.hotIndex.Value())
+ if head.cancelFunc != nil {
+ head.cancelFunc()
}
}
- for _, t := range toDelete {
- s.queue.Remove(t.queueIndex.Value())
- delete(s.tasks, t.key)
- }
}
// boundedWake narrows `candidate` by the earliest in-progress hot-task
// heartbeat expiry so cleanup runs promptly. Caller must hold s.mu.
func (s *scheduler) boundedWake(candidate time.Duration, now time.Time) time.Duration {
- for _, t := range s.tasks {
- if t.state != TaskStateInProgress || t.subscription {
- continue
- }
- if t.hotDeadline.IsZero() {
- continue
- }
- d := max(t.hotDeadline.Sub(now), 0)
- if d < candidate {
- candidate = d
- }
+ if s.inProgressHot.Len() == 0 {
+ return candidate
+ }
+ head := s.inProgressHot.Peek() // earliest hotDeadline among tracked running hot tasks
+ if head.hotDeadline.IsZero() {
+ return candidate
+ }
+ d := max(head.hotDeadline.Sub(now), 0) // an already-expired deadline clamps to an immediate wake
+ if d < candidate {
+ return d
}
return candidate
}
@@ -613,3 +681,33 @@ func (s *scheduler) resetTimer(now time.Time) {
}
s.timer.Reset(s.boundedWake(wake, now))
}
+
+// SchedulerSnapshot is a point-in-time view of scheduler internals,
+// surfaced via /debug/network so we can confirm whether preemption ever
+// fires in production without standing up a metrics scraper. TasksTotal,
+// QueueLen and the InProgress* fields are instantaneous sizes; the two
+// Preempt*Count fields are cumulative since the scheduler was created.
+type SchedulerSnapshot struct {
+ TasksTotal int // entries in the scheduler's task map
+ QueueLen int // tasks waiting in the queue
+ InProgress int // tasks currently executing
+ InProgressHot int // running ephemeral hot tasks being tracked
+ InProgressSubscription int // running subscription tasks being tracked
+ PreemptHotCount uint64 // older hot tasks cancelled for a newer hot task
+ PreemptSubsCount uint64 // subscriptions cancelled for a hot task
+}
+
+// snapshot reads every scheduler counter while holding s.mu, so the sizes
+// it reports are consistent with one another. The preemption totals are
+// atomics, but they are loaded under the same lock.
+func (s *scheduler) snapshot() SchedulerSnapshot {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	snap := SchedulerSnapshot{PreemptHotCount: s.preemptHotCount.Load(), PreemptSubsCount: s.preemptSubsCount.Load()}
+	snap.TasksTotal = len(s.tasks)
+	snap.QueueLen = s.queue.Len()
+	snap.InProgress = s.inProgress
+	snap.InProgressHot = s.inProgressHot.Len()
+	snap.InProgressSubscription = len(s.inProgressSubs)
+	return snap
+}
diff --git a/backend/hmnet/syncing/syncing.go b/backend/hmnet/syncing/syncing.go
index 458263cd3..8893ce8a7 100644
--- a/backend/hmnet/syncing/syncing.go
+++ b/backend/hmnet/syncing/syncing.go
@@ -557,6 +557,14 @@ func (s *Service) listSubscriptionsFromDB(ctx context.Context) ([]Subscription,
return subs, nil
}
+// SchedulerSnapshot returns a point-in-time view of the scheduler's
+// internal counters and queue sizes. Safe to call from any goroutine: it
+// delegates to the scheduler's snapshot, which is taken under its lock.
+// Used by /debug/network to surface whether preemption fires in production.
+func (s *Service) SchedulerSnapshot() SchedulerSnapshot {
+ return s.scheduler.snapshot()
+}
+
// TouchHotTask returns an existing task or creates a new ephemeral one.
// If a subscription task already exists, it wakes it up and returns its info.
//