Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
60bcf7d
added an implementation of aggregating memory store
dmitri-netbird Jun 9, 2026
243e934
initial support for aggregation of events
dmitri-netbird Jun 9, 2026
b654a75
added tcp-aggregation test
dmitri-netbird Jun 10, 2026
101ae3c
added manager integration test
dmitri-netbird Jun 10, 2026
8f99362
added tracking of the number of start-, drop, and end-events in an ag…
dmitri-netbird Jun 10, 2026
42e0007
fixes based on sonarcube checks
dmitri-netbird Jun 11, 2026
12a8943
regenerated proto files
dmitri-netbird Jun 11, 2026
a593e32
removed inadvertenly added google proto files
dmitri-netbird Jun 11, 2026
d9d585e
pacifying linter
dmitri-netbird Jun 11, 2026
598558c
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 11, 2026
98ce097
update test to validate event aggregation over tcp, udp, icmp, and ic…
dmitri-netbird Jun 11, 2026
b21f7f7
updated event aggregation test
dmitri-netbird Jun 12, 2026
e3f9396
regenerate protobufs with expected versions of protoc and protoc-gen-go
dmitri-netbird Jun 15, 2026
c875aa6
remove protoc/protoc-gen headers from flow_grpc.pb.go
dmitri-netbird Jun 15, 2026
fae5b7e
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 15, 2026
9dc5e77
updated openapi spec
dmitri-netbird Jun 15, 2026
07c527f
updated openapi NetworkTrafficEvent spec, regenerated types
dmitri-netbird Jun 15, 2026
a93cb66
respond to feedback
dmitri-netbird Jun 16, 2026
7295e2e
fixed an issue with how we track events that shouldn't be aggregated
dmitri-netbird Jun 16, 2026
67d1419
fixed mapping of events to protobuf
dmitri-netbird Jun 16, 2026
ca4ce0a
icmp code values in aggregated events do not matter
dmitri-netbird Jun 16, 2026
e89c0f5
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 16, 2026
0286c17
regenerate openapi types
dmitri-netbird Jun 16, 2026
3d6fc3b
added a comment re: unbounded unacked events
dmitri-netbird Jun 16, 2026
9ea463e
reset aggregated event type to unknown
dmitri-netbird Jun 16, 2026
1721a4f
fix event aggregation test
dmitri-netbird Jun 16, 2026
5dc159e
used the source port of the earliest event
dmitri-netbird Jun 16, 2026
0e95b6d
add tracking of window starts and ends
dmitri-netbird Jun 15, 2026
fd763ec
updated openapi spec
dmitri-netbird Jun 15, 2026
17cc13f
reverted changes to generate.sh
dmitri-netbird Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions client/internal/netflow/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ type Logger struct {
wgIfaceNetV6 netip.Prefix
dnsCollection atomic.Bool
exitNodeCollection atomic.Bool
Store types.Store
Store types.AggregatingStore
}

func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix) *Logger {
return &Logger{
statusRecorder: statusRecorder,
wgIfaceNet: wgIfaceIPNet,
wgIfaceNetV6: wgIfaceIPNetV6,
Store: store.NewMemoryStore(),
Store: store.NewAggregatingMemoryStore(),
}
}

Expand Down Expand Up @@ -125,6 +125,10 @@ func (l *Logger) stop() {
l.mux.Unlock()
}

func (l *Logger) ResetAggregationWindow() types.FlowEventAggregator {
return l.Store.ResetAggregationWindow()
}

func (l *Logger) GetEvents() []*types.Event {
return l.Store.GetEvents()
}
Expand Down
92 changes: 73 additions & 19 deletions client/internal/netflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/netbirdio/netbird/client/internal/netflow/conntrack"
"github.com/netbirdio/netbird/client/internal/netflow/logger"
"github.com/netbirdio/netbird/client/internal/netflow/store"
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/flow/client"
Expand All @@ -23,14 +25,16 @@ import (

// Manager handles netflow tracking and logging
type Manager struct {
mux sync.Mutex
shutdownWg sync.WaitGroup
logger nftypes.FlowLogger
flowConfig *nftypes.FlowConfig
conntrack nftypes.ConnTracker
receiverClient *client.GRPCClient
publicKey []byte
cancel context.CancelFunc
mux sync.Mutex
shutdownWg sync.WaitGroup
logger nftypes.FlowLogger
flowConfig *nftypes.FlowConfig
conntrack nftypes.ConnTracker
receiverClient *client.GRPCClient
eventsWithoutAcks nftypes.Store
publicKey []byte
cancel context.CancelFunc
retryInterval time.Duration
}

// NewManager creates a new netflow manager
Expand All @@ -48,9 +52,11 @@ func NewManager(iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *pee
}

return &Manager{
logger: flowLogger,
conntrack: ct,
publicKey: publicKey,
logger: flowLogger,
conntrack: ct,
publicKey: publicKey,
retryInterval: time.Second,
eventsWithoutAcks: store.NewMemoryStore(),
}
}

Expand Down Expand Up @@ -107,7 +113,7 @@ func (m *Manager) resetClient() error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

m.shutdownWg.Add(2)
m.shutdownWg.Add(3)
go func() {
defer m.shutdownWg.Done()
m.receiveACKs(ctx, flowClient)
Expand All @@ -116,6 +122,10 @@ func (m *Manager) resetClient() error {
defer m.shutdownWg.Done()
m.startSender(ctx)
}()
go func() {
defer m.shutdownWg.Done()
m.startRetries(ctx)
}()

return nil
}
Expand Down Expand Up @@ -207,13 +217,15 @@ func (m *Manager) startSender(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
events := m.logger.GetEvents()
collectedEvents := m.logger.ResetAggregationWindow()
events := collectedEvents.GetAggregatedEvents()
for _, event := range events {
if err := m.send(event); err != nil {
log.Errorf("failed to send flow event to server: %v", err)
continue
} else {
log.Tracef("sent flow event: %s", event.ID)
}
log.Tracef("sent flow event: %s", event.ID)
m.eventsWithoutAcks.StoreEvent(event)
}
}
}
Expand All @@ -227,7 +239,7 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
return nil
}
log.Tracef("received flow event ack: %s", id)
m.logger.DeleteEvents([]uuid.UUID{id})
m.eventsWithoutAcks.DeleteEvents([]uuid.UUID{id})
return nil
})

Expand All @@ -236,6 +248,43 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
}
}

// We effectively never drop events (see MaxInterval), which makes eventsWithoutAcks unbounded.
// We may want to limit the max size of the store, and start dropping oldest events when the threshold is reached.
func (m *Manager) startRetries(ctx context.Context) {
Comment on lines +251 to +253

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Bound the un-acked event store before retrying indefinitely.

When ACKs stop, every sent aggregate remains in eventsWithoutAcks until an ACK arrives. The inline comment already notes the store is unbounded; add a max size/age policy before this can grow client memory without limit during receiver outages.

I can help draft a bounded store policy or open a follow-up issue if you want.

Also applies to: 271-283

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@client/internal/netflow/manager.go` around lines 251 - 253, The
eventsWithoutAcks store in the Manager.startRetries method is unbounded and
grows indefinitely when ACKs stop arriving, consuming unbounded client memory
during receiver outages. Add a maximum size limit and age-based eviction policy
to the eventsWithoutAcks store to prevent unlimited growth, implementing logic
to drop the oldest events when the threshold is reached. Apply this bounded
store policy consistently across all event storage and retry logic throughout
the Manager implementation.

timer := time.NewTimer(m.retryInterval)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use m.retryInterval after the first retry tick.

Line 254 honors the configurable retry interval only for the initial timer; Line 283 hard-codes all later normal passes to one second. That makes short-interval tests or future overrides ineffective after the first fire.

Proposed fix
-			timer = time.NewTimer(time.Second)
+			timer = time.NewTimer(m.retryInterval)

Also applies to: 283-283

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@client/internal/netflow/manager.go` at line 254, The timer initialization at
line 254 respects the configurable m.retryInterval for the first attempt, but
line 283 hard-codes a one-second duration when resetting the timer in the retry
loop. Replace the hard-coded one-second interval at line 283 with
m.retryInterval to ensure the configurable retry interval is applied
consistently across all retry attempts, not just the initial one.

retryBackoff := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 1 * time.Second,
RandomizationFactor: 0.5,
Multiplier: 1.7,
MaxInterval: m.flowConfig.Interval / 2,
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return
case <-timer.C:
for _, e := range m.eventsWithoutAcks.GetEvents() {
if e.Timestamp.Add(time.Second).After(time.Now()) {
// grace period on retries to avoid early retries
// do not retry if the event is less than 1 sec old
continue
}
if err := m.send(e); err != nil {
timer = time.NewTimer(retryBackoff.NextBackOff()) //nolint:staticcheck,wastedassign
break
}
}
retryBackoff.Reset()
timer = time.NewTimer(time.Second)
}
}
}

func (m *Manager) send(event *nftypes.Event) error {
m.mux.Lock()
client := m.receiverClient
Expand All @@ -250,9 +299,11 @@ func (m *Manager) send(event *nftypes.Event) error {

func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
protoEvent := &proto.FlowEvent{
EventId: event.ID[:],
Timestamp: timestamppb.New(event.Timestamp),
PublicKey: publicKey,
EventId: event.ID[:],
Timestamp: timestamppb.New(event.Timestamp),
PublicKey: publicKey,
WindowStart: timestamppb.New(event.WindowStart),
WindowEnd: timestamppb.New(event.WindowEnd),
FlowFields: &proto.FlowFields{
FlowId: event.FlowID[:],
RuleId: event.RuleID,
Expand All @@ -267,6 +318,9 @@ func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
TxBytes: event.TxBytes,
SourceResourceId: event.SourceResourceID,
DestResourceId: event.DestResourceID,
NumOfStarts: event.NumOfStarts,
NumOfEnds: event.NumOfEnds,
NumOfDrops: event.NumOfDrops,
},
}

Expand Down
Loading
Loading