Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
60bcf7d
added an implementation of aggregating memory store
dmitri-netbird Jun 9, 2026
243e934
initial support for aggregation of events
dmitri-netbird Jun 9, 2026
b654a75
added tcp-aggregation test
dmitri-netbird Jun 10, 2026
101ae3c
added manager integration test
dmitri-netbird Jun 10, 2026
8f99362
added tracking of the number of start-, drop, and end-events in an ag…
dmitri-netbird Jun 10, 2026
42e0007
fixes based on SonarQube checks
dmitri-netbird Jun 11, 2026
12a8943
regenerated proto files
dmitri-netbird Jun 11, 2026
a593e32
removed inadvertently added google proto files
dmitri-netbird Jun 11, 2026
d9d585e
pacifying linter
dmitri-netbird Jun 11, 2026
598558c
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 11, 2026
98ce097
update test to validate event aggregation over tcp, udp, icmp, and ic…
dmitri-netbird Jun 11, 2026
b21f7f7
updated event aggregation test
dmitri-netbird Jun 12, 2026
e3f9396
regenerate protobufs with expected versions of protoc and protoc-gen-go
dmitri-netbird Jun 15, 2026
c875aa6
remove protoc/protoc-gen headers from flow_grpc.pb.go
dmitri-netbird Jun 15, 2026
fae5b7e
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 15, 2026
9dc5e77
updated openapi spec
dmitri-netbird Jun 15, 2026
07c527f
updated openapi NetworkTrafficEvent spec, regenerated types
dmitri-netbird Jun 15, 2026
a93cb66
respond to feedback
dmitri-netbird Jun 16, 2026
7295e2e
fixed an issue with how we track events that shouldn't be aggregated
dmitri-netbird Jun 16, 2026
67d1419
fixed mapping of events to protobuf
dmitri-netbird Jun 16, 2026
ca4ce0a
icmp code values in aggregated events do not matter
dmitri-netbird Jun 16, 2026
e89c0f5
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 16, 2026
0286c17
regenerate openapi types
dmitri-netbird Jun 16, 2026
3d6fc3b
added a comment re: unbounded unacked events
dmitri-netbird Jun 16, 2026
9ea463e
reset aggregated event type to unknown
dmitri-netbird Jun 16, 2026
1721a4f
fix event aggregation test
dmitri-netbird Jun 16, 2026
5dc159e
used the source port of the earliest event
dmitri-netbird Jun 16, 2026
0e95b6d
add tracking of window starts and ends
dmitri-netbird Jun 15, 2026
fd763ec
updated openapi spec
dmitri-netbird Jun 15, 2026
17cc13f
reverted changes to generate.sh
dmitri-netbird Jun 17, 2026
4a13418
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 22, 2026
41a15f6
cleanup handling of not-aggregated events + test
dmitri-netbird Jun 22, 2026
1f1413e
responded to feedback + small fixes
dmitri-netbird Jun 22, 2026
928bfe3
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 22, 2026
05308fb
small fix in a test
dmitri-netbird Jun 22, 2026
a5d455e
Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation
dmitri-netbird Jun 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions client/internal/netflow/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ type Logger struct {
wgIfaceNetV6 netip.Prefix
dnsCollection atomic.Bool
exitNodeCollection atomic.Bool
Store types.Store
Store types.AggregatingStore
}

func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix) *Logger {
return &Logger{
statusRecorder: statusRecorder,
wgIfaceNet: wgIfaceIPNet,
wgIfaceNetV6: wgIfaceIPNetV6,
Store: store.NewMemoryStore(),
Store: store.NewAggregatingMemoryStore(),
}
}

Expand Down Expand Up @@ -125,6 +125,10 @@ func (l *Logger) stop() {
l.mux.Unlock()
}

// ResetAggregationWindow delegates to the logger's aggregating store and
// returns the FlowEventAggregator that the store hands back.
// NOTE(review): presumably this closes the current aggregation window and the
// returned aggregator holds the events collected in it — confirm against the
// AggregatingStore implementation.
func (l *Logger) ResetAggregationWindow() types.FlowEventAggregator {
return l.Store.ResetAggregationWindow()
}

// GetEvents returns the events reported by the logger's underlying store.
func (l *Logger) GetEvents() []*types.Event {
return l.Store.GetEvents()
}
Expand Down
107 changes: 82 additions & 25 deletions client/internal/netflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/netbirdio/netbird/client/internal/netflow/conntrack"
"github.com/netbirdio/netbird/client/internal/netflow/logger"
"github.com/netbirdio/netbird/client/internal/netflow/store"
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/flow/client"
Expand All @@ -23,14 +25,16 @@ import (

// Manager handles netflow tracking and logging
type Manager struct {
mux sync.Mutex
shutdownWg sync.WaitGroup
logger nftypes.FlowLogger
flowConfig *nftypes.FlowConfig
conntrack nftypes.ConnTracker
receiverClient *client.GRPCClient
publicKey []byte
cancel context.CancelFunc
mux sync.Mutex
shutdownWg sync.WaitGroup
logger nftypes.FlowLogger
flowConfig *nftypes.FlowConfig
conntrack nftypes.ConnTracker
receiverClient *client.GRPCClient
eventsWithoutAcks nftypes.Store
publicKey []byte
cancel context.CancelFunc
retryInterval time.Duration
}

// NewManager creates a new netflow manager
Expand All @@ -48,9 +52,11 @@ func NewManager(iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *pee
}

return &Manager{
logger: flowLogger,
conntrack: ct,
publicKey: publicKey,
logger: flowLogger,
conntrack: ct,
publicKey: publicKey,
retryInterval: time.Second,
eventsWithoutAcks: store.NewMemoryStore(),
}
}

Expand All @@ -66,6 +72,7 @@ func (m *Manager) needsNewClient(previous *nftypes.FlowConfig) bool {
}

// enableFlow starts components for flow tracking
// must be called under m.mux lock
func (m *Manager) enableFlow(previous *nftypes.FlowConfig) error {
// first make sender ready so events don't pile up
if m.needsNewClient(previous) {
Expand All @@ -85,6 +92,7 @@ func (m *Manager) enableFlow(previous *nftypes.FlowConfig) error {
return nil
}

// must be called under m.mux lock
func (m *Manager) resetClient() error {
if m.receiverClient != nil {
if err := m.receiverClient.Close(); err != nil {
Expand All @@ -107,14 +115,19 @@ func (m *Manager) resetClient() error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

m.shutdownWg.Add(2)
m.shutdownWg.Add(3)
flowConfigInterval := m.flowConfig.Interval
go func() {
defer m.shutdownWg.Done()
m.receiveACKs(ctx, flowClient)
m.receiveACKs(ctx, flowClient, flowConfigInterval)
}()
go func() {
defer m.shutdownWg.Done()
m.startSender(ctx)
m.startSender(ctx, flowConfigInterval)
}()
go func() {
defer m.shutdownWg.Done()
m.startRetries(ctx, flowConfigInterval)
}()

return nil
Expand Down Expand Up @@ -198,36 +211,38 @@ func (m *Manager) GetLogger() nftypes.FlowLogger {
return m.logger
}

func (m *Manager) startSender(ctx context.Context) {
ticker := time.NewTicker(m.flowConfig.Interval)
func (m *Manager) startSender(ctx context.Context, flowConfigInterval time.Duration) {
ticker := time.NewTicker(flowConfigInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
events := m.logger.GetEvents()
collectedEvents := m.logger.ResetAggregationWindow()
events := collectedEvents.GetAggregatedEvents()
for _, event := range events {
if err := m.send(event); err != nil {
log.Errorf("failed to send flow event to server: %v", err)
continue
} else {
log.Tracef("sent flow event: %s", event.ID)
}
log.Tracef("sent flow event: %s", event.ID)
m.eventsWithoutAcks.StoreEvent(event)
}
}
}
}

func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
err := client.Receive(ctx, m.flowConfig.Interval, func(ack *proto.FlowEventAck) error {
func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient, flowConfigInterval time.Duration) {
err := client.Receive(ctx, flowConfigInterval, func(ack *proto.FlowEventAck) error {
id, err := uuid.FromBytes(ack.EventId)
if err != nil {
log.Warnf("failed to convert ack event id to uuid: %v", err)
return nil
}
log.Tracef("received flow event ack: %s", id)
m.logger.DeleteEvents([]uuid.UUID{id})
m.eventsWithoutAcks.DeleteEvents([]uuid.UUID{id})
return nil
})

Expand All @@ -236,6 +251,43 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
}
}

// We effectively never drop events (see MaxInterval), which makes eventsWithoutAcks unbounded.
// We may want to limit the max size of the store, and start dropping oldest events when the threshold is reached.
func (m *Manager) startRetries(ctx context.Context, flowConfigInterval time.Duration) {
timer := time.NewTimer(m.retryInterval)
Comment thread
dmitri-netbird marked this conversation as resolved.
retryBackoff := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 1 * time.Second,
RandomizationFactor: 0.5,
Multiplier: 1.7,
MaxInterval: flowConfigInterval / 2,
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return
case <-timer.C:
for _, e := range m.eventsWithoutAcks.GetEvents() {
if e.Timestamp.Add(time.Second).After(time.Now()) {
// grace period on retries to avoid early retries
// do not retry if the event is less than 1 sec old
continue
}
if err := m.send(e); err != nil {
timer = time.NewTimer(retryBackoff.NextBackOff()) //nolint:staticcheck,wastedassign
break
}
}
retryBackoff.Reset()
timer = time.NewTimer(m.retryInterval)
}
}
}

func (m *Manager) send(event *nftypes.Event) error {
m.mux.Lock()
client := m.receiverClient
Expand All @@ -250,9 +302,11 @@ func (m *Manager) send(event *nftypes.Event) error {

func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
protoEvent := &proto.FlowEvent{
EventId: event.ID[:],
Timestamp: timestamppb.New(event.Timestamp),
PublicKey: publicKey,
EventId: event.ID[:],
Timestamp: timestamppb.New(event.Timestamp),
PublicKey: publicKey,
WindowStart: timestamppb.New(event.WindowStart),
WindowEnd: timestamppb.New(event.WindowEnd),
FlowFields: &proto.FlowFields{
FlowId: event.FlowID[:],
RuleId: event.RuleID,
Expand All @@ -267,6 +321,9 @@ func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
TxBytes: event.TxBytes,
SourceResourceId: event.SourceResourceID,
DestResourceId: event.DestResourceID,
NumOfStarts: event.NumOfStarts,
NumOfEnds: event.NumOfEnds,
NumOfDrops: event.NumOfDrops,
},
}

Expand Down
Loading
Loading