diff --git a/core/node/status.go b/core/node/status.go index fdb2178d3..11b6219c4 100644 --- a/core/node/status.go +++ b/core/node/status.go @@ -18,6 +18,15 @@ type ( IsLeader bool `json:"is_leader"` IsOverloaded bool `json:"is_overloaded"` BootedAt time.Time `json:"booted_at"` + + // LeftAt is the last time the nmon advanced the last_shutdown file, + // i.e. the last time it proved it was alive and rejoined. + LeftAt time.Time `json:"left_at"` + + // RejoinedAt is the time nmon state transitioned from rejoin to idle. + // This happens either when the rejoin_grace_period expires or when + // we received data from all peers. + RejoinedAt time.Time `json:"rejoined_at"` } // Instances groups instances configuration digest and status diff --git a/core/object/text/kw/node/pool.drbd.max_peers b/core/object/text/kw/node/pool.drbd.max_peers new file mode 100644 index 000000000..b3c8af512 --- /dev/null +++ b/core/object/text/kw/node/pool.drbd.max_peers @@ -0,0 +1,3 @@ +The integer value to use in `create-md --max-peers `. + +The driver ensures the value is not less than the number of instances. 
diff --git a/core/object/text/kw/node/pool.drbd.max_peers.default b/core/object/text/kw/node/pool.drbd.max_peers.default new file mode 100644 index 000000000..6e85be761 --- /dev/null +++ b/core/object/text/kw/node/pool.drbd.max_peers.default @@ -0,0 +1 @@ +(nodes_count*2)-1 diff --git a/daemon/daemondata/daemon_hb.go b/daemon/daemondata/daemon_hb.go index 7b4209f3e..5fa6659d0 100644 --- a/daemon/daemondata/daemon_hb.go +++ b/daemon/daemondata/daemon_hb.go @@ -5,6 +5,7 @@ import ( "sort" "time" + "github.com/opensvc/om3/v3/core/clusternode" "github.com/opensvc/om3/v3/daemon/daemonsubsystem" "github.com/opensvc/om3/v3/daemon/hbcache" "github.com/opensvc/om3/v3/daemon/msgbus" @@ -70,7 +71,7 @@ func (d *data) setHbMsgType(node string, msgType string) { Node: node, From: previous, To: msgType, - Nodes: append([]string{}, d.clusterData.Cluster.Config.Nodes...), + Nodes: clusternode.Get(), JoinedNodes: joinedNodes, InstalledGens: d.deepCopyLocalGens(), }, pubsub.Label{"node", node}) diff --git a/daemon/daemondata/data_init.go b/daemon/daemondata/data_init.go index de11fe06a..774a25c2c 100644 --- a/daemon/daemondata/data_init.go +++ b/daemon/daemondata/data_init.go @@ -93,7 +93,7 @@ func newNodeData(localNode string) node.Node { Monitor: node.Monitor{ LocalExpect: node.MonitorLocalExpectNone, GlobalExpect: node.MonitorGlobalExpectNone, - State: node.MonitorStateInit, // this prevents imon orchestration + State: node.MonitorStateRejoin, // this prevents imon orchestration }, Stats: node.Stats{}, Status: node.Status{ diff --git a/daemon/daemondata/get_hb_msg_type.go b/daemon/daemondata/get_hb_msg_type.go index 29a472748..3b7280a97 100644 --- a/daemon/daemondata/get_hb_msg_type.go +++ b/daemon/daemondata/get_hb_msg_type.go @@ -3,6 +3,7 @@ package daemondata import ( "context" + "github.com/opensvc/om3/v3/core/clusternode" "github.com/opensvc/om3/v3/core/node" "github.com/opensvc/om3/v3/util/xmap" ) @@ -39,7 +40,7 @@ func (t T) GetHbMessageType() HbMessageType { func (o 
opGetHbMessageType) call(ctx context.Context, d *data) error { o.result <- HbMessageType{ Type: d.hbMessageType, - Nodes: d.clusterData.Cluster.Config.Nodes, + Nodes: clusternode.Get(), JoinedNodes: xmap.Keys(d.hbGens[d.localNode]), Gens: d.deepCopyLocalGens(), } diff --git a/daemon/daemondata/main_test.go b/daemon/daemondata/main_test.go index 84bb61c81..49aeaafde 100644 --- a/daemon/daemondata/main_test.go +++ b/daemon/daemondata/main_test.go @@ -137,7 +137,7 @@ func TestDaemonData(t *testing.T) { refreshed := bus.ClusterNodeData(localNode) assert.NotNil(t, refreshed) assert.Equal(t, uint64(1), refreshed.Status.Gen[localNode]) - assert.Equal(t, node.MonitorStateInit, refreshed.Monitor.State) + assert.Equal(t, node.MonitorStateRejoin, refreshed.Monitor.State) }) require.False(t, t.Failed()) // fail on first error @@ -154,7 +154,7 @@ func TestDaemonData(t *testing.T) { initial.GlobalExpectUpdatedAt = time.Now() refreshed := *node.MonitorData.GetByNode(localNode) - require.Equal(t, node.MonitorStateInit, refreshed.State, "State changed !") + require.Equal(t, node.MonitorStateRejoin, refreshed.State, "State changed !") require.Equal(t, initialUpdated, refreshed.StateUpdatedAt, "StateUpdated changed !") require.Equal(t, initialStateUpdated, refreshed.StateUpdatedAt, "StateUpdated changed !") require.Equal(t, node.MonitorGlobalExpectNone, refreshed.GlobalExpect, "GlobalExpect changed !") diff --git a/daemon/imon/main.go b/daemon/imon/main.go index 6786f2020..a5c37ee44 100644 --- a/daemon/imon/main.go +++ b/daemon/imon/main.go @@ -169,6 +169,11 @@ type ( // standbyResourceOrchestrate is the orchestrationResource for regular resources regularResourceOrchestrate orchestrationResource + + // isPeerFrozenMerged remembers we already mirrored locally a peer instance freeze + // that happened during this daemon's last blackout (crash time => rejoin). + // i.e. this boolean shortcuts the t.mergePeerFrozen func. 
+ isPeerFrozenMerged bool } // cmdOrchestrate can be used from post action go routines @@ -371,6 +376,7 @@ func (t *Manager) worker(initialNodes []string) { <-t.delayTimer.C } + t.mergePeerFrozen() t.initRelationAvailStatus() t.initResourceMonitor() t.initLocalResourceFiles() diff --git a/daemon/imon/main_cmd.go b/daemon/imon/main_cmd.go index 645a3d646..0b2b7706e 100644 --- a/daemon/imon/main_cmd.go +++ b/daemon/imon/main_cmd.go @@ -255,6 +255,7 @@ func (t *Manager) onMyInstanceStatusUpdated(srcNode string, srcCmd *msgbus.Insta return } t.instStatus[srcCmd.Node] = srcCmd.Value + t.mergePeerFrozen() t.clearStonith(srcCmd.Node, srcCmd.Value.Avail) t.handleResourceFiles(srcCmd) } @@ -1161,10 +1162,56 @@ func (t *Manager) initResourceMonitor() { t.change = true } -func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) { - if c.IsUpgrading { +func (t *Manager) mergePeerFrozen() { + if t.isPeerFrozenMerged { + return + } + leftAt := t.nodeStatus[t.localhost].LeftAt + rejoinedAt := t.nodeStatus[t.localhost].RejoinedAt + if leftAt.IsZero() || rejoinedAt.IsZero() { + // we will do that on NodeRejoin event + t.log.Tracef("not rejoined yet, defer merge peer frozen") return } + if len(t.instStatus) < 2 { + t.log.Tracef("not enough instances, delay merge peer frozen") + return + } + instStatus, ok := t.instStatus[t.localhost] + if !ok { + t.log.Tracef("local instance status is not evaluated yet, defer merge peer frozen") + return + } + if !instStatus.FrozenAt.IsZero() { + t.log.Tracef("local instance is already frozen, skip merge peer frozen") + return + } + if t.state.GlobalExpect == instance.MonitorGlobalExpectUnfrozen { + t.log.Tracef("global expect is unfrozen, skip merge peer frozen") + return + } + if t.instConfig.ActorConfig != nil && t.instConfig.ActorConfig.Orchestrate != "ha" { + t.log.Tracef("object is not a ha actor, skip merge peer frozen") + return + } + for peer, peerStatus := range t.instStatus { + if peer == t.localhost { + continue + } + if 
peerStatus.FrozenAt.After(leftAt) && peerStatus.FrozenAt.Before(rejoinedAt) { + msg := fmt.Sprintf("freeze %s instance because peer %s instance was frozen while this daemon was down", t.path, peer) + if err := t.queueFreeze(); err != nil { + t.log.Errorf("%s: %s", msg, err) + } else { + t.isPeerFrozenMerged = true + t.log.Infof(msg) + } + return + } + } +} + +func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) { if len(t.instStatus) < 2 { // no need to merge frozen if the object has a single instance return @@ -1190,7 +1237,7 @@ func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) { if peerStatus.FrozenAt.After(c.LastShutdownAt) { msg := fmt.Sprintf("Freeze %s instance because peer %s instance was frozen while this daemon was down", t.path, peer) if err := t.queueFreeze(); err != nil { - t.log.Infof("%s: %s", msg, err) + t.log.Errorf("%s: %s", msg, err) } else { t.log.Infof(msg) } diff --git a/daemon/nmon/main.go b/daemon/nmon/main.go index 9132f2440..02f992ab9 100644 --- a/daemon/nmon/main.go +++ b/daemon/nmon/main.go @@ -170,12 +170,12 @@ func NewManager(drainDuration time.Duration, subQS pubsub.QueueSizer) *Manager { state: node.Monitor{ LocalExpect: node.MonitorLocalExpectNone, GlobalExpect: node.MonitorGlobalExpectNone, - State: node.MonitorStateInit, // this prevents imon orchestration + State: node.MonitorStateRejoin, // this prevents imon orchestration }, previousState: node.Monitor{ LocalExpect: node.MonitorLocalExpectNone, GlobalExpect: node.MonitorGlobalExpectNone, - State: node.MonitorStateInit, + State: node.MonitorStateRejoin, }, cmdC: make(chan any), poolC: make(chan any, 1), @@ -268,6 +268,11 @@ func (t *Manager) Start(parent context.Context) error { t.setArbitratorConfig() t.startSubscriptions() + + if clusterConfig := cluster.ConfigData.Get(); clusterConfig != nil { + t.clusterConfig = *clusterConfig + } + t.wg.Add(1) go func() { defer t.wg.Done() @@ -359,9 +364,19 @@ func (t *Manager) startRejoin() { l := missingNodes(hbMessageType.Nodes, 
hbMessageType.JoinedNodes) if (hbMessageType.Type == "patch") && len(l) == 0 { // Skip the rejoin state phase. + t.log.Infof("we are late to the party, immediate rejoin") + leftAt := file.ModTime(rawconfig.Paths.LastShutdown) t.rejoinTicker = time.NewTicker(time.Second) t.rejoinTicker.Stop() + t.nodeStatus.LeftAt = leftAt + t.nodeStatus.RejoinedAt = time.Now() + _ = os.Unsetenv("OPENSVC_AGENT_UPGRADE") t.transitionTo(node.MonitorStateIdle) + t.publisher.Pub(&msgbus.NodeRejoin{ + Nodes: hbMessageType.Nodes, + LastShutdownAt: leftAt, + IsUpgrading: os.Getenv("OPENSVC_AGENT_UPGRADE") != "", + }, t.labelLocalhost) } else { // Begin the rejoin state phase. // Arm the re-join grace period ticker. @@ -376,9 +391,9 @@ func (t *Manager) startRejoin() { func (t *Manager) touchLastShutdown() { // remember the last shutdown date via a file mtime if err := file.Touch(rawconfig.Paths.LastShutdown, time.Now()); err != nil { - t.log.Errorf("touch %s: %s", rawconfig.Paths.LastShutdown, err) + t.log.Warnf("touch %s: %s", rawconfig.Paths.LastShutdown, err) } else { - t.log.Infof("touch %s", rawconfig.Paths.LastShutdown) + t.log.Tracef("touch %s", rawconfig.Paths.LastShutdown) } } @@ -431,10 +446,16 @@ func (t *Manager) worker() { statsTicker := time.NewTicker(statsInterval) defer statsTicker.Stop() + arbitratorTicker := time.NewTicker(arbitratorInterval) defer arbitratorTicker.Stop() defer t.touchLastShutdown() + // lastShutdownFileTouchTicker is used to periodically touch LastShutdown file + // when the daemon is in a state where rejoinTicker has been stopped + lastShutdownFileTouchTicker := time.NewTicker(10 * time.Second) + defer lastShutdownFileTouchTicker.Stop() + // TODO refreshSanPaths should be refreshed on events, on ticker ? 
for { select { @@ -496,29 +517,48 @@ func (t *Manager) worker() { t.onArbitratorTicker() case <-t.rejoinTicker.C: t.onRejoinGracePeriodExpire() + case <-lastShutdownFileTouchTicker.C: + t.onLastShutdownFileTouchTicker() } } } -func (t *Manager) onRejoinGracePeriodExpire() { +func (t *Manager) onLastShutdownFileTouchTicker() { + if t.state.State == node.MonitorStateRejoin { + return + } + t.touchLastShutdown() +} + +func (t *Manager) nodeFreeze() (bool, error) { nodeFrozenFile := filepath.Join(rawconfig.Paths.Var, "node", "frozen") - frozen := file.ModTime(nodeFrozenFile) - if frozen.Equal(time.Time{}) { - f, err := os.OpenFile(nodeFrozenFile, os.O_RDONLY|os.O_CREATE, 0666) + frozenAt := file.ModTime(nodeFrozenFile) + if !frozenAt.IsZero() { + return false, nil + } + f, err := os.OpenFile(nodeFrozenFile, os.O_RDONLY|os.O_CREATE, 0666) + if err != nil { + return false, err + } + if err := f.Close(); err != nil { + return true, err + } + return true, nil +} + +func (t *Manager) onRejoinGracePeriodExpire() { + changed, err := t.nodeFreeze() + msg := "rejoin grace period expired" + if changed { if err != nil { - t.log.Errorf("rejoin grace period expired: freeze node: %s", err) - t.rejoinTicker.Reset(2 * time.Second) - return - } - t.log.Infof("rejoin grace period expired: freeze node") - if err := f.Close(); err != nil { - t.log.Errorf("rejoin grace period expired: freeze node: %s", err) + t.log.Errorf("%s: freezing local node: %s", msg, err) t.rejoinTicker.Reset(2 * time.Second) return } + t.log.Infof("%s: local node has been frozen", msg) t.transitionTo(node.MonitorStateIdle) } else { - t.log.Infof("rejoin grace period expired: the node is already frozen") + t.log.Infof("%s: local node is already frozen", msg) t.transitionTo(node.MonitorStateIdle) } t.rejoinTicker.Stop() diff --git a/daemon/nmon/main_cmd.go b/daemon/nmon/main_cmd.go index 9b2284321..fe06c1cfb 100644 --- a/daemon/nmon/main_cmd.go +++ b/daemon/nmon/main_cmd.go @@ -340,11 +340,14 @@ func (t *Manager) 
onHeartbeatMessageTypeUpdated(c *msgbus.HeartbeatMessageTypeUp return } t.rejoinTicker.Stop() + leftAt := file.ModTime(rawconfig.Paths.LastShutdown) t.publisher.Pub(&msgbus.NodeRejoin{ Nodes: c.Nodes, - LastShutdownAt: file.ModTime(rawconfig.Paths.LastShutdown), + LastShutdownAt: leftAt, IsUpgrading: os.Getenv("OPENSVC_AGENT_UPGRADE") != "", }, t.labelLocalhost) + t.nodeStatus.LeftAt = leftAt + t.nodeStatus.RejoinedAt = time.Now() _ = os.Unsetenv("OPENSVC_AGENT_UPGRADE") t.transitionTo(node.MonitorStateIdle) } @@ -357,13 +360,10 @@ func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) { // no need to merge frozen on a single node cluster return } - if !t.nodeStatus.FrozenAt.IsZero() { - // already frozen - return - } if t.state.GlobalExpect == node.MonitorGlobalExpectUnfrozen { return } + var frozenPeers []string for _, peer := range t.clusterConfig.Nodes { if peer == t.localhost { continue @@ -373,14 +373,29 @@ func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) { continue } if peerStatus.FrozenAt.After(c.LastShutdownAt) { - if err := t.crmFreeze(); err != nil { - t.log.Infof("node freeze error: %s", err) - } else { - t.log.Infof("node freeze because peer %s was frozen while this daemon was down", peer) - } - return + frozenPeers = append(frozenPeers, peer) } } + + var msg string + switch len(frozenPeers) { + case 0: + // no peer frozen while this daemon was shutdown + return + case 1: + msg = fmt.Sprintf("peer %s was frozen while this daemon was down", frozenPeers[0]) + default: + msg = fmt.Sprintf("peers %s were frozen while this daemon was down", frozenPeers) + } + + changed, err := t.nodeFreeze() + if err != nil { + t.log.Errorf("%s: freezing local node: %s", msg, err) + } else if changed { + t.log.Infof("%s: local node has been frozen", msg) + } else { + t.log.Infof("%s: local node is already frozen", msg) + } } func (t *Manager) onOrchestrate(c cmdOrchestrate) { diff --git a/util/logging/logging.go b/util/logging/logging.go index 658273c21..95b4ddef4 100644 
--- a/util/logging/logging.go +++ b/util/logging/logging.go @@ -186,7 +186,7 @@ func Configure(config Config) error { if config.WithCaller { // skip one more for plog wrappers - logger = logger.With().CallerWithSkipFrameCount(zerolog.CallerSkipFrameCount + 1).Logger() + logger = logger.With().CallerWithSkipFrameCount(zerolog.CallerSkipFrameCount + 2).Logger() } log.Logger = logger diff --git a/util/logreader/main.go b/util/logreader/main.go index 007dfb20d..ce10861fe 100644 --- a/util/logreader/main.go +++ b/util/logreader/main.go @@ -314,33 +314,45 @@ func renderEvent(e streamlog.Event, node string, streamIndex int, numStreams int w.FormatFieldValue = func(i any) string { return "" } // Determine prefix based on stream index - prefix := "" - if node != "" { - prefixes := []string{"⣇", "⣸"} - runePosition := streamIndex / 2 - paddingWidth := (numStreams)/2 + 1 - - var terminator string - - // test if last rune is not filled with streams - if numStreams%2 != 0 && streamIndex == numStreams-1 { - prefix = "⡇" + var prefix string + if node != "" && numStreams > 1 { + // Format a prefix helping the reader grasp which node emitted the entry: + // + // ⣇⡀ node1: foo + // ⣸⡀ node2: foo + // ⣀⡇ node3: foo + // + runes := []rune("⣇⣸⣀⡀⡇") + runeA := runes[0] + runeB := runes[1] + runeZ := runes[2] + runeHalfZ := runes[3] + runeHalfA := runes[4] + + streamPosition := streamIndex / 2 + paddedWidth := (numStreams + 1) / 2 + evenStreamCount := numStreams%2 == 0 + evenStreamIndex := streamIndex%2 == 0 + prefix = strings.Repeat("⣀", paddedWidth) + runeset := []rune(prefix) + + if evenStreamIndex { + runeset[streamPosition] = runeA } else { - prefix = prefixes[streamIndex%2] - if numStreams%2 != 0 { - terminator = "⡀" - } else { - terminator = "⣀" + runeset[streamPosition] = runeB + } + + if !evenStreamCount { + switch runeset[paddedWidth-1] { + case runeA: + runeset[paddedWidth-1] = runeHalfA + case runeZ: + runeset[paddedWidth-1] = runeHalfZ } } - // Pad prefix to the calculated 
width - prefix = strings.Repeat("⣀", runePosition) + prefix + prefix = string(runeset) - if n := paddingWidth - runePosition - 2; n > 0 { - prefix += strings.Repeat("⣀", paddingWidth-runePosition-2) - } - prefix += terminator } w.FormatMessage = func(i any) string {