Skip to content

Commit

Permalink
manager/allocator: lift portAllocator out of CNM
Browse files Browse the repository at this point in the history
The port allocation logic does not depend on the network allocator
implementation in any meaningful way. It has no knowledge of the CNM
network allocator's state, and it does not need to change if the
network allocator changes. Allocating node ports is fundamentally a
seaparate concern from allocating network resources for services and
tasks. Therefore the low-level network allocator should not be
responsible for allocating both. Lift the port allocator into the
Allocator's network context as a sibling of the low-level network
allocator.

Signed-off-by: Cory Snider <[email protected]>
  • Loading branch information
corhere committed Feb 21, 2024
1 parent 196c38f commit d0a1c5e
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 41 deletions.
24 changes: 0 additions & 24 deletions manager/allocator/cnmallocator/networkallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ type cnmNetworkAllocator struct {
// The driver registry for all internal and external network drivers.
networkRegistry drvregistry.Networks

// The port allocator instance for allocating node ports
portAllocator *portAllocator

// Local network state used by cnmNetworkAllocator to do network management.
networks map[string]*network

Expand Down Expand Up @@ -108,8 +105,6 @@ func New(pg plugingetter.PluginGetter, netConfig *NetworkConfig) (networkallocat
tasks: make(map[string]struct{}),
nodes: make(map[string]map[string]struct{}),
pg: pg,

portAllocator: newPortAllocator(),
}

for ntype, i := range initializers {
Expand Down Expand Up @@ -207,9 +202,6 @@ func (na *cnmNetworkAllocator) Deallocate(n *api.Network) error {
// AllocateService allocates all the network resources such as virtual
// IP and ports needed by the service.
func (na *cnmNetworkAllocator) AllocateService(s *api.Service) (err error) {
if err = na.portAllocator.serviceAllocatePorts(s); err != nil {
return err
}
defer func() {
if err != nil {
na.DeallocateService(s)
Expand Down Expand Up @@ -312,7 +304,6 @@ func (na *cnmNetworkAllocator) DeallocateService(s *api.Service) error {
}
s.Endpoint.VirtualIPs = nil

na.portAllocator.serviceDeallocatePorts(s)
delete(na.services, s.ID)

return nil
Expand Down Expand Up @@ -369,19 +360,8 @@ func (na *cnmNetworkAllocator) IsTaskAllocated(t *api.Task) bool {
return true
}

// HostPublishPortsNeedUpdate returns true if the passed service needs
// allocations for its published ports in host (non ingress) mode
func (na *cnmNetworkAllocator) HostPublishPortsNeedUpdate(s *api.Service) bool {
return na.portAllocator.hostPublishPortsNeedUpdate(s)
}

// IsServiceAllocated returns false if the passed service needs to have network resources allocated/updated.
func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool {
var options networkallocator.ServiceAllocationOpts
for _, flag := range flags {
flag(&options)
}

specNetworks := serviceNetworks(s)

// If endpoint mode is VIP and allocator does not have the
Expand Down Expand Up @@ -443,10 +423,6 @@ func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(
}
}

if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) ||
(s.Endpoint != nil && len(s.Endpoint.Ports) != 0) {
return na.portAllocator.isPortsAllocatedOnInit(s, options.OnInit)
}
return true
}

Expand Down
9 changes: 5 additions & 4 deletions manager/allocator/cnmallocator/networkallocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,11 @@ func TestAllocateService(t *testing.T) {
err = na.AllocateService(s)
assert.NoError(t, err)
assert.Equal(t, 2, len(s.Endpoint.Ports))
assert.True(t, s.Endpoint.Ports[0].PublishedPort >= dynamicPortStart &&
s.Endpoint.Ports[0].PublishedPort <= dynamicPortEnd)
assert.True(t, s.Endpoint.Ports[1].PublishedPort >= dynamicPortStart &&
s.Endpoint.Ports[1].PublishedPort <= dynamicPortEnd)
assert.True(t, s.Endpoint.Ports[0].PublishedPort >= 1 &&
s.Endpoint.Ports[0].PublishedPort <= 65535)
assert.True(t, s.Endpoint.Ports[1].PublishedPort >= 1 &&
s.Endpoint.Ports[1].PublishedPort <= 65535)
assert.NotEqual(t, s.Endpoint.Ports[0].PublishedPort, s.Endpoint.Ports[1].PublishedPort)

assert.Equal(t, 1, len(s.Endpoint.VirtualIPs))

Expand Down
43 changes: 36 additions & 7 deletions manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type networkContext struct {
// the actual network allocation.
nwkAllocator networkallocator.NetworkAllocator

// The port allocator instance for allocating node ports
portAllocator *portAllocator

// A set of tasks which are ready to be allocated as a batch. This is
// distinct from "unallocatedTasks" which are tasks that failed to
// allocate on the first try, being held for a future retry.
Expand Down Expand Up @@ -95,6 +98,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {

nc := &networkContext{
nwkAllocator: na,
portAllocator: newPortAllocator(),
pendingTasks: make(map[string]*api.Task),
unallocatedTasks: make(map[string]*api.Task),
unallocatedServices: make(map[string]*api.Service),
Expand Down Expand Up @@ -233,7 +237,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if nc.nwkAllocator.IsServiceAllocated(s) {
if nc.isServiceAllocated(s) {
break
}

Expand Down Expand Up @@ -261,8 +265,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if nc.nwkAllocator.IsServiceAllocated(s) {
if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) {
if nc.isServiceAllocated(s) {
if !nc.portAllocator.hostPublishPortsNeedUpdate(s) {
break
}
updatePortsInHostPublishMode(s)
Expand All @@ -287,6 +291,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
if err := nc.nwkAllocator.DeallocateService(s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID)
} else {
nc.portAllocator.serviceDeallocatePorts(s)
nc.somethingWasDeallocated = true
}

Expand Down Expand Up @@ -681,7 +686,7 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly

var allocatedServices []*api.Service
for _, s := range services {
if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) {
if nc.isServiceAllocated(s, networkallocator.OnInit) {
continue
}
if existingAddressesOnly &&
Expand Down Expand Up @@ -713,6 +718,23 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly
return nil
}

// isServiceAllocated returns false if the passed service needs to have network resources allocated/updated.
func (nc *networkContext) isServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool {
if !nc.nwkAllocator.IsServiceAllocated(s, flags...) {
return false
}

var options networkallocator.ServiceAllocationOpts
for _, flag := range flags {
flag(&options)
}
if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) ||
(s.Endpoint != nil && len(s.Endpoint.Ports) != 0) {
return nc.portAllocator.isPortsAllocatedOnInit(s, options.OnInit)
}
return true
}

// allocateTasks allocates tasks in the store so far before we started watching.
func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly bool) error {
var (
Expand Down Expand Up @@ -815,7 +837,7 @@ func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bo
// network configured or service endpoints have been
// allocated.
return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
(s == nil || nc.nwkAllocator.IsServiceAllocated(s))
(s == nil || nc.isServiceAllocated(s))
}

func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
Expand Down Expand Up @@ -1203,10 +1225,15 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin
if err := nc.nwkAllocator.DeallocateService(s); err != nil {
return err
}
nc.portAllocator.serviceDeallocatePorts(s)
nc.somethingWasDeallocated = true
}

if err := nc.portAllocator.serviceAllocatePorts(s); err != nil {
return err
}
if err := nc.nwkAllocator.AllocateService(s); err != nil {
nc.portAllocator.serviceDeallocatePorts(s)
nc.unallocatedServices[s.ID] = s
return err
}
Expand Down Expand Up @@ -1243,6 +1270,8 @@ func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Bat
}); err != nil {
if err := a.netCtx.nwkAllocator.DeallocateService(s); err != nil {
log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID)
} else {
a.netCtx.portAllocator.serviceDeallocatePorts(s)
}

return err
Expand Down Expand Up @@ -1298,7 +1327,7 @@ func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) {
return
}

if !nc.nwkAllocator.IsServiceAllocated(s) {
if !nc.isServiceAllocated(s) {
err = fmt.Errorf("service %s to which task %s belongs has pending allocations", s.ID, t.ID)
return
}
Expand Down Expand Up @@ -1423,7 +1452,7 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
nc := a.netCtx
var allocatedServices []*api.Service
for _, s := range nc.unallocatedServices {
if !nc.nwkAllocator.IsServiceAllocated(s) {
if !nc.isServiceAllocated(s) {
if err := a.allocateService(ctx, s, false); err != nil {
log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID)
continue
Expand Down
4 changes: 0 additions & 4 deletions manager/allocator/networkallocator/networkallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ type NetworkAllocator interface {
// virtual IP and ports associated with the service.
DeallocateService(s *api.Service) error

// HostPublishPortsNeedUpdate returns true if the passed service needs
// allocations for its published ports in host (non ingress) mode
HostPublishPortsNeedUpdate(s *api.Service) bool

//
// Task Allocation
//
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cnmallocator
package allocator

import (
"github.com/moby/swarmkit/v2/api"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cnmallocator
package allocator

import (
"testing"
Expand Down

0 comments on commit d0a1c5e

Please sign in to comment.