Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] proxy: add heartbeat to hakeeper to support upload configs. #11993

Merged
merged 3 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pkg/hakeeper/checkers/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/hakeeper/checkers/cnservice"
"github.com/matrixorigin/matrixone/pkg/hakeeper/checkers/dnservice"
"github.com/matrixorigin/matrixone/pkg/hakeeper/checkers/logservice"
"github.com/matrixorigin/matrixone/pkg/hakeeper/checkers/proxy"
"github.com/matrixorigin/matrixone/pkg/hakeeper/checkers/syshealth"
"github.com/matrixorigin/matrixone/pkg/hakeeper/checkers/util"
"github.com/matrixorigin/matrixone/pkg/hakeeper/operator"
Expand Down Expand Up @@ -51,6 +52,7 @@ func (c *Coordinator) Check(alloc util.IDAllocator, state pb.CheckerState) []pb.
logState := state.LogState
tnState := state.TNState
cnState := state.CNState
proxyState := state.ProxyState
cluster := state.ClusterInfo
currentTick := state.Tick
user := state.TaskTableUser
Expand All @@ -68,18 +70,18 @@ func (c *Coordinator) Check(alloc util.IDAllocator, state pb.CheckerState) []pb.
}
}()

c.OperatorController.RemoveFinishedOperator(logState, tnState, cnState)
c.OperatorController.RemoveFinishedOperator(logState, tnState, cnState, proxyState)

// if we've discovered unhealthy already, no need to keep alive anymore.
if c.teardown {
return c.OperatorController.Dispatch(c.teardownOps, logState, tnState, cnState)
return c.OperatorController.Dispatch(c.teardownOps, logState, tnState, cnState, proxyState)
}

// check whether system health or not.
if operators, health := syshealth.Check(c.cfg, cluster, tnState, logState, currentTick); !health {
c.teardown = true
c.teardownOps = operators
return c.OperatorController.Dispatch(c.teardownOps, logState, tnState, cnState)
return c.OperatorController.Dispatch(c.teardownOps, logState, tnState, cnState, proxyState)
}

// system health, try to keep alive.
Expand All @@ -89,6 +91,7 @@ func (c *Coordinator) Check(alloc util.IDAllocator, state pb.CheckerState) []pb.
operators = append(operators, logservice.Check(alloc, c.cfg, cluster, logState, executing, user, currentTick)...)
operators = append(operators, dnservice.Check(alloc, c.cfg, cluster, tnState, user, currentTick)...)
operators = append(operators, cnservice.Check(c.cfg, cnState, user, currentTick)...)
operators = append(operators, proxy.Check(c.cfg, proxyState, currentTick)...)

return c.OperatorController.Dispatch(operators, logState, tnState, cnState)
return c.OperatorController.Dispatch(operators, logState, tnState, cnState, proxyState)
}
43 changes: 43 additions & 0 deletions pkg/hakeeper/checkers/proxy/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021 - 2023 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy

import (
"github.com/matrixorigin/matrixone/pkg/hakeeper"
"github.com/matrixorigin/matrixone/pkg/hakeeper/operator"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
)

func Check(cfg hakeeper.Config, infos pb.ProxyState, currentTick uint64) (operators []*operator.Operator) {
_, expired := parseProxyStores(cfg, infos, currentTick)
for _, uuid := range expired {
operators = append(operators, operator.CreateDeleteProxyOp("", uuid))
}
return operators
}

// parseProxyStores returns all working and expired stores' ids.
func parseProxyStores(cfg hakeeper.Config, infos pb.ProxyState, currentTick uint64) ([]string, []string) {
working := make([]string, 0)
expired := make([]string, 0)
for uuid, storeInfo := range infos.Stores {
if cfg.ProxyStoreExpired(storeInfo.Tick, currentTick) {
expired = append(expired, uuid)
} else {
working = append(working, uuid)
}
}
return working, expired
}
101 changes: 101 additions & 0 deletions pkg/hakeeper/checkers/proxy/check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2021 - 2023 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy

import (
"testing"
"time"

"github.com/matrixorigin/matrixone/pkg/hakeeper"
"github.com/matrixorigin/matrixone/pkg/hakeeper/operator"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/stretchr/testify/assert"
)

var expiredTick = uint64(hakeeper.DefaultProxyStoreTimeout / time.Second * hakeeper.DefaultTickPerSecond)

func TestParseProxyStores(t *testing.T) {
cases := []struct {
infos pb.ProxyState
currentTick uint64

working []string
expired []string
}{
{
infos: pb.ProxyState{
Stores: map[string]pb.ProxyStore{"a": {
Tick: 0,
}},
},
currentTick: 0,
working: []string{"a"},
expired: []string{},
},
{
infos: pb.ProxyState{
Stores: map[string]pb.ProxyStore{"a": {
Tick: 0,
}},
},
currentTick: expiredTick + 1,
working: []string{},
expired: []string{"a"},
},
{
infos: pb.ProxyState{
Stores: map[string]pb.ProxyStore{
"a": {
Tick: expiredTick,
},
"b": {
Tick: 0,
},
},
},
currentTick: expiredTick + 1,
working: []string{"a"},
expired: []string{"b"},
},
}

cfg := hakeeper.Config{}
cfg.Fill()

for _, c := range cases {
working, expired := parseProxyStores(cfg, c.infos, c.currentTick)
assert.Equal(t, c.working, working)
assert.Equal(t, c.expired, expired)
}
}

func TestCheck(t *testing.T) {
infos := pb.ProxyState{
Stores: map[string]pb.ProxyStore{"a": {
Tick: 0,
}},
}
currentTick := expiredTick + 1
cfg := hakeeper.Config{}
cfg.Fill()
ops := Check(cfg, infos, currentTick)
assert.Equal(t, 1, len(ops))
steps := ops[0].OpSteps()
assert.Equal(t, 1, len(steps))
s, ok := steps[0].(operator.DeleteProxyStore)
assert.True(t, ok)
assert.NotNil(t, s)
assert.Equal(t, "a", s.StoreID)
}
21 changes: 17 additions & 4 deletions pkg/hakeeper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
)

const (
DefaultTickPerSecond = 10
DefaultLogStoreTimeout = 5 * time.Minute
DefaultTNStoreTimeout = 10 * time.Second
DefaultCNStoreTimeout = 30 * time.Second
DefaultTickPerSecond = 10
DefaultLogStoreTimeout = 5 * time.Minute
DefaultTNStoreTimeout = 10 * time.Second
DefaultCNStoreTimeout = 30 * time.Second
DefaultProxyStoreTimeout = 30 * time.Second
)

type Config struct {
Expand All @@ -45,6 +46,11 @@ type Config struct {
// If HAKeeper does not receive two heartbeat within CNStoreTimeout,
// it regards the tn store as down.
CNStoreTimeout time.Duration

// ProxyStoreTimeout is the actual time limit between a proxy store's heartbeat.
// If HAKeeper does not receive two heartbeat within ProxyStoreTimeout,
// it regards the proxy store as down.
ProxyStoreTimeout time.Duration
}

func (cfg Config) Validate() error {
Expand All @@ -64,6 +70,9 @@ func (cfg *Config) Fill() {
if cfg.CNStoreTimeout == 0 {
cfg.CNStoreTimeout = DefaultCNStoreTimeout
}
if cfg.ProxyStoreTimeout == 0 {
cfg.ProxyStoreTimeout = DefaultProxyStoreTimeout
}
}

func (cfg Config) LogStoreExpired(start, current uint64) bool {
Expand All @@ -78,6 +87,10 @@ func (cfg Config) CNStoreExpired(start, current uint64) bool {
return uint64(int(cfg.CNStoreTimeout/time.Second)*cfg.TickPerSecond)+start < current
}

func (cfg Config) ProxyStoreExpired(start, current uint64) bool {
return uint64(int(cfg.ProxyStoreTimeout/time.Second)*cfg.TickPerSecond)+start < current
}

func (cfg Config) ExpiredTick(start uint64, timeout time.Duration) uint64 {
return uint64(timeout/time.Second)*uint64(cfg.TickPerSecond) + start
}
22 changes: 18 additions & 4 deletions pkg/hakeeper/operator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ func (c *Controller) GetExecutingReplicas() ExecutingReplicas {
return executing
}

func (c *Controller) RemoveFinishedOperator(logState pb.LogState, tnState pb.TNState, cnState pb.CNState) {
func (c *Controller) RemoveFinishedOperator(
logState pb.LogState, tnState pb.TNState, cnState pb.CNState, proxyState pb.ProxyState,
) {
for _, ops := range c.operators {
for _, op := range ops {
op.Check(logState, tnState, cnState)
op.Check(logState, tnState, cnState, proxyState)
switch op.Status() {
case SUCCESS, EXPIRED:
c.RemoveOperator(op)
Expand All @@ -106,10 +108,10 @@ func (c *Controller) RemoveFinishedOperator(logState pb.LogState, tnState pb.TNS
}

func (c *Controller) Dispatch(ops []*Operator, logState pb.LogState,
tnState pb.TNState, cnState pb.CNState) (commands []pb.ScheduleCommand) {
tnState pb.TNState, cnState pb.CNState, proxyState pb.ProxyState) (commands []pb.ScheduleCommand) {
for _, op := range ops {
c.operators[op.shardID] = append(c.operators[op.shardID], op)
if step := op.Check(logState, tnState, cnState); step != nil {
if step := op.Check(logState, tnState, cnState, proxyState); step != nil {
commands = append(commands, generateScheduleCommand(step))
}
}
Expand Down Expand Up @@ -142,6 +144,8 @@ func generateScheduleCommand(step OpStep) pb.ScheduleCommand {
return deleteCNStore(st)
case JoinGossipCluster:
return joinGossipCluster(st)
case DeleteProxyStore:
return deleteProxyStore(st)
}
panic("invalid schedule command")
}
Expand Down Expand Up @@ -306,3 +310,13 @@ func joinGossipCluster(st JoinGossipCluster) pb.ScheduleCommand {
},
}
}

func deleteProxyStore(st DeleteProxyStore) pb.ScheduleCommand {
return pb.ScheduleCommand{
UUID: st.StoreID,
ServiceType: pb.ProxyService,
DeleteProxyStore: &pb.DeleteProxyStore{
StoreID: st.StoreID,
},
}
}
10 changes: 5 additions & 5 deletions pkg/hakeeper/operator/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func TestDispatchAndRemoveOperator(t *testing.T) {
operator2 := &Operator{shardID: 1}
operator3 := &Operator{shardID: 2}

c.Dispatch([]*Operator{operator1}, pb.LogState{}, pb.TNState{}, pb.CNState{})
c.Dispatch([]*Operator{operator1}, pb.LogState{}, pb.TNState{}, pb.CNState{}, pb.ProxyState{})
assert.Equal(t, []*Operator{operator1}, c.operators[1])

c.Dispatch([]*Operator{operator2}, pb.LogState{}, pb.TNState{}, pb.CNState{})
c.Dispatch([]*Operator{operator2}, pb.LogState{}, pb.TNState{}, pb.CNState{}, pb.ProxyState{})
assert.Equal(t, []*Operator{operator1, operator2}, c.operators[1])

c.Dispatch([]*Operator{operator3}, pb.LogState{}, pb.TNState{}, pb.CNState{})
c.Dispatch([]*Operator{operator3}, pb.LogState{}, pb.TNState{}, pb.CNState{}, pb.ProxyState{})
assert.Equal(t, []*Operator{operator3}, c.operators[2])

c.RemoveOperator(operator1)
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestRemoveFinishedOperator(t *testing.T) {
}},
}

c.Dispatch([]*Operator{op1}, logState, pb.TNState{}, pb.CNState{})
c.Dispatch([]*Operator{op1}, logState, pb.TNState{}, pb.CNState{}, pb.ProxyState{})
assert.Equal(t, []*Operator{op1}, c.GetOperators(1))

logState = pb.LogState{
Expand All @@ -79,6 +79,6 @@ func TestRemoveFinishedOperator(t *testing.T) {
Epoch: 0,
}},
}
c.RemoveFinishedOperator(logState, pb.TNState{}, pb.CNState{})
c.RemoveFinishedOperator(logState, pb.TNState{}, pb.CNState{}, pb.ProxyState{})
assert.Equal(t, []*Operator(nil), c.GetOperators(1))
}
4 changes: 4 additions & 0 deletions pkg/hakeeper/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ func JoinGossipClusterOp(brief, uuid string, existing []string) *Operator {
JoinGossipCluster{StoreID: uuid, Existing: existing},
)
}

func CreateDeleteProxyOp(brief, uuid string) *Operator {
return NewOperator(brief, 0, 0, DeleteProxyStore{StoreID: uuid})
}
11 changes: 9 additions & 2 deletions pkg/hakeeper/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,21 @@ func (o *Operator) CheckExpired() bool {
return o.status.CheckExpired(ExpireTime)
}

func (o *Operator) Check(logState pb.LogState, tnState pb.TNState, cnState pb.CNState) OpStep {
func (o *Operator) Check(
logState pb.LogState, tnState pb.TNState, cnState pb.CNState, proxyState pb.ProxyState,
) OpStep {
if o.IsEnd() {
return nil
}
// CheckExpired will call CheckSuccess first
defer func() { _ = o.CheckExpired() }()
for step := o.currentStep; int(step) < len(o.steps); step++ {
if o.steps[int(step)].IsFinish(logState, tnState, cnState) {
if o.steps[int(step)].IsFinish(ClusterState{
LogState: logState,
TNState: tnState,
CNState: cnState,
ProxyState: proxyState,
}) {
o.currentStep = step + 1
} else {
return o.steps[int(step)]
Expand Down
8 changes: 4 additions & 4 deletions pkg/hakeeper/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestCheck(t *testing.T) {
Epoch: 0,
}},
}
currentStep := op.Check(logState, pb.TNState{}, pb.CNState{})
currentStep := op.Check(logState, pb.TNState{}, pb.CNState{}, pb.ProxyState{})

assert.Equal(t,
AddLogService{"a", Replica{"d", 1, 4, 1}},
Expand All @@ -69,7 +69,7 @@ func TestCheck(t *testing.T) {
Epoch: 0,
}},
}
currentStep = op.Check(logState, pb.TNState{}, pb.CNState{})
currentStep = op.Check(logState, pb.TNState{}, pb.CNState{}, pb.ProxyState{})

assert.Equal(t,
RemoveLogService{"a", Replica{"c", 1, 3, 1}},
Expand All @@ -83,10 +83,10 @@ func TestCheck(t *testing.T) {
Epoch: 0,
}},
}
currentStep = op.Check(logState, pb.TNState{}, pb.CNState{})
currentStep = op.Check(logState, pb.TNState{}, pb.CNState{}, pb.ProxyState{})

assert.Equal(t, nil, currentStep)
assert.Equal(t, SUCCESS, op.Status())

assert.Equal(t, nil, op.Check(pb.LogState{}, pb.TNState{}, pb.CNState{}))
assert.Equal(t, nil, op.Check(pb.LogState{}, pb.TNState{}, pb.CNState{}, pb.ProxyState{}))
}
Loading