Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func Load(ctx context.Context, cfg config.Config, r *storage.Store, oo ...Option
}

a.HTTPServer, a.HTTPListener, err = initHTTP(cfg.Base, cfg.HTTP.Port, a.GRPCServer, &a.clean, a.g, a.Index,
fm, a.Net, a.RPC.Daemon, opts.extraHTTPHandlers...)
fm, a.Net, a.Syncing.SchedulerSnapshot, a.RPC.Daemon, opts.extraHTTPHandlers...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion backend/daemon/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"seed/backend/blob"
"seed/backend/config"
"seed/backend/hmnet"
"seed/backend/hmnet/syncing"
"seed/backend/logging"
"seed/backend/util/cleanup"
"seed/backend/util/trcstats"
Expand Down Expand Up @@ -138,6 +139,7 @@ func initHTTP(
blobs blockstore.Blockstore,
ipfsHandler *hmnet.FileManager,
p2pnet *hmnet.Node,
schedulerSnap func() syncing.SchedulerSnapshot,
daemonServer *daemonapi.Server,
extraHandlers ...func(*Router),
) (srv *http.Server, lis net.Listener, err error) {
Expand Down Expand Up @@ -178,7 +180,7 @@ func initHTTP(
router.Handle("/debug/requests", http.DefaultServeMux, RouteNav)
router.Handle("/debug/events", http.DefaultServeMux, RouteNav)
router.Handle("/debug/p2p", p2pnet.DebugHandler(), RouteNav)
router.Handle("/debug/network", p2pnet.NetworkDebugHandler(), RouteNav)
router.Handle("/debug/network", p2pnet.NetworkDebugHandler(schedulerSnap), RouteNav)

router.Handle("/hm/api/config", p2pnet.HMAPIConfigHandler(), RouteNav)
router.Handle(vaultConnectionHandoffPath, corsMiddleware(http.HandlerFunc(daemonServer.HandleVaultHandoff)), 0)
Expand Down
61 changes: 60 additions & 1 deletion backend/hmnet/http_debug_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"time"

"seed/backend/hmnet/syncing"
"seed/backend/util/bwcounter"

"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -21,11 +22,16 @@ import (
// sync/discovery latency, peer-table state, and reachability — modeled on
// /debug/traces. Latency cells are color-coded against fixed thresholds
// (p10>50ms, p50>100ms, p90>1s, p99>5s warn).
func (n *Node) NetworkDebugHandler() http.Handler {
//
// schedulerSnap is an optional callback returning the syncing scheduler's
// counters. Pass nil if no syncing service is wired up; the page omits the
// scheduler section in that case.
func (n *Node) NetworkDebugHandler(schedulerSnap func() syncing.SchedulerSnapshot) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")

page := n.buildPage(r.Context())
page.Scheduler = buildScheduler(schedulerSnap)

if err := pageTpl.Execute(w, page); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -44,6 +50,21 @@ type networkPage struct {
Sections []section
Bandwidth bandwidthSection
Reachability reachSection
Scheduler *schedulerDebugSection
}

// schedulerDebugSection renders the syncing scheduler's instantaneous
// queue/in-progress sizes plus cumulative preemption counters, so a human
// looking at /debug/network can confirm whether the preemption machinery
// is firing at all in production.
type schedulerDebugSection struct {
TasksTotal string
QueueLen string
InProgress string
InProgressHot string
InProgressSubscription string
PreemptHotCount string
PreemptSubsCount string
}

// bandwidthSection holds all bandwidth-related tables for the page. Each table
Expand Down Expand Up @@ -374,6 +395,24 @@ func (n *Node) buildPage(ctx context.Context) networkPage {
return page
}

// buildScheduler renders a syncing-scheduler snapshot block, or nil if no
// snapshot func was provided.
func buildScheduler(snap func() syncing.SchedulerSnapshot) *schedulerDebugSection {
if snap == nil {
return nil
}
s := snap()
return &schedulerDebugSection{
TasksTotal: fmt.Sprintf("%d", s.TasksTotal),
QueueLen: fmt.Sprintf("%d", s.QueueLen),
InProgress: fmt.Sprintf("%d", s.InProgress),
InProgressHot: fmt.Sprintf("%d", s.InProgressHot),
InProgressSubscription: fmt.Sprintf("%d", s.InProgressSubscription),
PreemptHotCount: fmt.Sprintf("%d", s.PreemptHotCount),
PreemptSubsCount: fmt.Sprintf("%d", s.PreemptSubsCount),
}
}

// buildBandwidth assembles the bandwidth section from the libp2p metrics and
// the two HTTP counters owned by the Node. Numbers are pre-formatted into
// human-readable strings so the template stays simple.
Expand Down Expand Up @@ -1557,6 +1596,26 @@ details.howto>summary{color:#0a58ca;font-weight:600}
{{.Bandwidth.Help}}
</details>

{{if .Scheduler}}
{{with .Scheduler}}
<h2>Discovery scheduler</h2>
<div class="subtitle">queue/in-progress sizes are instantaneous; preempt counts are cumulative since startup</div>
<table class="kv">
<tr><td>tasks tracked</td><td class="num">{{.TasksTotal}}</td></tr>
<tr><td>queued</td><td class="num">{{.QueueLen}}</td></tr>
<tr><td>in progress (total)</td><td class="num">{{.InProgress}}</td></tr>
<tr><td>&nbsp;&nbsp;hot (ephemeral)</td><td class="num">{{.InProgressHot}}</td></tr>
<tr><td>&nbsp;&nbsp;subscription</td><td class="num">{{.InProgressSubscription}}</td></tr>
<tr><td>preempt total — subscription cancelled for hot</td><td class="num">{{.PreemptSubsCount}}</td></tr>
<tr><td>preempt total — older hot cancelled for newer hot</td><td class="num">{{.PreemptHotCount}}</td></tr>
</table>
<details class="help">
<summary>What does this mean?</summary>
<p>Lifetime of a discovery task: <em>queued</em> → <em>in progress</em> (subscription or hot/ephemeral) → done. The two <em>preempt</em> counters track the only paths that cancel an in-flight task to make room for a higher-priority one. If both stay at zero across a busy session, the preemption machinery isn't actually load-bearing and could be removed.</p>
</details>
{{end}}
{{end}}

<h2>Reachability snapshot</h2>
<div class="subtitle">peerstore peers, connected first; showing top {{len .Reachability.Rows}} of {{.Reachability.Total}}</div>
<table class="reach">
Expand Down
57 changes: 57 additions & 0 deletions backend/hmnet/http_debug_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package hmnet

import (
"math"
"strings"
"testing"

"seed/backend/hmnet/syncing"

dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -63,6 +66,60 @@ func TestHistStatsPercentileFallsBackForInfBucket(t *testing.T) {
require.Equal(t, 2.0, h.percentile(0.99))
}

func TestBuildScheduler_NilSnap(t *testing.T) {
// A nil snapshot func must produce no scheduler section.
require.Nil(t, buildScheduler(nil))
}

func TestBuildScheduler_WithSnap(t *testing.T) {
snap := func() syncing.SchedulerSnapshot {
return syncing.SchedulerSnapshot{
TasksTotal: 5,
QueueLen: 2,
InProgress: 3,
InProgressHot: 2,
InProgressSubscription: 1,
PreemptHotCount: 7,
PreemptSubsCount: 11,
}
}

got := buildScheduler(snap)
require.NotNil(t, got)
require.Equal(t, "5", got.TasksTotal)
require.Equal(t, "2", got.QueueLen)
require.Equal(t, "3", got.InProgress)
require.Equal(t, "2", got.InProgressHot)
require.Equal(t, "1", got.InProgressSubscription)
require.Equal(t, "7", got.PreemptHotCount)
require.Equal(t, "11", got.PreemptSubsCount)
}

// TestPageTpl_RendersSchedulerSection runs the actual page template with
// a populated Scheduler section to make sure the new <h2> block renders
// without template errors and contains the snapshot's numeric values.
func TestPageTpl_RendersSchedulerSection(t *testing.T) {
page := networkPage{
PeerID: "peer-1",
ProtocolID: "/test/0.0.1",
Scheduler: &schedulerDebugSection{
TasksTotal: "42",
QueueLen: "5",
InProgress: "3",
InProgressHot: "2",
InProgressSubscription: "1",
PreemptHotCount: "9",
PreemptSubsCount: "13",
},
}
var sb strings.Builder
require.NoError(t, pageTpl.Execute(&sb, page))
html := sb.String()
require.Contains(t, html, "Discovery scheduler")
require.Contains(t, html, "42")
require.Contains(t, html, "preempt total — older hot cancelled for newer hot")
}

func TestHistStatsPercentileSkipsEmptyEarlyBuckets(t *testing.T) {
// Buckets 0 and 1 are empty; bucket 2 ((10ms, 100ms]) holds all
// observations. p=0.5 → target=5. prevUpper after bucket 1 is 0.01.
Expand Down
80 changes: 54 additions & 26 deletions backend/hmnet/syncing/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multicodec"
Expand Down Expand Up @@ -115,34 +114,49 @@ func (s *Service) DiscoverObjectWithProgress(ctx context.Context, entityID blob.
// state. We use them as a fallback before the DHT when no peer is connected,
// since syncWithPeer will dial on demand.
const maxKnownPeerFallback = 20
var knownPeers []peer.ID
if err = s.db.WithSave(ctxLocalPeers, func(conn *sqlite.Conn) error {
return sqlitex.Exec(conn, qListPeersWithPid(), func(stmt *sqlite.Stmt) error {
addresStr := stmt.ColumnText(0)
pid := stmt.ColumnText(1)
addrList := strings.Split(addresStr, ",")
info, err := netutil.AddrInfoFromStrings(addrList...)
if err != nil {
s.log.Warn("Can't discover from peer since it has malformed addresses", zap.String("PID", pid), zap.Error(err))
return nil
}
if s.host.Network().Connectedness(info.ID) == network.Connected {
allPeers = append(allPeers, info.ID)
} else if len(knownPeers) < maxKnownPeerFallback {
s.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL)
knownPeers = append(knownPeers, info.ID)
}

return nil
})
}); err != nil {
MDiscoverPhaseSeconds.WithLabelValues("peer_select").Observe(time.Since(peerSelectStart).Seconds())
return "", err
// Fast path: ask the host directly for the currently-connected peers
// instead of scanning the entire peers table and parsing every
// multiaddr just to filter on Connectedness. The host already maintains
// this set in O(connected) memory; it's the same predicate the original
// loop computed via Connectedness == Connected.
self := s.host.ID()
for _, pid := range s.host.Network().Peers() {
if pid == self {
continue
}
allPeers = append(allPeers, pid)
}
MDiscoverPhaseSeconds.WithLabelValues("peer_select").Observe(time.Since(peerSelectStart).Seconds())
if len(allPeers) == 0 && len(knownPeers) > 0 {
allPeers = knownPeers

// Slow path: only when no peers are currently connected do we hit the
// DB to find a small fallback set we can dial. The query is bounded
// (LIMIT) so the multiaddr parse cost is bounded too — the prior
// unbounded scan was the source of the per-call CPU regression.
if len(allPeers) == 0 {
if err = s.db.WithSave(ctxLocalPeers, func(conn *sqlite.Conn) error {
return sqlitex.Exec(conn, qListPeersWithPidLimit(), func(stmt *sqlite.Stmt) error {
addresStr := stmt.ColumnText(0)
pid := stmt.ColumnText(1)
info, err := netutil.AddrInfoFromStrings(strings.Split(addresStr, ",")...)
if err != nil {
s.log.Warn("Can't discover from peer since it has malformed addresses", zap.String("PID", pid), zap.Error(err))
return nil
}
// peerstore.RecentlyConnectedAddrTTL is the right TTL for
// "we last saw this peer at this address" semantics — the
// dial path may run many seconds later as syncs queue up,
// and TempAddrTTL (the previous choice) is short enough
// that addresses can expire before we get to dial.
s.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.RecentlyConnectedAddrTTL)
allPeers = append(allPeers, info.ID)
return nil
}, maxKnownPeerFallback)
}); err != nil {
MDiscoverPhaseSeconds.WithLabelValues("peer_select").Observe(time.Since(peerSelectStart).Seconds())
return "", err
}
}
MDiscoverPhaseSeconds.WithLabelValues("peer_select").Observe(time.Since(peerSelectStart).Seconds())

// Create RBSR store once for reuse across all peers.
dkeys := colx.HashSet[DiscoveryKey]{
Expand Down Expand Up @@ -860,3 +874,17 @@ var qListPeersWithPid = dqb.Str(`
pid
FROM peers;
`)

// qListPeersWithPidLimit is the bounded variant used by the disconnected-peer
// fallback in DiscoverObjectWithProgress. We only need a small set to seed
// dialing — most-recently-updated peers first — and previously the unbounded
// scan was reading every row out of the table even though only the first 20
// were kept. Caps the SQLite work at the size of the fallback budget.
var qListPeersWithPidLimit = dqb.Str(`
SELECT
addresses,
pid
FROM peers
ORDER BY updated_at DESC
LIMIT ?;
`)
Loading
Loading