Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/node/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ type (
IsLeader bool `json:"is_leader"`
IsOverloaded bool `json:"is_overloaded"`
BootedAt time.Time `json:"booted_at"`

// LeftAt is the last time the nmon advanced the last_shutdown file,
// i.e. the last time it proved it was alive and rejoined.
LeftAt time.Time `json:"left_at"`

// RejoinedAt is the time nmon state transitioned from rejoin to idle.
// This happens either when the rejoin_grace_period expires or when
// we received data from all peers.
RejoinedAt time.Time `json:"rejoined_at"`
}

// Instances groups instances configuration digest and status
Expand Down
3 changes: 3 additions & 0 deletions core/object/text/kw/node/pool.drbd.max_peers
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The integer value to use in `create-md --max-peers <n>`.

The driver ensures the value is not less than the number of instances.
1 change: 1 addition & 0 deletions core/object/text/kw/node/pool.drbd.max_peers.default
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(nodes_count*2)-1
3 changes: 2 additions & 1 deletion daemon/daemondata/daemon_hb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"
"time"

"github.com/opensvc/om3/v3/core/clusternode"
"github.com/opensvc/om3/v3/daemon/daemonsubsystem"
"github.com/opensvc/om3/v3/daemon/hbcache"
"github.com/opensvc/om3/v3/daemon/msgbus"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (d *data) setHbMsgType(node string, msgType string) {
Node: node,
From: previous,
To: msgType,
Nodes: append([]string{}, d.clusterData.Cluster.Config.Nodes...),
Nodes: clusternode.Get(),
JoinedNodes: joinedNodes,
InstalledGens: d.deepCopyLocalGens(),
}, pubsub.Label{"node", node})
Expand Down
2 changes: 1 addition & 1 deletion daemon/daemondata/data_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newNodeData(localNode string) node.Node {
Monitor: node.Monitor{
LocalExpect: node.MonitorLocalExpectNone,
GlobalExpect: node.MonitorGlobalExpectNone,
State: node.MonitorStateInit, // this prevents imon orchestration
State: node.MonitorStateRejoin, // this prevents imon orchestration
},
Stats: node.Stats{},
Status: node.Status{
Expand Down
3 changes: 2 additions & 1 deletion daemon/daemondata/get_hb_msg_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package daemondata
import (
"context"

"github.com/opensvc/om3/v3/core/clusternode"
"github.com/opensvc/om3/v3/core/node"
"github.com/opensvc/om3/v3/util/xmap"
)
Expand Down Expand Up @@ -39,7 +40,7 @@ func (t T) GetHbMessageType() HbMessageType {
func (o opGetHbMessageType) call(ctx context.Context, d *data) error {
o.result <- HbMessageType{
Type: d.hbMessageType,
Nodes: d.clusterData.Cluster.Config.Nodes,
Nodes: clusternode.Get(),
JoinedNodes: xmap.Keys(d.hbGens[d.localNode]),
Gens: d.deepCopyLocalGens(),
}
Expand Down
4 changes: 2 additions & 2 deletions daemon/daemondata/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestDaemonData(t *testing.T) {
refreshed := bus.ClusterNodeData(localNode)
assert.NotNil(t, refreshed)
assert.Equal(t, uint64(1), refreshed.Status.Gen[localNode])
assert.Equal(t, node.MonitorStateInit, refreshed.Monitor.State)
assert.Equal(t, node.MonitorStateRejoin, refreshed.Monitor.State)
})
require.False(t, t.Failed()) // fail on first error

Expand All @@ -154,7 +154,7 @@ func TestDaemonData(t *testing.T) {
initial.GlobalExpectUpdatedAt = time.Now()

refreshed := *node.MonitorData.GetByNode(localNode)
require.Equal(t, node.MonitorStateInit, refreshed.State, "State changed !")
require.Equal(t, node.MonitorStateRejoin, refreshed.State, "State changed !")
require.Equal(t, initialUpdated, refreshed.StateUpdatedAt, "StateUpdated changed !")
require.Equal(t, initialStateUpdated, refreshed.StateUpdatedAt, "StateUpdated changed !")
require.Equal(t, node.MonitorGlobalExpectNone, refreshed.GlobalExpect, "GlobalExpect changed !")
Expand Down
6 changes: 6 additions & 0 deletions daemon/imon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ type (

// regularResourceOrchestrate is the orchestrationResource for regular resources
regularResourceOrchestrate orchestrationResource

// isPeerFrozenMerged is set once a peer instance freeze that happened during
// this daemon's last blackout (crash time => rejoin) has been mirrored locally.
// When true, the t.mergePeerFrozen func returns immediately.
isPeerFrozenMerged bool
}

// cmdOrchestrate can be used from post action go routines
Expand Down Expand Up @@ -371,6 +376,7 @@ func (t *Manager) worker(initialNodes []string) {
<-t.delayTimer.C
}

t.mergePeerFrozen()
t.initRelationAvailStatus()
t.initResourceMonitor()
t.initLocalResourceFiles()
Expand Down
53 changes: 50 additions & 3 deletions daemon/imon/main_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func (t *Manager) onMyInstanceStatusUpdated(srcNode string, srcCmd *msgbus.Insta
return
}
t.instStatus[srcCmd.Node] = srcCmd.Value
t.mergePeerFrozen()
t.clearStonith(srcCmd.Node, srcCmd.Value.Avail)
t.handleResourceFiles(srcCmd)
}
Expand Down Expand Up @@ -1161,10 +1162,56 @@ func (t *Manager) initResourceMonitor() {
t.change = true
}

func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) {
if c.IsUpgrading {
// mergePeerFrozen freezes the local instance when a peer instance was frozen
// while this daemon was down, i.e. when a peer FrozenAt is after the local
// node LeftAt and before the local node RejoinedAt.
//
// Once a freeze has been queued successfully, isPeerFrozenMerged is set and
// later calls return immediately. If queueFreeze fails, the flag stays unset
// so a later call can retry.
//
// The call does nothing when:
//   - the local node status has a zero LeftAt or RejoinedAt,
//   - fewer than 2 instance statuses are known,
//   - the local instance status is unknown or already frozen,
//   - the global expect is unfrozen,
//   - the object has an actor config whose orchestrate value is not "ha".
func (t *Manager) mergePeerFrozen() {
	if t.isPeerFrozenMerged {
		return
	}
	localNodeStatus := t.nodeStatus[t.localhost]
	leftAt := localNodeStatus.LeftAt
	rejoinedAt := localNodeStatus.RejoinedAt
	if leftAt.IsZero() || rejoinedAt.IsZero() {
		// we will do that on NodeRejoin event
		t.log.Tracef("not rejoined yet, defer merge peer frozen")
		return
	}
	if len(t.instStatus) < 2 {
		t.log.Tracef("not enough instances, delay merge peer frozen")
		return
	}
	instStatus, ok := t.instStatus[t.localhost]
	if !ok {
		t.log.Tracef("local instance status is not evaluated yet, defer merge peer frozen")
		return
	}
	if !instStatus.FrozenAt.IsZero() {
		t.log.Tracef("local instance is already frozen, skip merge peer frozen")
		return
	}
	if t.state.GlobalExpect == instance.MonitorGlobalExpectUnfrozen {
		t.log.Tracef("global expect is unfrozen, skip merge peer frozen")
		return
	}
	if t.instConfig.ActorConfig != nil && t.instConfig.ActorConfig.Orchestrate != "ha" {
		t.log.Tracef("object is not a ha actor, skip merge peer frozen")
		return
	}
	for peer, peerStatus := range t.instStatus {
		if peer == t.localhost {
			continue
		}
		// Only a freeze that happened inside the (leftAt, rejoinedAt) window
		// is mirrored locally.
		if !peerStatus.FrozenAt.After(leftAt) || !peerStatus.FrozenAt.Before(rejoinedAt) {
			continue
		}
		msg := fmt.Sprintf("freeze %s instance because peer %s instance was frozen while this daemon was down", t.path, peer)
		if err := t.queueFreeze(); err != nil {
			t.log.Errorf("%s: %s", msg, err)
			return
		}
		t.isPeerFrozenMerged = true
		// msg embeds the object path and the peer name: pass it as an
		// argument, never as the format string.
		t.log.Infof("%s", msg)
		return
	}
}

func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) {
if len(t.instStatus) < 2 {
// no need to merge frozen if the object has a single instance
return
Expand All @@ -1190,7 +1237,7 @@ func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) {
if peerStatus.FrozenAt.After(c.LastShutdownAt) {
msg := fmt.Sprintf("Freeze %s instance because peer %s instance was frozen while this daemon was down", t.path, peer)
if err := t.queueFreeze(); err != nil {
t.log.Infof("%s: %s", msg, err)
t.log.Errorf("%s: %s", msg, err)
} else {
t.log.Infof(msg)
}
Expand Down
72 changes: 56 additions & 16 deletions daemon/nmon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ func NewManager(drainDuration time.Duration, subQS pubsub.QueueSizer) *Manager {
state: node.Monitor{
LocalExpect: node.MonitorLocalExpectNone,
GlobalExpect: node.MonitorGlobalExpectNone,
State: node.MonitorStateInit, // this prevents imon orchestration
State: node.MonitorStateRejoin, // this prevents imon orchestration
},
previousState: node.Monitor{
LocalExpect: node.MonitorLocalExpectNone,
GlobalExpect: node.MonitorGlobalExpectNone,
State: node.MonitorStateInit,
State: node.MonitorStateRejoin,
},
cmdC: make(chan any),
poolC: make(chan any, 1),
Expand Down Expand Up @@ -268,6 +268,11 @@ func (t *Manager) Start(parent context.Context) error {
t.setArbitratorConfig()

t.startSubscriptions()

if clusterConfig := cluster.ConfigData.Get(); clusterConfig != nil {
t.clusterConfig = *clusterConfig
}

t.wg.Add(1)
go func() {
defer t.wg.Done()
Expand Down Expand Up @@ -359,9 +364,19 @@ func (t *Manager) startRejoin() {
l := missingNodes(hbMessageType.Nodes, hbMessageType.JoinedNodes)
if (hbMessageType.Type == "patch") && len(l) == 0 {
// Skip the rejoin state phase.
t.log.Infof("we are late to the party, immediate rejoin")
leftAt := file.ModTime(rawconfig.Paths.LastShutdown)
t.rejoinTicker = time.NewTicker(time.Second)
t.rejoinTicker.Stop()
t.nodeStatus.LeftAt = leftAt
t.nodeStatus.RejoinedAt = time.Now()
_ = os.Unsetenv("OPENSVC_AGENT_UPGRADE")
t.transitionTo(node.MonitorStateIdle)
t.publisher.Pub(&msgbus.NodeRejoin{
Nodes: hbMessageType.Nodes,
LastShutdownAt: leftAt,
IsUpgrading: os.Getenv("OPENSVC_AGENT_UPGRADE") != "",
}, t.labelLocalhost)
} else {
// Begin the rejoin state phase.
// Arm the re-join grace period ticker.
Expand All @@ -376,9 +391,9 @@ func (t *Manager) startRejoin() {
func (t *Manager) touchLastShutdown() {
// remember the last shutdown date via a file mtime
if err := file.Touch(rawconfig.Paths.LastShutdown, time.Now()); err != nil {
t.log.Errorf("touch %s: %s", rawconfig.Paths.LastShutdown, err)
t.log.Warnf("touch %s: %s", rawconfig.Paths.LastShutdown, err)
} else {
t.log.Infof("touch %s", rawconfig.Paths.LastShutdown)
t.log.Tracef("touch %s", rawconfig.Paths.LastShutdown)
}
}

Expand Down Expand Up @@ -431,10 +446,16 @@ func (t *Manager) worker() {

statsTicker := time.NewTicker(statsInterval)
defer statsTicker.Stop()

arbitratorTicker := time.NewTicker(arbitratorInterval)
defer arbitratorTicker.Stop()
defer t.touchLastShutdown()

// lastShutdownFileTouchTicker is used to periodically touch LastShutdown file
// when the daemon is in a state where rejoinTicker has been stopped
lastShutdownFileTouchTicker := time.NewTicker(10 * time.Second)
defer lastShutdownFileTouchTicker.Stop()

// TODO refreshSanPaths should be refreshed on events, on ticker ?
for {
select {
Expand Down Expand Up @@ -496,29 +517,48 @@ func (t *Manager) worker() {
t.onArbitratorTicker()
case <-t.rejoinTicker.C:
t.onRejoinGracePeriodExpire()
case <-lastShutdownFileTouchTicker.C:
t.onLastShutdownFileTouchTicker()
}
}
}

func (t *Manager) onRejoinGracePeriodExpire() {
// onLastShutdownFileTouchTicker touches the LastShutdown file on each tick,
// except while the node monitor state is rejoin.
//
// NOTE(review): the rejoin state is presumably skipped so the file mtime read
// when the rejoin completes still reflects the previous daemon run — confirm.
func (t *Manager) onLastShutdownFileTouchTicker() {
	if t.state.State != node.MonitorStateRejoin {
		t.touchLastShutdown()
	}
}

func (t *Manager) nodeFreeze() (bool, error) {
nodeFrozenFile := filepath.Join(rawconfig.Paths.Var, "node", "frozen")
frozen := file.ModTime(nodeFrozenFile)
if frozen.Equal(time.Time{}) {
f, err := os.OpenFile(nodeFrozenFile, os.O_RDONLY|os.O_CREATE, 0666)
frozenAt := file.ModTime(nodeFrozenFile)
if !frozenAt.IsZero() {
return false, nil
}
f, err := os.OpenFile(nodeFrozenFile, os.O_RDONLY|os.O_CREATE, 0666)
if err != nil {
return false, err
}
if err := f.Close(); err != nil {
return true, err
}
return true, nil
}

func (t *Manager) onRejoinGracePeriodExpire() {
changed, err := t.nodeFreeze()
msg := "rejoin grace period expired"
if changed {
if err != nil {
t.log.Errorf("rejoin grace period expired: freeze node: %s", err)
t.rejoinTicker.Reset(2 * time.Second)
return
}
t.log.Infof("rejoin grace period expired: freeze node")
if err := f.Close(); err != nil {
t.log.Errorf("rejoin grace period expired: freeze node: %s", err)
t.log.Errorf("%s: freezing local node: %s", msg, err)
t.rejoinTicker.Reset(2 * time.Second)
return
}
t.log.Infof("%s: local node has been frozen", msg)
t.transitionTo(node.MonitorStateIdle)
} else {
t.log.Infof("rejoin grace period expired: the node is already frozen")
t.log.Infof("%s: local node is already frozen", msg)
t.transitionTo(node.MonitorStateIdle)
}
t.rejoinTicker.Stop()
Expand Down
37 changes: 26 additions & 11 deletions daemon/nmon/main_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,14 @@ func (t *Manager) onHeartbeatMessageTypeUpdated(c *msgbus.HeartbeatMessageTypeUp
return
}
t.rejoinTicker.Stop()
leftAt := file.ModTime(rawconfig.Paths.LastShutdown)
t.publisher.Pub(&msgbus.NodeRejoin{
Nodes: c.Nodes,
LastShutdownAt: file.ModTime(rawconfig.Paths.LastShutdown),
LastShutdownAt: leftAt,
IsUpgrading: os.Getenv("OPENSVC_AGENT_UPGRADE") != "",
}, t.labelLocalhost)
t.nodeStatus.LeftAt = leftAt
t.nodeStatus.RejoinedAt = time.Now()
_ = os.Unsetenv("OPENSVC_AGENT_UPGRADE")
t.transitionTo(node.MonitorStateIdle)
}
Expand All @@ -357,13 +360,10 @@ func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) {
// no need to merge frozen on a single node cluster
return
}
if !t.nodeStatus.FrozenAt.IsZero() {
// already frozen
return
}
if t.state.GlobalExpect == node.MonitorGlobalExpectUnfrozen {
return
}
var frozenPeers []string
for _, peer := range t.clusterConfig.Nodes {
if peer == t.localhost {
continue
Expand All @@ -373,14 +373,29 @@ func (t *Manager) onNodeRejoin(c *msgbus.NodeRejoin) {
continue
}
if peerStatus.FrozenAt.After(c.LastShutdownAt) {
if err := t.crmFreeze(); err != nil {
t.log.Infof("node freeze error: %s", err)
} else {
t.log.Infof("node freeze because peer %s was frozen while this daemon was down", peer)
}
return
frozenPeers = append(frozenPeers, peer)
}
}

var msg string
switch len(frozenPeers) {
case 0:
// no peer was frozen while this daemon was shut down
return
case 1:
msg = fmt.Sprintf("peer %s was frozen while this daemon was down", frozenPeers[0])
default:
msg = fmt.Sprintf("peers %s were frozen while this daemon was down", frozenPeers)
}

changed, err := t.nodeFreeze()
if err != nil {
t.log.Errorf("%s: freezing local node: %s", msg, err)
} else if changed {
t.log.Infof("%s: local node has been frozen", msg)
} else {
t.log.Infof("%s: local node is already frozen", msg)
}
}

func (t *Manager) onOrchestrate(c cmdOrchestrate) {
Expand Down
2 changes: 1 addition & 1 deletion util/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func Configure(config Config) error {

if config.WithCaller {
// skip one more for plog wrappers
logger = logger.With().CallerWithSkipFrameCount(zerolog.CallerSkipFrameCount + 1).Logger()
logger = logger.With().CallerWithSkipFrameCount(zerolog.CallerSkipFrameCount + 2).Logger()
}

log.Logger = logger
Expand Down
Loading
Loading