diff --git a/config/global.go b/config/global.go index 3c1906073a..77d5c74e07 100644 --- a/config/global.go +++ b/config/global.go @@ -38,6 +38,7 @@ type GlobalConfig struct { DaemonThreadsNum int CacheGCPeriod time.Duration MirrorsConfig MirrorsConfig + PrefetchRoot string } func IsFusedevSharedModeEnabled() bool { @@ -64,6 +65,10 @@ func GetConfigRoot() string { return globalConfig.ConfigRoot } +func GetPrefetchRoot() string { + return globalConfig.PrefetchRoot +} + func GetMirrorsConfigDir() string { return globalConfig.MirrorsConfig.Dir } @@ -181,6 +186,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error { globalConfig.ConfigRoot = filepath.Join(c.Root, "config") globalConfig.SocketRoot = filepath.Join(c.Root, "socket") globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt") + globalConfig.PrefetchRoot = filepath.Join(c.Root, "prefetch") globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go index 7af1f65e74..fb2fda575f 100644 --- a/pkg/daemon/config.go +++ b/pkg/daemon/config.go @@ -16,6 +16,7 @@ import ( "github.com/containerd/nydus-snapshotter/config" "github.com/containerd/nydus-snapshotter/internal/constant" + "github.com/containerd/nydus-snapshotter/pkg/prefetch" ) // Build runtime nydusd daemon object, which might be persisted later @@ -31,6 +32,20 @@ func WithSocketDir(dir string) NewDaemonOpt { } } +func WithPrefetchDir(dir, imageID string) NewDaemonOpt { + return func(d *Daemon) error { + s := filepath.Join(dir, d.ID()) + prefetchDir, err := prefetch.GetPrefetchList(s, imageID) + if err != nil && !errors.Is(err, prefetch.ErrUds) { + return errors.Wrapf(err, "failed to get prefetchList for image %s in path %s", imageID, s) + } + if prefetchDir != "" { + d.States.PrefetchDir = prefetchDir + } + return nil + } +} + func WithRef(ref int32) NewDaemonOpt { return func(d *Daemon) error { d.ref = ref diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 35dd4f4814..1b1a2f3b21 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -54,7 +54,8 @@ type ConfigState struct { SupervisorPath string ThreadNum int // Where the configuration file resides, all rafs instances share the same configuration template - ConfigDir string + ConfigDir string + PrefetchDir string } // TODO: Record queried nydusd state diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index 4a95c4a3c6..401bfccc0b 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -285,7 +285,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s if err != nil { return err } - d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0) + d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0, imageID) // if daemon already exists for snapshotID, just return if err != nil && !errdefs.IsAlreadyExists(err) { return err @@ -578,7 +578,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) { return errors.Errorf("got null mountpoint for fsDriver %s", fsManager.FsDriver) } - d, err := fs.createDaemon(fsManager, daemonMode, mp, 0) + d, err := fs.createDaemon(fsManager, daemonMode, mp, 0, "") if err != nil { return errors.Wrap(err, "initialize shared daemon") } @@ -612,7 +612,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) { // createDaemon create new nydus daemon by snapshotID and imageID func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config.DaemonMode, - mountpoint string, ref int32) (d *daemon.Daemon, err error) { + mountpoint string, ref int32, imageID string) (d *daemon.Daemon, err error) { opts := []daemon.NewDaemonOpt{ daemon.WithRef(ref), daemon.WithSocketDir(config.GetSocketRoot()), @@ -631,6 +631,10 @@ func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config opts = append(opts, daemon.WithMountpoint(mountpoint)) } + if imageID != "" { + opts = append(opts, daemon.WithPrefetchDir(config.GetPrefetchRoot(), imageID)) + } + d, err = daemon.NewDaemon(opts...) if err != nil { return nil, errors.Wrapf(err, "new daemon") diff --git a/pkg/manager/daemon_adaptor.go b/pkg/manager/daemon_adaptor.go index e1335c1d6b..76c1495ba8 100644 --- a/pkg/manager/daemon_adaptor.go +++ b/pkg/manager/daemon_adaptor.go @@ -23,7 +23,6 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/errdefs" "github.com/containerd/nydus-snapshotter/pkg/metrics/collector" metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool" - "github.com/containerd/nydus-snapshotter/pkg/prefetch" ) const endpointGetBackend string = "/api/v1/daemons/%s/backend" @@ -122,7 +121,7 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error { // Build commandline according to nydusd daemon configuration. func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) (*exec.Cmd, error) { var cmdOpts []command.Opt - var imageReference string + // var imageReference string nydusdThreadNum := d.NydusdThreadNum() @@ -148,7 +147,7 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID()) } - imageReference = rafs.ImageID + // imageReference = rafs.ImageID bootstrap, err := rafs.BootstrapFile() if err != nil { @@ -176,12 +175,8 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) command.WithID(d.ID())) } - if imageReference != "" { - prefetchfiles := prefetch.Pm.GetPrefetchInfo(imageReference) - if prefetchfiles != "" { - cmdOpts = append(cmdOpts, command.WithPrefetchFiles(prefetchfiles)) - prefetch.Pm.DeleteFromPrefetchMap(imageReference) - } + if d.States.PrefetchDir != "" { + cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchDir)) } cmdOpts = append(cmdOpts, diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 27b6fad7d6..352bb434d8 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -287,6 +287,10 @@ func (m *Manager) cleanUpDaemonResources(d *daemon.Daemon) { resource := []string{d.States.ConfigDir, d.States.LogDir} if !d.IsSharedDaemon() { socketDir := path.Dir(d.GetAPISock()) + if d.States.PrefetchDir != "" { + prefetchDir := path.Dir(d.States.PrefetchDir) + resource = append(resource, prefetchDir) + } resource = append(resource, socketDir) } diff --git a/pkg/prefetch/prefetch.go b/pkg/prefetch/prefetch.go index 6f2d01cd3d..55570af22e 100644 --- a/pkg/prefetch/prefetch.go +++ b/pkg/prefetch/prefetch.go @@ -7,54 +7,119 @@ package prefetch import ( + "context" "encoding/json" - "sync" + "fmt" + "io" + "net" + "net/http" + "os" + "path/filepath" + "strings" "github.com/containerd/containerd/log" + "github.com/pkg/errors" ) -type prefetchInfo struct { - prefetchMap map[string]string - prefetchMutex sync.Mutex +type prefetchlist struct { + FilePaths []string `json:"files"` } -var Pm prefetchInfo +const ( + endpointPrefetch = "/api/v1/imagename" + udsSocket = "/tmp/prefetch.sock" +) + +var ErrUds = errors.New("failed to connect unix domain socket") -func (p *prefetchInfo) SetPrefetchFiles(body []byte) error { - p.prefetchMutex.Lock() - defer p.prefetchMutex.Unlock() +func GetPrefetchList(prefetchDir, imageRepo string) (string, error) { + url := fmt.Sprintf("http://unix%s", endpointPrefetch) - var prefetchMsg []map[string]string - if err := json.Unmarshal(body, &prefetchMsg); err != nil { - return err + req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(imageRepo)) + if err != nil { + return "", err } - if p.prefetchMap == nil { - p.prefetchMap = make(map[string]string) + client := &http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", udsSocket) + }, + }, } - for _, item := range prefetchMsg { - image := item["image"] - prefetchfiles := item["prefetch"] - p.prefetchMap[image] = prefetchfiles + resp, err := client.Do(req) + if err != nil { + log.L.Infof("failed to connect unix domain socket. Skipping prefetch for image: %s\n", imageRepo) + return "", ErrUds } + defer resp.Body.Close() - log.L.Infof("received prefetch list from nri plugin: %v ", p.prefetchMap) - return nil -} + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("failed to send data, status code: %v", resp.StatusCode) + } -func (p *prefetchInfo) GetPrefetchInfo(image string) string { - p.prefetchMutex.Lock() - defer p.prefetchMutex.Unlock() + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } - if prefetchfiles, ok := p.prefetchMap[image]; ok { - return prefetchfiles + if strings.Contains(string(body), "CacheItem not found") { + log.L.Infof("Cache item not found for image: %s\n", imageRepo) + return "", nil } - return "" + + prefetchfilePath, err := storePrefetchList(prefetchDir, body) + if err != nil { + return "", err + } + return prefetchfilePath, nil } -func (p *prefetchInfo) DeleteFromPrefetchMap(image string) { - p.prefetchMutex.Lock() - defer p.prefetchMutex.Unlock() +func storePrefetchList(prefetchDir string, list []byte) (string, error) { + if err := os.MkdirAll(prefetchDir, 0755); err != nil { + return "", errors.Wrapf(err, "create prefetch dir %s", prefetchDir) + } + + filePath := filepath.Join(prefetchDir, "prefetchList") + jsonfilePath := filepath.Join(prefetchDir, "prefetchList.json") + + file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644) + if err != nil { + fmt.Println("Error opening file:", err) + return "", errors.Wrap(err, "error opening prefetch file") + } + defer file.Close() + + var prefetchSlice []string + err = json.Unmarshal(list, &prefetchSlice) + if err != nil { + return "", errors.Wrap(err, "failed to parse prefetch list") + } + + for _, path := range prefetchSlice { + content := path + "\n" + _, err := file.WriteString(content) + if err != nil { + return "", errors.Wrap(err, "error writing to prefetch file") + } + } + + prefetchStruct := prefetchlist{FilePaths: prefetchSlice} + jsonByte, err := json.Marshal(prefetchStruct) + if err != nil { + return "", errors.Wrap(err, "failed to marshal to JSON") + } + + jsonfile, err := os.Create(jsonfilePath) + if err != nil { + return "", errors.Wrapf(err, "failed to create file %s", jsonfilePath) + } + defer jsonfile.Close() + + _, err = jsonfile.Write(jsonByte) + if err != nil { + return "", errors.Wrap(err, "error writing JSON to file") + } - delete(p.prefetchMap, image) + return filePath, nil } diff --git a/pkg/system/system.go b/pkg/system/system.go index d07034314e..ee5582f43c 100644 --- a/pkg/system/system.go +++ b/pkg/system/system.go @@ -9,7 +9,6 @@ package system import ( "encoding/json" "fmt" - "io" "net" "net/http" "os" @@ -30,7 +29,6 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/filesystem" "github.com/containerd/nydus-snapshotter/pkg/manager" metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool" - "github.com/containerd/nydus-snapshotter/pkg/prefetch" ) const ( @@ -41,7 +39,6 @@ const ( // it's very helpful to check daemon's record in database. endpointDaemonRecords string = "/api/v1/daemons/records" endpointDaemonsUpgrade string = "/api/v1/daemons/upgrade" - endpointPrefetch string = "/api/v1/prefetch" // Provide backend information endpointGetBackend string = "/api/v1/daemons/{id}/backend" ) @@ -172,7 +169,6 @@ func (sc *Controller) registerRouter() { sc.router.HandleFunc(endpointDaemons, sc.describeDaemons()).Methods(http.MethodGet) sc.router.HandleFunc(endpointDaemonsUpgrade, sc.upgradeDaemons()).Methods(http.MethodPut) sc.router.HandleFunc(endpointDaemonRecords, sc.getDaemonRecords()).Methods(http.MethodGet) - sc.router.HandleFunc(endpointPrefetch, sc.setPrefetchConfiguration()).Methods(http.MethodPut) sc.router.HandleFunc(endpointGetBackend, sc.getBackend()).Methods(http.MethodGet) } @@ -216,20 +212,6 @@ func (sc *Controller) getBackend() func(w http.ResponseWriter, r *http.Request) } } -func (sc *Controller) setPrefetchConfiguration() func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - log.L.Errorf("Failed to read prefetch list: %v", err) - return - } - if err = prefetch.Pm.SetPrefetchFiles(body); err != nil { - log.L.Errorf("Failed to parse request body: %v", err) - return - } - } -} - func (sc *Controller) describeDaemons() func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { info := make([]daemonInfo, 0, 10)