Skip to content
Merged
8 changes: 5 additions & 3 deletions client/embed/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,11 @@ func (c *Client) Start(startCtx context.Context) error {

select {
case <-startCtx.Done():
// Cancel the client context before stopping: Engine.Start blocks on the
// signal stream while holding the engine mutex and only unblocks on
// cancellation. Stopping first would deadlock on that mutex.
// ConnectClient.Stop now cancels its own run context and waits for the
// run loop to tear the engine down, so this cancel() is no longer
// required to break the deadlock and could be removed. It is kept as a
// defensive belt-and-suspenders: cancelling the parent context first
// guarantees the run loop is unblocked even if Stop's contract regresses.
cancel()
if stopErr := client.Stop(); stopErr != nil {
return fmt.Errorf("stop error after context done. Stop error: %w. Context done: %w", stopErr, startCtx.Err())
Expand Down
42 changes: 26 additions & 16 deletions client/internal/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -54,6 +55,10 @@ var androidRunOverride func(c *ConnectClient, runningChan chan struct{}, logPath

type ConnectClient struct {
ctx context.Context
runCancel context.CancelFunc
runExited chan struct{}
runOnce sync.Once
runStarted atomic.Bool
config *profilemanager.Config
statusRecorder *peer.Status

Expand All @@ -70,8 +75,14 @@ func NewConnectClient(
config *profilemanager.Config,
statusRecorder *peer.Status,
) *ConnectClient {
// Derive the run context here so Stop owns the cancel that unblocks the run
// loop. runCancel is set once at construction, so Stop can call it without
// racing the run loop's startup. Callers therefore need not cancel before Stop.
runCtx, runCancel := context.WithCancel(ctx)
return &ConnectClient{
ctx: ctx,
ctx: runCtx,
runCancel: runCancel,
runExited: make(chan struct{}),
config: config,
statusRecorder: statusRecorder,
engineMutex: sync.Mutex{},
Expand Down Expand Up @@ -132,6 +143,11 @@ func (c *ConnectClient) RunOniOS(
}

func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan struct{}, logPath string) error {
// Mark the loop as started and signal exit on return so Stop can wait for
// the loop to finish (and skip the wait if the loop never ran).
c.runStarted.Store(true)
defer c.runOnce.Do(func() { close(c.runExited) })

defer func() {
if r := recover(); r != nil {
rec := c.statusRecorder
Expand Down Expand Up @@ -287,7 +303,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
log.Debug(err)
if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.PermissionDenied) {
state.Set(StatusNeedsLogin)
_ = c.Stop()
c.runCancel()
return backoff.Permanent(wrapErr(err)) // unrecoverable error
}
return wrapErr(err)
Expand Down Expand Up @@ -407,14 +423,10 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
c.engine = nil
c.engineMutex.Unlock()

// todo: consider to remove this condition. Is not thread safe.
// We should always call Stop(), but we need to verify that it is idempotent
if engine.wgInterface != nil {
log.Infof("ensuring %s is removed, Netbird engine context cancelled", engine.wgInterface.Name())
log.Infof("ensuring wg interface is removed, Netbird engine context cancelled")

if err := engine.Stop(); err != nil {
log.Errorf("Failed to stop engine: %v", err)
}
if err := engine.Stop(); err != nil {
log.Errorf("Failed to stop engine: %v", err)
}
c.statusRecorder.ClientTeardown()

Expand All @@ -430,12 +442,12 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
}

c.statusRecorder.ClientStart()
err = backoff.Retry(operation, backOff)
err = backoff.Retry(operation, backoff.WithContext(backOff, c.ctx))
if err != nil {
log.Debugf("exiting client retry loop due to unrecoverable error: %s", err)
if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.PermissionDenied) {
state.Set(StatusNeedsLogin)
_ = c.Stop()
c.runCancel()
}
return err
}
Expand Down Expand Up @@ -513,11 +525,9 @@ func (c *ConnectClient) Status() StatusType {
}

func (c *ConnectClient) Stop() error {
engine := c.Engine()
if engine != nil {
if err := engine.Stop(); err != nil {
return fmt.Errorf("stop engine: %w", err)
}
c.runCancel()
if c.runStarted.Load() {
<-c.runExited
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
return nil
}
Expand Down
110 changes: 73 additions & 37 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@

var ErrResetConnection = fmt.Errorf("reset connection")

var ErrEngineAlreadyStarted = errors.New("engine already started")

type EngineConfig struct {
WgPort int
WgIfaceName string
Expand Down Expand Up @@ -199,6 +201,8 @@
ctx context.Context
cancel context.CancelFunc

started bool

wgInterface WGIface

udpMux *udpmux.UniversalUDPMuxDefault
Expand Down Expand Up @@ -279,9 +283,15 @@
services EngineServices,
mobileDep MobileDependency,
) *Engine {
// The engine is single-use: a fresh instance is built per connection
// cycle (see Client.run), so the run context is created once here rather
// than in Start.
ctx, cancel := context.WithCancel(clientCtx)
engine := &Engine{
clientCtx: clientCtx,
clientCancel: clientCancel,
ctx: ctx,
cancel: cancel,
signal: services.SignalClient,
signaler: peer.NewSignaler(services.SignalClient, config.WgPrivateKey),
mgmClient: services.MgmClient,
Expand Down Expand Up @@ -314,8 +324,34 @@
log.Debugf("tried stopping engine that is nil")
return nil
}
e.cancel()
e.syncMsgMux.Lock()

e.stopLocked()

e.syncMsgMux.Unlock()

timeout := e.calculateShutdownTimeout()
log.Debugf("waiting for goroutines to finish with timeout: %v", timeout)
shutdownCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := waitWithContext(shutdownCtx, &e.shutdownWg); err != nil {
log.Warnf("shutdown timeout exceeded after %v, some goroutines may still be running", timeout)
}

log.Infof("stopped Netbird Engine")

return nil
}

// stopLocked tears down everything Start may have brought up, in the order
// teardown requires (DNS before the interface goes down, flow manager after).
// The caller must hold syncMsgMux. It is shared by Stop and by Start's failure
// path, so a partially-initialized engine is cleaned up the same way; every
// step is nil-guarded. It does not wait on shutdownWg — the caller does that
// after releasing the lock, since the goroutines also take syncMsgMux.
func (e *Engine) stopLocked() {
if e.connMgr != nil {
e.connMgr.Close()
}
Expand Down Expand Up @@ -366,10 +402,6 @@
// so dbus and friends don't complain because of a missing interface
e.stopDNSServer()

if e.cancel != nil {
e.cancel()
}

e.jobExecutorWG.Wait() // block until job goroutines finish

e.close()
Expand All @@ -388,21 +420,6 @@
if err := e.stateManager.PersistState(context.Background()); err != nil {
log.Errorf("failed to persist state: %v", err)
}

e.syncMsgMux.Unlock()

timeout := e.calculateShutdownTimeout()
log.Debugf("waiting for goroutines to finish with timeout: %v", timeout)
shutdownCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := waitWithContext(shutdownCtx, &e.shutdownWg); err != nil {
log.Warnf("shutdown timeout exceeded after %v, some goroutines may still be running", timeout)
}

log.Infof("stopped Netbird Engine")

return nil
}

// calculateShutdownTimeout returns shutdown timeout: 10s base + 100ms per peer, capped at 30s.
Expand Down Expand Up @@ -440,18 +457,38 @@
// Start creates a new WireGuard tunnel interface and listens to events from Signal and Management services
// Connections to remote peers are not established here.
// However, they will be established once an event with a list of peers to connect to will be received from Management Service
func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL) error {
func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL) (err error) {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()

if err := iface.ValidateMTU(e.config.MTU); err != nil {
return fmt.Errorf("invalid MTU configuration: %w", err)
// The engine is single-use. Reject a duplicate start and a start on an
// already-stopped engine (run context cancelled).
if e.started {
return ErrEngineAlreadyStarted
}

if ctxErr := e.ctx.Err(); ctxErr != nil {
return fmt.Errorf("engine already stopped: %w", ctxErr)
}

if e.cancel != nil {
e.cancel()
e.started = true

// Tear down any partially-initialized state on a failed start. Cancel the
// run context first so goroutines started before the failure (connMgr,
// srWatcher, monitors) unwind, then stopLocked mirrors Stop's teardown (we
// already hold syncMsgMux), cleaning up route/DNS/flow/state managers too,
// not just what close() covers.
defer func() {
if err != nil {
e.cancel()
e.stopLocked()
}
Comment thread
pappz marked this conversation as resolved.
}()

if err = iface.ValidateMTU(e.config.MTU); err != nil {
return fmt.Errorf("invalid MTU configuration: %w", err)
}
e.ctx, e.cancel = context.WithCancel(e.clientCtx)

e.exposeManager = expose.NewManager(e.ctx, e.mgmClient)

wgIface, err := e.newWgIface()
Expand Down Expand Up @@ -485,13 +522,11 @@

initialRoutes, dnsConfig, dnsFeatureFlag, err := e.readInitialSettings()
if err != nil {
e.close()
return fmt.Errorf("read initial settings: %w", err)
}

dnsServer, err := e.newDnsServer(dnsConfig)
if err != nil {
e.close()
return fmt.Errorf("create dns server: %w", err)
}
e.dnsServer = dnsServer
Expand Down Expand Up @@ -526,7 +561,6 @@

if err = e.wgInterfaceCreate(); err != nil {
log.Errorf("failed creating tunnel interface %s: [%s]", e.config.WgIfaceName, err.Error())
e.close()
return fmt.Errorf("create wg interface: %w", err)
}

Expand All @@ -535,7 +569,6 @@
}

if err := e.createFirewall(); err != nil {
e.close()
return err
}

Expand All @@ -547,7 +580,6 @@
e.udpMux, err = e.wgInterface.Up()
if err != nil {
log.Errorf("failed to pull up wgInterface [%s]: %s", e.wgInterface.Name(), err.Error())
e.close()
return fmt.Errorf("up wg interface: %w", err)
}

Expand All @@ -572,9 +604,7 @@
e.acl = acl.NewDefaultManager(e.firewall)
}

err = e.dnsServer.Initialize()
if err != nil {
e.close()
if err := e.dnsServer.Initialize(); err != nil {
return fmt.Errorf("initialize dns server: %w", err)
}

Expand All @@ -586,7 +616,9 @@
e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
e.srWatcher.Start(peer.IsForceRelayed())

e.receiveSignalEvents()
if err = e.receiveSignalEvents(); err != nil {
return err
}
e.receiveManagementEvents()
e.receiveJobEvents()

Expand Down Expand Up @@ -638,7 +670,6 @@

func (e *Engine) initFirewall() error {
if err := e.routeManager.SetFirewall(e.firewall); err != nil {
e.close()
return fmt.Errorf("set firewall: %w", err)
}

Expand Down Expand Up @@ -1698,7 +1729,7 @@
}

// receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers
func (e *Engine) receiveSignalEvents() {
func (e *Engine) receiveSignalEvents() error {

Check failure on line 1732 in client/internal/engine.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 28 to the 20 allowed.

See more on https://sonarcloud.io/project/issues?id=netbirdio_netbird&issues=AZ7QiV6AHrNUCHtbHb2_&open=AZ7QiV6AHrNUCHtbHb2_&pullRequest=6443
e.shutdownWg.Add(1)
go func() {
defer e.shutdownWg.Done()
Expand Down Expand Up @@ -1762,7 +1793,12 @@
}
}()

e.signal.WaitStreamConnected()
// todo: consider to remove this blocker. I do not see benefit to block the Start operations
e.signal.WaitStreamConnected(e.ctx)
Comment thread
pappz marked this conversation as resolved.
if err := e.ctx.Err(); err != nil {
return fmt.Errorf("wait for signal stream: %w", err)
}
return nil
}

func (e *Engine) parseNATExternalIPMappings() []string {
Expand Down
Loading
Loading