Skip to content

Commit

Permalink
topom: make SlotsRebalance more stable
Browse files Browse the repository at this point in the history
  • Loading branch information
spinlock committed May 12, 2017
1 parent 77b4103 commit 0735218
Showing 1 changed file with 47 additions and 58 deletions.
105 changes: 47 additions & 58 deletions pkg/topom/topom_slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,9 +564,11 @@ func (s *Topom) SlotsRebalance(confirm bool) (map[int]int, error) {
var (
assigned = make(map[int]int)
pendings = make(map[int][]int)
moveout = make(map[int]int)
docking []int
)
var groupSize = func(gid int) int {
return assigned[gid] + len(pendings[gid])
return assigned[gid] + len(pendings[gid]) - moveout[gid]
}

// don't migrate slot if it's being migrated
Expand All @@ -592,11 +594,6 @@ func (s *Topom) SlotsRebalance(confirm bool) (map[int]int, error) {
}
}

// reverse pending list for each group
for _, list := range pendings {
sort.Sort(sort.Reverse(sort.IntSlice(list)))
}

var tree = rbtree.NewWith(func(x, y interface{}) int {
var gid1 = x.(int)
var gid2 = y.(int)
Expand All @@ -612,8 +609,6 @@ func (s *Topom) SlotsRebalance(confirm bool) (map[int]int, error) {
tree.Put(gid, nil)
}

var offline []int

// assign offline slots to the smallest group
for _, m := range ctx.slots {
if m.Action.State != models.ActionNothing {
Expand All @@ -622,79 +617,73 @@ func (s *Topom) SlotsRebalance(confirm bool) (map[int]int, error) {
if m.GroupId != 0 {
continue
}
gid := tree.Left().Key.(int)
tree.Remove(gid)

assigned[gid]++
tree.Put(gid, nil)

offline = append(offline, gid)
}
sort.Ints(offline)
dest := tree.Left().Key.(int)
tree.Remove(dest)

var plans = make(map[int]int)
docking = append(docking, m.Id)
moveout[dest]--

// create migration plans for offline slots
for _, m := range ctx.slots {
if m.Action.State != models.ActionNothing {
continue
}
if m.GroupId != 0 {
continue
}
if len(offline) != 0 {
plans[m.Id], offline = offline[0], offline[1:]
}
tree.Put(dest, nil)
}

var upperBound = (MaxSlotNum + len(groupIds) - 1) / len(groupIds)

var newPlan = func(from, dest int) bool {
var fromSize = groupSize(from)
var destSize = groupSize(dest)
if fromSize <= lowerBound {
return false
}
if destSize >= upperBound {
return false
}
if d := fromSize - destSize; d <= 1 {
return false
}
var list = pendings[from]
if len(list) == 0 {
return false
}
plans[list[0]] = dest
pendings[from] = list[1:]
assigned[dest]++
return true
}

// rebalance between different server groups

for tree.Size() >= 2 {
from := tree.Right().Key.(int)
tree.Remove(from)

if len(pendings[from]) == 0 {
if len(pendings[from]) == moveout[from] {
continue
}

dest := tree.Left().Key.(int)
tree.Remove(dest)

var updated bool
for newPlan(from, dest) {
updated = true
var (
fromSize = groupSize(from)
destSize = groupSize(dest)
)
if fromSize <= lowerBound {
break
}
if destSize >= upperBound {
break
}
if !updated {
if d := fromSize - destSize; d <= 1 {
break
}
moveout[from]++
moveout[dest]--

tree.Put(from, nil)
tree.Put(dest, nil)
}

for gid, n := range moveout {
if n < 0 {
continue
}
if n > 0 {
sids := pendings[gid]
sort.Sort(sort.Reverse(sort.IntSlice(sids)))

docking = append(docking, sids[0:n]...)
pendings[gid] = sids[n:]
}
delete(moveout, gid)
}
sort.Ints(docking)

var plans = make(map[int]int)

for _, gid := range groupIds {
var in = -moveout[gid]
for i := 0; i < in && len(docking) != 0; i++ {
plans[docking[0]] = gid
docking = docking[1:]
}
}

if !confirm {
return plans, nil
}
Expand Down

0 comments on commit 0735218

Please sign in to comment.