Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor server construction #991

Merged
merged 9 commits into from
Oct 16, 2024
48 changes: 29 additions & 19 deletions cmd/nvidia-device-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,20 @@ import (
"github.com/NVIDIA/k8s-device-plugin/internal/watch"
)

func main() {
var configFile string
type options struct {
flags []cli.Flag
configFile string
kubeletSocket string
}

func main() {
c := cli.NewApp()
o := &options{}
c.Name = "NVIDIA Device Plugin"
c.Usage = "NVIDIA device plugin for Kubernetes"
c.Version = info.GetVersionString()
c.Action = func(ctx *cli.Context) error {
return start(ctx, c.Flags)
return start(ctx, o)
}

c.Flags = []cli.Flag{
Expand Down Expand Up @@ -115,7 +120,7 @@ func main() {
&cli.StringFlag{
Name: "config-file",
Usage: "the path to a config file as an alternative to command line options or environment variables",
Destination: &configFile,
Destination: &o.configFile,
EnvVars: []string{"CONFIG_FILE"},
},
&cli.StringFlag{
Expand Down Expand Up @@ -160,6 +165,7 @@ func main() {
EnvVars: []string{"IMEX_REQUIRED"},
},
}
o.flags = c.Flags

err := c.Run(os.Args)
if err != nil {
Expand Down Expand Up @@ -200,6 +206,14 @@ func validateFlags(infolib nvinfo.Interface, config *spec.Config) error {
return fmt.Errorf("invalid --device-discovery-strategy option %v", *config.Flags.DeviceDiscoveryStrategy)
}

switch *config.Flags.MigStrategy {
case spec.MigStrategyNone:
case spec.MigStrategySingle:
case spec.MigStrategyMixed:
default:
return fmt.Errorf("unknown MIG strategy: %v", *config.Flags.MigStrategy)
}

if err := spec.AssertChannelIDsValid(config.Imex.ChannelIDs); err != nil {
return fmt.Errorf("invalid IMEX channel IDs: %w", err)
}
Expand All @@ -216,9 +230,10 @@ func loadConfig(c *cli.Context, flags []cli.Flag) (*spec.Config, error) {
return config, nil
}

func start(c *cli.Context, flags []cli.Flag) error {
kubeletSocket := c.String("kubelet-socket")
kubeletSocketDir := filepath.Dir(kubeletSocket)
func start(c *cli.Context, o *options) error {
klog.InfoS(fmt.Sprintf("Starting %s", c.App.Name), "version", c.App.Version)

kubeletSocketDir := filepath.Dir(o.kubeletSocket)
klog.Infof("Starting FS watcher for %v", kubeletSocketDir)
watcher, err := watch.Files(kubeletSocketDir)
if err != nil {
Expand All @@ -242,7 +257,7 @@ restart:
}

klog.Info("Starting Plugins.")
plugins, restartPlugins, err := startPlugins(c, flags)
plugins, restartPlugins, err := startPlugins(c, o)
if err != nil {
return fmt.Errorf("error starting plugins: %v", err)
}
Expand All @@ -265,8 +280,8 @@ restart:
// 'pluginapi.KubeletSocket' file. When this occurs, restart this loop,
// restarting all of the plugins in the process.
case event := <-watcher.Events:
if kubeletSocket != "" && event.Name == kubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
klog.Infof("inotify: %s created, restarting.", kubeletSocket)
if o.kubeletSocket != "" && event.Name == o.kubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
klog.Infof("inotify: %s created, restarting.", o.kubeletSocket)
goto restart
}

Expand Down Expand Up @@ -296,10 +311,10 @@ exit:
return nil
}

func startPlugins(c *cli.Context, flags []cli.Flag) ([]plugin.Interface, bool, error) {
func startPlugins(c *cli.Context, o *options) ([]plugin.Interface, bool, error) {
// Load the configuration file
klog.Info("Loading configuration.")
config, err := loadConfig(c, flags)
config, err := loadConfig(c, o.flags)
if err != nil {
return nil, false, fmt.Errorf("unable to load config: %v", err)
}
Expand Down Expand Up @@ -338,12 +353,7 @@ func startPlugins(c *cli.Context, flags []cli.Flag) ([]plugin.Interface, bool, e

// Get the set of plugins.
klog.Info("Retrieving plugins.")
kubeletSocket := c.String("kubelet-socket")
pluginManager, err := NewPluginManager(infolib, nvmllib, devicelib, kubeletSocket, config)
if err != nil {
return nil, false, fmt.Errorf("error creating plugin manager: %v", err)
}
plugins, err := pluginManager.GetPlugins()
plugins, err := GetPlugins(infolib, nvmllib, devicelib, config)
klueska marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, false, fmt.Errorf("error getting plugins: %v", err)
}
Expand All @@ -359,7 +369,7 @@ func startPlugins(c *cli.Context, flags []cli.Flag) ([]plugin.Interface, bool, e
}

// Start the gRPC server for plugin p and connect it with the kubelet.
if err := p.Start(); err != nil {
if err := p.Start(o.kubeletSocket); err != nil {
klog.Errorf("Failed to start plugin: %v", err)
return plugins, true, nil
}
Expand Down
34 changes: 12 additions & 22 deletions cmd/nvidia-device-plugin/plugin-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,11 @@ import (
spec "github.com/NVIDIA/k8s-device-plugin/api/config/v1"
"github.com/NVIDIA/k8s-device-plugin/internal/cdi"
"github.com/NVIDIA/k8s-device-plugin/internal/imex"
"github.com/NVIDIA/k8s-device-plugin/internal/plugin/manager"
"github.com/NVIDIA/k8s-device-plugin/internal/plugin"
)

// NewPluginManager creates an NVML-based plugin manager
func NewPluginManager(infolib info.Interface, nvmllib nvml.Interface, devicelib device.Interface, kubeletSocket string, config *spec.Config) (manager.Interface, error) {
var err error
switch *config.Flags.MigStrategy {
case spec.MigStrategyNone:
case spec.MigStrategySingle:
case spec.MigStrategyMixed:
default:
return nil, fmt.Errorf("unknown strategy: %v", *config.Flags.MigStrategy)
}

// GetPlugins returns a set of plugins for the specified configuration.
func GetPlugins(infolib info.Interface, nvmllib nvml.Interface, devicelib device.Interface, config *spec.Config) ([]plugin.Interface, error) {
// TODO: We could consider passing this as an argument since it should already be used to construct nvmllib.
driverRoot := root(*config.Flags.Plugin.ContainerDriverRoot)

Expand Down Expand Up @@ -70,21 +61,20 @@ func NewPluginManager(infolib info.Interface, nvmllib nvml.Interface, devicelib
return nil, fmt.Errorf("unable to create cdi handler: %v", err)
}

m, err := manager.New(infolib, nvmllib, devicelib,
manager.WithCDIHandler(cdiHandler),
manager.WithConfig(config),
manager.WithFailOnInitError(*config.Flags.FailOnInitError),
manager.WithKubeletSocket(kubeletSocket),
manager.WithMigStrategy(*config.Flags.MigStrategy),
manager.WithImexChannels(imexChannels),
plugins, err := plugin.New(infolib, nvmllib, devicelib,
plugin.WithCDIHandler(cdiHandler),
plugin.WithConfig(config),
plugin.WithDeviceListStrategies(deviceListStrategies),
plugin.WithFailOnInitError(*config.Flags.FailOnInitError),
plugin.WithImexChannels(imexChannels),
)
if err != nil {
return nil, fmt.Errorf("unable to create plugin manager: %v", err)
return nil, fmt.Errorf("unable to create plugins: %w", err)
}

if err := m.CreateCDISpecFile(); err != nil {
if err := cdiHandler.CreateSpecFile(); err != nil {
return nil, fmt.Errorf("unable to create cdi spec file: %v", err)
}

return m, nil
return plugins, nil
}
2 changes: 1 addition & 1 deletion internal/plugin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ import "github.com/NVIDIA/k8s-device-plugin/internal/rm"
// Interface defines the API for the plugin package
type Interface interface {
Devices() rm.Devices
Start() error
Start(string) error
Stop() error
}
68 changes: 44 additions & 24 deletions internal/plugin/manager/factory.go → internal/plugin/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
**/

package manager
package plugin

import (
"fmt"
Expand All @@ -27,86 +27,106 @@ import (
spec "github.com/NVIDIA/k8s-device-plugin/api/config/v1"
"github.com/NVIDIA/k8s-device-plugin/internal/cdi"
"github.com/NVIDIA/k8s-device-plugin/internal/imex"
"github.com/NVIDIA/k8s-device-plugin/internal/rm"
)

type manager struct {
type options struct {
infolib info.Interface
nvmllib nvml.Interface
devicelib device.Interface

migStrategy string
failOnInitError bool

cdiHandler cdi.Interface
config *spec.Config

imexChannels imex.Channels
deviceListStrategies spec.DeviceListStrategies

kubeletSocket string
imexChannels imex.Channels
}

// New creates a new plugin manager with the supplied options.
func New(infolib info.Interface, nvmllib nvml.Interface, devicelib device.Interface, opts ...Option) (Interface, error) {
m := &manager{
// New a new set of plugins with the supplied options.
func New(infolib info.Interface, nvmllib nvml.Interface, devicelib device.Interface, opts ...Option) ([]Interface, error) {
klueska marked this conversation as resolved.
Show resolved Hide resolved
o := &options{
infolib: infolib,
nvmllib: nvmllib,
devicelib: devicelib,
}
for _, opt := range opts {
opt(m)
opt(o)
}

if m.config == nil {
if o.config == nil {
klog.Warning("no config provided, returning a null manager")
return &null{}, nil
return nil, nil
}

if o.cdiHandler == nil {
o.cdiHandler = cdi.NewNullHandler()
}

if m.cdiHandler == nil {
m.cdiHandler = cdi.NewNullHandler()
resourceManagers, err := o.getResourceManagers()
if err != nil {
return nil, fmt.Errorf("failed to construct resource managers: %w", err)
}

strategy := m.resolveStrategy(*m.config.Flags.DeviceDiscoveryStrategy)
var plugins []Interface
for _, resourceManager := range resourceManagers {
plugin, err := o.devicePluginForResource(resourceManager)
if err != nil {
return nil, fmt.Errorf("failed to create plugin: %w", err)
}
plugins = append(plugins, plugin)
}
return plugins, nil
}

// getResourceManager constructs a set of resource managers.
// Each resource manager maps to a specific named extended resource and may
// include full GPUs or MIG devices.
func (o *options) getResourceManagers() ([]rm.ResourceManager, error) {
strategy := o.resolveStrategy(*o.config.Flags.DeviceDiscoveryStrategy)
switch strategy {
case "nvml":
ret := m.nvmllib.Init()
ret := o.nvmllib.Init()
if ret != nvml.SUCCESS {
klog.Errorf("Failed to initialize NVML: %v.", ret)
klog.Errorf("If this is a GPU node, did you set the docker default runtime to `nvidia`?")
klog.Errorf("You can check the prerequisites at: https://github.com/NVIDIA/k8s-device-plugin#prerequisites")
klog.Errorf("You can learn how to set the runtime at: https://github.com/NVIDIA/k8s-device-plugin#quick-start")
klog.Errorf("If this is not a GPU node, you should set up a toleration or nodeSelector to only deploy this plugin on GPU nodes")
if m.failOnInitError {
if o.failOnInitError {
return nil, fmt.Errorf("nvml init failed: %v", ret)
}
klog.Warningf("nvml init failed: %v", ret)
return &null{}, nil
return nil, nil
}
defer func() {
_ = m.nvmllib.Shutdown()
_ = o.nvmllib.Shutdown()
}()

return (*nvmlmanager)(m), nil
return rm.NewNVMLResourceManagers(o.infolib, o.nvmllib, o.devicelib, o.config)
case "tegra":
return (*tegramanager)(m), nil
return rm.NewTegraResourceManagers(o.config)
default:
klog.Errorf("Incompatible strategy detected %v", strategy)
klog.Error("If this is a GPU node, did you configure the NVIDIA Container Toolkit?")
klog.Error("You can check the prerequisites at: https://github.com/NVIDIA/k8s-device-plugin#prerequisites")
klog.Error("You can learn how to set the runtime at: https://github.com/NVIDIA/k8s-device-plugin#quick-start")
klog.Error("If this is not a GPU node, you should set up a toleration or nodeSelector to only deploy this plugin on GPU nodes")
if m.failOnInitError {
if o.failOnInitError {
return nil, fmt.Errorf("invalid device discovery strategy")
}
return &null{}, nil
return nil, nil
}
}

func (m *manager) resolveStrategy(strategy string) string {
func (o *options) resolveStrategy(strategy string) string {
if strategy != "" && strategy != "auto" {
return strategy
}

platform := m.infolib.ResolvePlatform()
platform := o.infolib.ResolvePlatform()
switch platform {
case info.PlatformNVML, info.PlatformWSL:
return "nvml"
Expand Down
25 changes: 0 additions & 25 deletions internal/plugin/manager/api.go

This file was deleted.

33 changes: 0 additions & 33 deletions internal/plugin/manager/null.go

This file was deleted.

Loading