[client] introduce client-side event aggregation #6394
Signed-off-by: Dmitri <dmitri.external@netbird.io>
…gregation window Signed-off-by: Dmitri <dmitri.external@netbird.io>
📝 Walkthrough
Adds ACK-based retry logic to the netflow
Changes
Netflow ACK-based Event Retrying and Aggregation
Sequence Diagram(s)
sequenceDiagram
rect rgba(173, 216, 230, 0.5)
note over Manager,AggregatingMemory: Send Window
end
participant Manager
participant AggregatingMemory
participant eventsWithoutAcks
participant FlowServer
Manager->>AggregatingMemory: ResetAggregationWindow()
AggregatingMemory-->>Manager: previous window aggregated []*Event
loop each aggregated event
Manager->>FlowServer: send FlowEvent (window_start, window_end, num_of_starts, ...)
Manager->>eventsWithoutAcks: StoreEvent (always)
end
FlowServer-->>Manager: FlowEventAck(event_ids)
Manager->>eventsWithoutAcks: DeleteEvents(event_ids)
rect rgba(255, 223, 186, 0.5)
note over Manager,FlowServer: Retry Loop (1s ticker)
end
loop startRetries every 1s
Manager->>eventsWithoutAcks: GetEvents()
eventsWithoutAcks-->>Manager: un-acked events
alt event age > 1s grace
Manager->>FlowServer: retry send FlowEvent
alt send fails
Manager->>Manager: exponential backoff (NextBackOff)
else send succeeds
Manager->>Manager: reset backoff, re-arm 1s timer
end
end
end
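The ACK bookkeeping in the diagram can be sketched in Go as follows. This is a minimal illustration, not the PR's code: the `Event` type, the `unackedStore` name, and the `DueForRetry` helper are hypothetical, and the one-second grace period is passed in as a parameter rather than hard-coded.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Event is a hypothetical, trimmed-down stand-in for the client's flow event.
type Event struct {
	ID     string
	SentAt time.Time
}

// unackedStore keeps every sent event until the server acknowledges it.
type unackedStore struct {
	mu     sync.Mutex
	events map[string]Event
}

func newUnackedStore() *unackedStore {
	return &unackedStore{events: make(map[string]Event)}
}

// StoreEvent records an event right after it was sent.
func (s *unackedStore) StoreEvent(e Event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events[e.ID] = e
}

// DeleteEvents drops the events named in a FlowEventAck.
func (s *unackedStore) DeleteEvents(ids []string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, id := range ids {
		delete(s.events, id)
	}
}

// DueForRetry returns the IDs of events that have waited longer than grace for an ACK.
func (s *unackedStore) DueForRetry(now time.Time, grace time.Duration) []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	var due []string
	for id, e := range s.events {
		if now.Sub(e.SentAt) > grace {
			due = append(due, id)
		}
	}
	return due
}

func main() {
	store := newUnackedStore()
	sent := time.Now()
	store.StoreEvent(Event{ID: "a", SentAt: sent})
	store.StoreEvent(Event{ID: "b", SentAt: sent})

	store.DeleteEvents([]string{"a"}) // the server acknowledged "a"

	// Two seconds later only "b" is still waiting, so only "b" is due for a retry.
	fmt.Println(store.DueForRetry(sent.Add(2*time.Second), time.Second))
}
```

Passing `now` into `DueForRetry` keeps the grace check testable without sleeping.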
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #6394 +/- ##
=======================================
Coverage ? 30.40%
=======================================
Files ? 912
Lines ? 113833
Branches ? 0
=======================================
Hits ? 34612
Misses ? 74640
Partials ? 4581
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
client/internal/netflow/manager.go (1)
298-318: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
Aggregated event counters are not populated in the protobuf message.
The aggregated NumOfStarts, NumOfEnds, and NumOfDrops fields computed by GetAggregatedEvents() (memory.go lines 96-122) are not set in the FlowFields protobuf message. The server will receive aggregated events with zeroed counters, losing the aggregation data.
🐛 Proposed fix
 protoEvent := &proto.FlowEvent{
     EventId:   event.ID[:],
     Timestamp: timestamppb.New(event.Timestamp),
     PublicKey: publicKey,
     FlowFields: &proto.FlowFields{
         FlowId:           event.FlowID[:],
         RuleId:           event.RuleID,
         Type:             proto.Type(event.Type),
         Direction:        proto.Direction(event.Direction),
         Protocol:         uint32(event.Protocol),
         SourceIp:         event.SourceIP.AsSlice(),
         DestIp:           event.DestIP.AsSlice(),
         RxPackets:        event.RxPackets,
         TxPackets:        event.TxPackets,
         RxBytes:          event.RxBytes,
         TxBytes:          event.TxBytes,
         SourceResourceId: event.SourceResourceID,
         DestResourceId:   event.DestResourceID,
+        NumOfStarts:      event.NumOfStarts,
+        NumOfEnds:        event.NumOfEnds,
+        NumOfDrops:       event.NumOfDrops,
     },
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@client/internal/netflow/manager.go` around lines 298 - 318, The FlowFields protobuf is missing the aggregated counters, so populate FlowFields.NumOfStarts, FlowFields.NumOfEnds, and FlowFields.NumOfDrops from the nftypes.Event before returning the proto.FlowEvent: in toProtoEvent(publicKey []byte, event *nftypes.Event) add the three fields (NumOfStarts: event.NumOfStarts, NumOfEnds: event.NumOfEnds, NumOfDrops: event.NumOfDrops) inside the proto.FlowFields literal so aggregated counts computed by GetAggregatedEvents() are sent to the server.
🧹 Nitpick comments (3)
client/internal/netflow/store/memory.go (2)
73-79: ⚡ Quick win
Rename icmpType field to icmpCode for clarity.
The aggregationKey.icmpType field is populated with v.ICMPCode (line 84), not v.ICMPType. Renaming the field to icmpCode will align the struct definition with its usage and improve code clarity.
♻️ Proposed fix
 type aggregationKey struct {
     destAddr netip.Addr
     destPort uint16
     protocol uint8
-    icmpType uint8
+    icmpCode uint8
     unique   int64 // used to prevent aggregation on non icmp/udp/tcp events
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@client/internal/netflow/store/memory.go` around lines 73 - 79, The struct field aggregationKey.icmpType is misnamed because callers populate it with v.ICMPCode; rename the field to icmpCode to match usage and improve clarity. Update the aggregationKey definition (replace icmpType with icmpCode) and any references to aggregationKey.icmpType throughout the codebase (e.g., places that assign v.ICMPCode) so they compile and keep semantics unchanged.
123-127: 💤 Low value
Consider documenting the timestamp selection logic.
The condition aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 keeps the earliest event's timestamp, ID, and Type when aggregating multiple events. This is intentional (confirmed by tests), but the logic could benefit from a comment explaining that the earliest event's metadata is preserved to represent the aggregation window's start.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@client/internal/netflow/store/memory.go` around lines 123 - 127, Add a short explanatory comment above the timestamp selection block explaining that the condition aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 intentionally preserves the earliest event's timestamp, ID and Type when merging events so the aggregated record represents the start of the aggregation window; reference the variables aggregatedEvent, v, Timestamp.Compare, ID and Type in the comment so future readers understand the preserved metadata semantic.
client/internal/netflow/store/tcp_aggregation_test.go (1)
21-279: ⚡ Quick win
Add ICMP test coverage for aggregation logic.
The test suite only covers TCP protocol aggregation. Since the PR objectives mention ICMP aggregation by code, and GetAggregatedEvents has ICMP-specific logic (line 84 in memory.go uses ICMPCode as part of the aggregation key), adding at least one ICMP test case would verify that ICMP flows are aggregated correctly.
Consider adding a test case that verifies ICMP aggregation:
{
    description: "ICMP echo requests",
    events: []*types.Event{
        {
            EventFields: types.EventFields{
                Type:     types.TypeStart,
                Protocol: types.ICMP,
                DestIP:   ipAddr("2.2.2.2"),
                ICMPType: 8, // Echo Request
                ICMPCode: 0,
                RxBytes:  100,
            }},
        {
            EventFields: types.EventFields{
                Type:     types.TypeEnd,
                Protocol: types.ICMP,
                DestIP:   ipAddr("2.2.2.2"),
                ICMPType: 8,
                ICMPCode: 0,
                RxBytes:  200,
            }},
    },
    expected: []*types.Event{
        {
            EventFields: types.EventFields{
                Type:        types.TypeStart,
                Protocol:    types.ICMP,
                DestIP:      ipAddr("2.2.2.2"),
                ICMPType:    8,
                ICMPCode:    0,
                RxBytes:     300,
                NumOfStarts: 1,
                NumOfEnds:   1,
            }},
    },
},
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@client/internal/netflow/store/tcp_aggregation_test.go` around lines 21 - 279, Add an ICMP test case to TestTcpAggregation to cover the ICMP-specific aggregation key (ICMPCode) used by GetAggregatedEvents: create events with Protocol types.ICMP and matching ICMPType/ICMPCode (e.g., echo request ICMPType=8, ICMPCode=0) with start and end events that have RxBytes/TXBytes to be summed, store them via NewAggregatingMemoryStore().StoreEvent, then include an expected aggregated Event that verifies RxBytes/TXBytes are summed and NumOfStarts/NumOfEnds reflect the inputs; reference TestTcpAggregation, GetAggregatedEvents, NewAggregatingMemoryStore, and the ICMPCode/ICMPType fields when adding the case.
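The aggregation behaviour these comments discuss (group by key, sum the counters, keep the earliest event's metadata) can be sketched roughly as below. The types are hypothetical simplifications of the PR's `types.Event` and `aggregationKey`; the key here groups ICMP events by type, which is an assumption for illustration since the review threads disagree on type versus code.

```go
package main

import "fmt"

// flowEvent is a hypothetical, simplified event; Timestamp is unix nanoseconds.
type flowEvent struct {
	Kind      string // "start", "end" or "drop"
	Protocol  uint8
	DestIP    string
	DestPort  uint16
	ICMPType  uint8
	Timestamp int64
	RxBytes   uint64
}

// aggKey is comparable, so it can be used directly as a map key.
type aggKey struct {
	protocol uint8
	destIP   string
	destPort uint16
	icmpType uint8
}

// aggregate holds the summed counters plus the earliest event of the group.
type aggregate struct {
	First       flowEvent
	RxBytes     uint64
	NumOfStarts uint64
	NumOfEnds   uint64
	NumOfDrops  uint64
}

func aggregateEvents(events []flowEvent) map[aggKey]*aggregate {
	out := make(map[aggKey]*aggregate)
	for _, e := range events {
		k := aggKey{protocol: e.Protocol, destIP: e.DestIP, destPort: e.DestPort, icmpType: e.ICMPType}
		a, ok := out[k]
		if !ok {
			a = &aggregate{First: e}
			out[k] = a
		} else if e.Timestamp < a.First.Timestamp {
			a.First = e // preserve the earliest event's metadata
		}
		a.RxBytes += e.RxBytes
		switch e.Kind {
		case "start":
			a.NumOfStarts++
		case "end":
			a.NumOfEnds++
		case "drop":
			a.NumOfDrops++
		}
	}
	return out
}

func main() {
	events := []flowEvent{
		{Kind: "start", Protocol: 1, DestIP: "2.2.2.2", ICMPType: 8, Timestamp: 20, RxBytes: 100},
		{Kind: "end", Protocol: 1, DestIP: "2.2.2.2", ICMPType: 8, Timestamp: 10, RxBytes: 200},
	}
	a := aggregateEvents(events)[aggKey{protocol: 1, destIP: "2.2.2.2", icmpType: 8}]
	fmt.Printf("rx=%d starts=%d ends=%d first=%d\n", a.RxBytes, a.NumOfStarts, a.NumOfEnds, a.First.Timestamp)
}
```

Any field left out of `aggKey` collapses across the group, which is why the choice of key fields matters for what the server later sees.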
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@client/internal/netflow/store/memory.go`:
- Around line 81-132: In GetAggregatedEvents, the aggregation lookup key is
built with the ICMP code (aggregationKey.icmpType = v.ICMPCode) which groups by
the wrong field; change the lookupKey construction in GetAggregatedEvents to use
the ICMP Type field from the event (e.g., v.ICMPType) so aggregation groups by
type not code, and ensure aggregationKey.icmpType's expected type matches the
event field; update any related comments if present.
In `@flow/proto/generate.sh`:
- Around line 15-16: Uncomment and restore the pinned go install lines in
generate.sh for the protobuf generators so regeneration is reproducible:
re-enable the go install invocations for protoc-gen-go and protoc-gen-go-grpc
(the previously commented lines) and update their version tags to match the
repository's checked-in generators (protoc-gen-go v1.36.11 and
protoc-gen-go-grpc v1.6.1); leave any protoc binary handling unchanged but
ensure the script documents the required protoc version (protoc v7.34.1) if it
cannot be installed via go.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4e155db1-9e65-4af4-8192-353348a2ac12
⛔ Files ignored due to path filters (2)
- flow/proto/flow.pb.go is excluded by !**/*.pb.go
- flow/proto/flow_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (8)
- client/internal/netflow/logger/logger.go
- client/internal/netflow/manager.go
- client/internal/netflow/manager_integration_test.go
- client/internal/netflow/store/memory.go
- client/internal/netflow/store/tcp_aggregation_test.go
- client/internal/netflow/types/types.go
- flow/proto/flow.proto
- flow/proto/generate.sh
…mpv6 Signed-off-by: Dmitri <dmitri.external@netbird.io>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@client/internal/netflow/store/event_aggregation_test.go`:
- Around line 45-47: The generateEvents function is being called inside a loop
with randomized DestPort values that can collide, causing test nondeterminism
when multiple event groups accidentally get the same port and merge. Modify the
generateEvents calls to use deterministic port values derived from the loop
variable i (such as basePort + i or similar) instead of relying on
randomization. This applies at both the loop starting at line 45-47 and the
similar aggregation test loop at lines 110-118 to ensure each generated event
group has a unique, collision-free port identifier.
In `@shared/management/http/api/openapi.yml`:
- Around line 2768-2779: The OpenAPI schema exposes three new counter fields
(num_of_starts, num_of_ends, num_of_drops) but the toProtoEvent function does
not map these counters to the corresponding proto fields
(FlowFields.NumOfStarts, FlowFields.NumOfEnds, FlowFields.NumOfDrops). Update
the toProtoEvent function to set these three FlowFields proto properties from
the aggregated counter values so the new API fields are properly populated
end-to-end.
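For the first inline comment above (colliding randomized ports in generateEvents), a minimal sketch of deriving the port from the loop index is shown here. The `basePort` constant and the `portForGroup` helper are hypothetical names, not part of the PR.

```go
package main

import "fmt"

// basePort is an arbitrary, hypothetical starting point for test ports.
const basePort uint16 = 20000

// portForGroup derives a distinct destination port from the loop index,
// so two event groups can never land on the same port by chance.
func portForGroup(i int) uint16 {
	return basePort + uint16(i)
}

func main() {
	seen := make(map[uint16]bool)
	for i := 0; i < 100; i++ {
		p := portForGroup(i)
		if seen[p] {
			panic("port collision")
		}
		seen[p] = true
	}
	fmt.Println(len(seen), "distinct ports")
}
```

With deterministic ports, each generated group maps to its own aggregation key on every run, so the expected group count in the test is stable.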
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: baa642fa-3003-452a-a2ad-2b4283e82c15
⛔ Files ignored due to path filters (2)
- flow/proto/flow.pb.go is excluded by !**/*.pb.go
- flow/proto/flow_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (4)
- client/internal/netflow/store/event_aggregation_test.go
- client/internal/netflow/store/memory.go
- shared/management/http/api/openapi.yml
- shared/management/http/api/types.gen.go
✅ Files skipped from review due to trivial changes (1)
- shared/management/http/api/types.gen.go
Compared to what was there before, the aggregation key does not consider direction, source IP, or source port (credits to @lixmal).
I think the source port, being ephemeral, is OK to drop, but direction and source IP should probably be kept.
I think there might be one more small problem in general: all the fields that are part of the Event and originate from the traffic (i.e. not ID or Timestamp), but are not part of the aggregation key, should ideally be removed from the sent event, which I suppose is not possible for backward compatibility. If that is not possible, they should at least be cleared before sending the event. Otherwise you are populating them with the first collected event of the time window, which is not statistically meaningful.
Nods, fixed this.
Nods, can't drop the fields, as it would be a breaking change. The fields that we are not explicitly aggregating on:
Got clarification re: SourceResourceID -- for a given src ip, dst ip, dst port, and direction, resource id should stay the same.
Signed-off-by: Dmitri Dolguikh <dmitri.external@netbird.io>
450a879 to 1721a4f
proto version check is failing b/c flow grpc bindings ( |
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
client/internal/netflow/manager.go (1)
54-60: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Clear un-acked events when flow is disabled or the receiver changes.
eventsWithoutAcks survives disableFlow and receiver resets. After disabling flow or changing URL/token, stale flow metadata can be retried to the next receiver; clear or reinitialize this store at those lifecycle boundaries.
Proposed fix
 if m.receiverClient != nil {
     if err := m.receiverClient.Close(); err != nil {
         log.Warnf("error closing previous flow client: %v", err)
     }
+    m.eventsWithoutAcks.Close()
 }
@@
 m.logger.Close()
+m.eventsWithoutAcks.Close()
 if m.receiverClient == nil {
     return nil
 }
Also applies to: 94-100, 143-150
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@client/internal/netflow/manager.go` around lines 54 - 60, The eventsWithoutAcks store is not being cleared when the flow is disabled or the receiver changes, which can cause stale event metadata to be retried to a new receiver. Add logic to clear or reinitialize the eventsWithoutAcks store in the disableFlow method (referenced at lines 94-100) and in the receiver change/reset logic (referenced at lines 143-150) to ensure that pending events are discarded when these lifecycle boundaries are crossed.
🧹 Nitpick comments (1)
client/internal/netflow/store/event_aggregation_test.go (1)
57-58: ⚡ Quick win
Add boundary coverage for source ports and ICMP codes.
The tests vary source IP and ICMP type, but never create same-source/same-destination TCP or UDP events with different source ports, and never set ICMPCode. Add those cases so aggregation-key regressions cannot collapse fields that are still serialized downstream.
Also applies to: 109-110, 129-165
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@client/internal/netflow/store/event_aggregation_test.go` around lines 57 - 58, Expand the test cases in the event aggregation test to include boundary coverage for source ports and ICMP codes. Specifically, add test scenarios where same-source and same-destination TCP or UDP events have different source ports (by modifying the generateEvents calls) and add test cases where ICMPCode is set on ICMP events. This ensures that aggregation keys properly distinguish events based on these fields and prevents regressions where fields still used downstream could be incorrectly collapsed during aggregation. Apply these additions across all relevant test cases including the ones around lines 57-58, 109-110, and 129-165.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@client/internal/netflow/manager.go`:
- Line 254: The timer initialization at line 254 respects the configurable
m.retryInterval for the first attempt, but line 283 hard-codes a one-second
duration when resetting the timer in the retry loop. Replace the hard-coded
one-second interval at line 283 with m.retryInterval to ensure the configurable
retry interval is applied consistently across all retry attempts, not just the
initial one.
- Around line 251-253: The eventsWithoutAcks store in the Manager.startRetries
method is unbounded and grows indefinitely when ACKs stop arriving, consuming
unbounded client memory during receiver outages. Add a maximum size limit and
age-based eviction policy to the eventsWithoutAcks store to prevent unlimited
growth, implementing logic to drop the oldest events when the threshold is
reached. Apply this bounded store policy consistently across all event storage
and retry logic throughout the Manager implementation.
In `@client/internal/netflow/store/memory.go`:
- Around line 77-84: The aggregationKey struct is missing the sourcePort field,
which causes TCP/UDP events from the same source IP but different source ports
to be incorrectly aggregated together. Add a sourcePort field of type uint16 to
the aggregationKey struct (similar to the existing destPort field), then ensure
this field is properly populated when creating aggregation keys in the grouping
logic around line 93 and when building keys around lines 140-144, so that events
are only aggregated when they match on source IP, source port, destination
IP/port, direction, and protocol.
- Line 84: The unsupported-protocol handling branch returns before assigning
WindowStart, WindowEnd, and single-event counter values, causing toProtoEvent to
serialize zero values for these metadata fields. Additionally, the unique field
assignment uses time.Now().UnixNano() for uniqueness instead of the event ID.
Ensure that all unsupported/pass-through protocol events have proper
WindowStart, WindowEnd, and counter assignments before returning, and replace
the time.Now().UnixNano() call with the actual event ID when setting the unique
field to properly track event identity regardless of protocol type.
- Around line 69-72: Capture the current time once before creating the
AggregatingMemory object in the window rotation logic. Instead of calling
time.Now() twice (once for WindowEnd when creating the toret AggregatingMemory
and once for WindowStart when resetting am.WindowStart), assign time.Now() to a
variable like now, then use this same now variable for both the WindowEnd field
in the AggregatingMemory initialization and the am.WindowStart assignment. This
ensures consistent timestamp boundaries and eliminates any time gap between the
closing and opening of aggregation windows.
- Around line 112-118: The ICMPCode field is not included in the aggregation key
for ICMP events, but it is still serialized by toProtoEvent, which leaves an
arbitrary code value in the aggregated output. In the code section where
event.Type is set to types.TypeUnknown (around line 112), explicitly reset the
ICMPCode field to its zero value (0 or nil depending on the field type) to
ensure consistency with the type-level aggregation. This should be done
alongside the existing assignments to event.WindowStart and event.WindowEnd to
maintain data integrity.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 22857878-55b9-4398-8983-8a27c9b79e09
⛔ Files ignored due to path filters (2)
- flow/proto/flow.pb.go is excluded by !**/*.pb.go
- flow/proto/flow_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (8)
- client/internal/netflow/manager.go
- client/internal/netflow/store/event_aggregation_test.go
- client/internal/netflow/store/memory.go
- client/internal/netflow/types/types.go
- flow/proto/flow.proto
- flow/proto/generate.sh
- shared/management/http/api/openapi.yml
- shared/management/http/api/types.gen.go
✅ Files skipped from review due to trivial changes (1)
- flow/proto/generate.sh
🚧 Files skipped from review as they are similar to previous changes (1)
- client/internal/netflow/types/types.go
// We effectively never drop events (see MaxInterval), which makes eventsWithoutAcks unbounded.
// We may want to limit the max size of the store, and start dropping oldest events when the threshold is reached.
func (m *Manager) startRetries(ctx context.Context) {
Bound the un-acked event store before retrying indefinitely.
When ACKs stop, every sent aggregate remains in eventsWithoutAcks until an ACK arrives. The inline comment already notes the store is unbounded; add a max size/age policy before this can grow client memory without limit during receiver outages.
I can help draft a bounded store policy or open a follow-up issue if you want.
Also applies to: 271-283
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@client/internal/netflow/manager.go` around lines 251 - 253, The
eventsWithoutAcks store in the Manager.startRetries method is unbounded and
grows indefinitely when ACKs stop arriving, consuming unbounded client memory
during receiver outages. Add a maximum size limit and age-based eviction policy
to the eventsWithoutAcks store to prevent unlimited growth, implementing logic
to drop the oldest events when the threshold is reached. Apply this bounded
store policy consistently across all event storage and retry logic throughout
the Manager implementation.
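One possible shape for such a bound is a size cap that evicts the oldest entries first, sketched below. Everything here (`boundedStore`, `Store`, `Ack`, the byte-slice payload) is a hypothetical illustration rather than the PR's store; it shows size-based eviction only, is not safe for concurrent use, and leaves age-based eviction out.

```go
package main

import "fmt"

// boundedStore keeps at most max un-acked events, oldest first.
type boundedStore struct {
	max   int
	order []string          // event IDs in insertion order
	byID  map[string][]byte // hypothetical serialized event payloads
}

func newBoundedStore(max int) *boundedStore {
	return &boundedStore{max: max, byID: make(map[string][]byte)}
}

// Store adds an event and returns the IDs evicted to stay within the cap.
func (s *boundedStore) Store(id string, payload []byte) (evicted []string) {
	if _, ok := s.byID[id]; !ok {
		s.order = append(s.order, id)
	}
	s.byID[id] = payload
	for len(s.order) > s.max {
		oldest := s.order[0]
		s.order = s.order[1:]
		delete(s.byID, oldest)
		evicted = append(evicted, oldest)
	}
	return evicted
}

// Ack removes acknowledged events; unknown IDs are ignored.
func (s *boundedStore) Ack(ids ...string) {
	for _, id := range ids {
		if _, ok := s.byID[id]; !ok {
			continue
		}
		delete(s.byID, id)
		for i, o := range s.order {
			if o == id {
				s.order = append(s.order[:i], s.order[i+1:]...)
				break
			}
		}
	}
}

// Len reports how many events are still waiting for an ACK.
func (s *boundedStore) Len() int { return len(s.byID) }

func main() {
	s := newBoundedStore(2)
	s.Store("a", nil)
	s.Store("b", nil)
	fmt.Println(s.Store("c", nil), s.Len()) // "a" is evicted to make room
}
```

Returning the evicted IDs lets the caller log or count dropped events, which makes data loss during a long receiver outage visible instead of silent.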
// We effectively never drop events (see MaxInterval), which makes eventsWithoutAcks unbounded.
// We may want to limit the max size of the store, and start dropping oldest events when the threshold is reached.
func (m *Manager) startRetries(ctx context.Context) {
timer := time.NewTimer(m.retryInterval)
Use m.retryInterval after the first retry tick.
Line 254 honors the configurable retry interval only for the initial timer; Line 283 hard-codes all later normal passes to one second. That makes short-interval tests or future overrides ineffective after the first fire.
Proposed fix
- timer = time.NewTimer(time.Second)
+ timer = time.NewTimer(m.retryInterval)
Also applies to: 283-283
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@client/internal/netflow/manager.go` at line 254, The timer initialization at
line 254 respects the configurable m.retryInterval for the first attempt, but
line 283 hard-codes a one-second duration when resetting the timer in the retry
loop. Replace the hard-coded one-second interval at line 283 with
m.retryInterval to ensure the configurable retry interval is applied
consistently across all retry attempts, not just the initial one.
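The point about honoring the configured interval on every pass can be illustrated with a small pure function that picks the next timer duration. This is a hand-rolled doubling backoff written for illustration only; `nextDelay` and its parameters are hypothetical, and the PR's own loop relies on a backoff object with `NextBackOff` rather than this arithmetic.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay returns how long to wait before the next retry pass.
// A successful send returns to the configured base interval; a failed
// send doubles the current delay, capped at ceiling.
func nextDelay(current, base, ceiling time.Duration, sendOK bool) time.Duration {
	if sendOK {
		return base
	}
	next := current * 2
	if next < base {
		next = base
	}
	if next > ceiling {
		next = ceiling
	}
	return next
}

func main() {
	base, ceiling := time.Second, 5*time.Second
	delay := base
	// Three failed sends followed by a success.
	for _, ok := range []bool{false, false, false, true} {
		delay = nextDelay(delay, base, ceiling, ok)
		fmt.Println(delay)
	}
}
```

Because the success branch returns `base` rather than a literal `time.Second`, a shorter interval configured in a test applies to every pass, not just the first.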
toret := AggregatingMemory{WindowStart: am.WindowStart, WindowEnd: time.Now(), Memory: Memory{events: am.events}}

am.events = make(map[uuid.UUID]*types.Event)
am.WindowStart = time.Now()
Use one timestamp when rotating the aggregation window.
Line 69 closes the previous window and Line 72 opens the next one with separate time.Now() calls, leaving a small gap in reported window coverage. Capture now once and use it for both boundaries.
Proposed fix
+ now := time.Now()
- toret := AggregatingMemory{WindowStart: am.WindowStart, WindowEnd: time.Now(), Memory: Memory{events: am.events}}
+ toret := AggregatingMemory{WindowStart: am.WindowStart, WindowEnd: now, Memory: Memory{events: am.events}}
am.events = make(map[uuid.UUID]*types.Event)
- am.WindowStart = time.Now()
+ am.WindowStart = now
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@client/internal/netflow/store/memory.go` around lines 69 - 72, Capture the
current time once before creating the AggregatingMemory object in the window
rotation logic. Instead of calling time.Now() twice (once for WindowEnd when
creating the toret AggregatingMemory and once for WindowStart when resetting
am.WindowStart), assign time.Now() to a variable like now, then use this same
now variable for both the WindowEnd field in the AggregatingMemory
initialization and the am.WindowStart assignment. This ensures consistent
timestamp boundaries and eliminates any time gap between the closing and opening
of aggregation windows.
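A self-contained sketch of the single-timestamp rotation follows. The `aggregator` and `window` types are hypothetical stand-ins for `AggregatingMemory`, with events reduced to strings and locking omitted.

```go
package main

import (
	"fmt"
	"time"
)

// window is a closed aggregation window together with the events it collected.
type window struct {
	Start  time.Time
	End    time.Time
	Events []string
}

// aggregator collects events for the currently open window.
type aggregator struct {
	start  time.Time
	events []string
}

func newAggregator() *aggregator {
	return &aggregator{start: time.Now()}
}

// Add records an event in the open window.
func (a *aggregator) Add(e string) { a.events = append(a.events, e) }

// Rotate closes the open window and opens the next one from a single
// clock reading, so the two windows share an exact boundary.
func (a *aggregator) Rotate() window {
	now := time.Now()
	closed := window{Start: a.start, End: now, Events: a.events}
	a.events = nil
	a.start = now
	return closed
}

func main() {
	a := newAggregator()
	a.Add("e1")
	first := a.Rotate()
	a.Add("e2")
	second := a.Rotate()
	fmt.Println(first.End.Equal(second.Start))
}
```

With two separate clock readings the gap is tiny, but consecutive windows would no longer tile the timeline exactly, which matters if the server reasons about window coverage.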
| type aggregationKey struct { | ||
| srcAddr netip.Addr | ||
| destAddr netip.Addr | ||
| destPort uint16 | ||
| direction int | ||
| protocol uint8 | ||
| icmpType uint8 | ||
| unique int64 // used to prevent aggregation on non icmp/udp/tcp events |
Include SourcePort in the aggregation key.
Line 93 groups TCP/UDP events from the same source IP/destination/direction regardless of source port, but SourcePort is still emitted downstream. That can attribute combined counters to whichever port survives Line 143. This also conflicts with the PR discussion that source IP and source port were fixed as part of the key.
Proposed fix
type aggregationKey struct {
srcAddr netip.Addr
+ srcPort uint16
destAddr netip.Addr
destPort uint16
direction int
protocol uint8
icmpType uint8
unique int64 // used to prevent aggregation on non icmp/udp/tcp events
}
@@
- lookupKey := aggregationKey{srcAddr: v.SourceIP, destAddr: v.DestIP, destPort: v.DestPort, direction: int(v.Direction), protocol: uint8(v.Protocol), icmpType: v.ICMPType}
+ lookupKey := aggregationKey{srcAddr: v.SourceIP, srcPort: v.SourcePort, destAddr: v.DestIP, destPort: v.DestPort, direction: int(v.Direction), protocol: uint8(v.Protocol), icmpType: v.ICMPType}
Also applies to: 93-93, 140-144
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@client/internal/netflow/store/memory.go` around lines 77 - 84, The
aggregationKey struct has no source-port field, so TCP/UDP events from the
same source IP but different source ports are incorrectly aggregated
together. Add a srcPort field of type uint16 to the aggregationKey struct
(alongside the existing destPort field), then populate it wherever
aggregation keys are built, in the grouping logic around line 93 and around
lines 140-144, so that events are aggregated only when they match on source
IP, source port, destination IP/port, direction, and protocol.
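The effect of adding the source port to the key can be illustrated with a small sketch. `flowKey`, `countByKey`, and `sampleFlows` are hypothetical names for illustration only; the key is a trimmed-down version of the `aggregationKey` shown in the review, and it relies on `netip.Addr` being comparable and therefore usable inside a map key.

```go
package main

import (
	"fmt"
	"net/netip"
)

// flowKey is a hypothetical, trimmed-down aggregation key that includes the source port.
type flowKey struct {
	srcAddr  netip.Addr
	srcPort  uint16
	destAddr netip.Addr
	destPort uint16
	protocol uint8
}

// countByKey groups flows by key and returns how many flows fell into each group.
func countByKey(flows []flowKey) map[flowKey]int {
	groups := make(map[flowKey]int)
	for _, f := range flows {
		groups[f]++
	}
	return groups
}

// sampleFlows returns three TCP flows to the same destination: two from
// source port 40000 and one from source port 40001.
func sampleFlows() []flowKey {
	src := netip.MustParseAddr("10.0.0.1")
	dst := netip.MustParseAddr("10.0.0.2")
	return []flowKey{
		{srcAddr: src, srcPort: 40000, destAddr: dst, destPort: 443, protocol: 6},
		{srcAddr: src, srcPort: 40001, destAddr: dst, destPort: 443, protocol: 6},
		{srcAddr: src, srcPort: 40000, destAddr: dst, destPort: 443, protocol: 6},
	}
}

func main() {
	groups := countByKey(sampleFlows())
	fmt.Println(len(groups)) // prints 2: one group per distinct source port
}
```

Without `srcPort` in the key, all three flows would collapse into a single group, and the emitted source port would be whichever one happened to survive.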
direction int
protocol uint8
icmpType uint8
unique int64 // used to prevent aggregation on non icmp/udp/tcp events
Preserve metadata for pass-through protocols.
The unsupported-protocol branch returns before assigning WindowStart, WindowEnd, or single-event counters, so toProtoEvent serializes zero windows/counts for those events. It also relies on time.Now().UnixNano() for uniqueness; use the event ID instead.
Proposed fix
- unique int64 // used to prevent aggregation on non icmp/udp/tcp events
+ unique uuid.UUID // used to prevent aggregation on non icmp/udp/tcp events
@@
if event.Protocol != types.ICMP && event.Protocol != types.ICMPv6 && event.Protocol != types.UDP && event.Protocol != types.TCP {
- lookupKey.unique = time.Now().UnixNano() // to make the lookup key unique so we don't aggregate on it
+ lookupKey.unique = event.ID // make the lookup key unique so we don't aggregate it
+ switch event.Type {
+ case types.TypeStart:
+ event.NumOfStarts += 1
+ case types.TypeDrop:
+ event.NumOfDrops += 1
+ case types.TypeEnd:
+ event.NumOfEnds += 1
+ }
+ event.WindowStart = am.WindowStart
+ event.WindowEnd = am.WindowEnd
aggregated[lookupKey] = event
continue
}
Also applies to: 97-100
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@client/internal/netflow/store/memory.go` at line 84, The unsupported-protocol
handling branch returns before assigning WindowStart, WindowEnd, and
single-event counter values, causing toProtoEvent to serialize zero values for
these metadata fields. Additionally, the unique field assignment uses
time.Now().UnixNano() for uniqueness instead of the event ID. Ensure that all
unsupported/pass-through protocol events have proper WindowStart, WindowEnd, and
counter assignments before returning, and replace the time.Now().UnixNano() call
with the actual event ID when setting the unique field to properly track event
identity regardless of protocol type.
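A rough sketch of the pass-through handling requested above, under simplifying assumptions: `event` is a hypothetical reduced type, its `ID` is a plain string standing in for the event's UUID, and window bounds are plain integers rather than `time.Time`. It is meant to show the shape of the fix, not the PR's actual types.

```go
package main

import "fmt"

// event is a hypothetical, reduced flow event; ID stands in for the event's UUID.
type event struct {
	ID          string
	Kind        string // "start", "end" or "drop"
	NumOfStarts int
	NumOfEnds   int
	NumOfDrops  int
	WindowStart int64 // Unix nanoseconds, for simplicity
	WindowEnd   int64
}

// passThrough prepares an event that must not be aggregated: it stamps the
// window bounds and records the event itself in the matching counter.
// The returned key is the event ID, which is unique without consulting the clock.
func passThrough(ev event, windowStart, windowEnd int64) (string, event) {
	switch ev.Kind {
	case "start":
		ev.NumOfStarts++
	case "end":
		ev.NumOfEnds++
	case "drop":
		ev.NumOfDrops++
	}
	ev.WindowStart = windowStart
	ev.WindowEnd = windowEnd
	return ev.ID, ev
}

func main() {
	key, ev := passThrough(event{ID: "evt-1", Kind: "drop"}, 100, 200)
	fmt.Println(key, ev.NumOfDrops, ev.WindowStart, ev.WindowEnd) // prints "evt-1 1 100 200"
}
```

Keying on the ID avoids the theoretical collision of two events receiving the same `time.Now().UnixNano()` value on a coarse clock.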
event.Type = types.TypeUnknown
// Please note that ICMPCode field isn't propagated by the manager (see flow/proto/flow.pb.go, FlowFields struct)
// so the field value in an icmp event in the "aggregated" doesn't matter
event.WindowStart = am.WindowStart
event.WindowEnd = am.WindowEnd
Reset or key ICMPCode for aggregated ICMP events.
The comment says ICMPCode is not propagated, but toProtoEvent serializes it. Since the key only includes ICMPType, aggregating different codes leaves an arbitrary cloned code in the output; either add code to the key or explicitly clear it for type-level aggregates.
Proposed fix if type-level aggregation is intended
event.Type = types.TypeUnknown
- // Please note that ICMPCode field isn't propagated by the manager (see flow/proto/flow.pb.go, FlowFields struct)
- // so the field value in an icmp event in the "aggregated" doesn't matter
+ if event.Protocol == types.ICMP || event.Protocol == types.ICMPv6 {
+ event.ICMPCode = 0
+ }
event.WindowStart = am.WindowStart
event.WindowEnd = am.WindowEnd
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@client/internal/netflow/store/memory.go` around lines 112 - 118, The ICMPCode
field is not included in the aggregation key for ICMP events, but it is still
serialized by toProtoEvent, which leaves an arbitrary code value in the
aggregated output. In the code section where event.Type is set to
types.TypeUnknown (around line 112), explicitly reset the ICMPCode field to its
zero value (0 or nil depending on the field type) to ensure consistency with the
type-level aggregation. This should be done alongside the existing assignments
to event.WindowStart and event.WindowEnd to maintain data integrity.



Describe your changes
To reduce how often flow events are emitted, this PR implements client-side aggregation of flow events.
Events are aggregated over a time window by destination address, destination port, protocol, and ICMP code (for ICMP). The total numbers of start, end, and drop events are tracked, as well as the number of transmitted and received bytes and packets.
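As an illustration of the aggregation described above, here is a minimal sketch that groups events from one window and sums their counters. All type and function names (`flowEvent`, `groupKey`, `summary`, `aggregate`) are hypothetical, addresses and protocols are plain strings for brevity, and the ICMP field is omitted; the PR's real key and event types differ.

```go
package main

import "fmt"

// flowEvent is a hypothetical, simplified flow event.
type flowEvent struct {
	DestAddr  string
	DestPort  uint16
	Protocol  string
	Kind      string // "start", "end" or "drop"
	RxBytes   uint64
	TxBytes   uint64
	RxPackets uint64
	TxPackets uint64
}

// groupKey holds the fields events are grouped by within one window.
type groupKey struct {
	DestAddr string
	DestPort uint16
	Protocol string
}

// summary accumulates the per-group totals for one window.
type summary struct {
	Starts, Ends, Drops  int
	RxBytes, TxBytes     uint64
	RxPackets, TxPackets uint64
}

// aggregate folds all events of a window into one summary per group key.
func aggregate(events []flowEvent) map[groupKey]summary {
	out := make(map[groupKey]summary)
	for _, ev := range events {
		k := groupKey{DestAddr: ev.DestAddr, DestPort: ev.DestPort, Protocol: ev.Protocol}
		s := out[k] // zero-valued summary on first sight of the key
		switch ev.Kind {
		case "start":
			s.Starts++
		case "end":
			s.Ends++
		case "drop":
			s.Drops++
		}
		s.RxBytes += ev.RxBytes
		s.TxBytes += ev.TxBytes
		s.RxPackets += ev.RxPackets
		s.TxPackets += ev.TxPackets
		out[k] = s
	}
	return out
}

func main() {
	events := []flowEvent{
		{DestAddr: "10.0.0.2", DestPort: 443, Protocol: "tcp", Kind: "start", TxBytes: 120, TxPackets: 2},
		{DestAddr: "10.0.0.2", DestPort: 443, Protocol: "tcp", Kind: "end", RxBytes: 300, RxPackets: 3},
		{DestAddr: "10.0.0.3", DestPort: 53, Protocol: "udp", Kind: "drop"},
	}
	for k, s := range aggregate(events) {
		fmt.Printf("%s:%d/%s starts=%d ends=%d drops=%d rx=%dB tx=%dB\n",
			k.DestAddr, k.DestPort, k.Protocol, s.Starts, s.Ends, s.Drops, s.RxBytes, s.TxBytes)
	}
}
```

Here three raw events collapse into two aggregated records, one per destination, which is the reduction in emitted events the PR aims for.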
Documentation
Docs PR URL (required if "docs added" is checked)
Paste the PR link from https://github.com/netbirdio/docs here:
netbirdio/docs#800
Summary by CodeRabbit