Skip to content

Commit

Permalink
unawareness prefetch implementation on snapshotter side
Browse files Browse the repository at this point in the history
1. send post request to optimizer
2. store prefetchlist
3. add prefetchlist in nydusd
  • Loading branch information
billie60 committed Jan 22, 2024
1 parent 2331e91 commit b96cf48
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 61 deletions.
6 changes: 6 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type GlobalConfig struct {
DaemonThreadsNum int
CacheGCPeriod time.Duration
MirrorsConfig MirrorsConfig
PrefetchRoot string
}

func IsFusedevSharedModeEnabled() bool {
Expand All @@ -64,6 +65,10 @@ func GetConfigRoot() string {
return globalConfig.ConfigRoot
}

func GetPrefetchRoot() string {
return globalConfig.PrefetchRoot
}

func GetMirrorsConfigDir() string {
return globalConfig.MirrorsConfig.Dir
}
Expand Down Expand Up @@ -181,6 +186,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error {
globalConfig.ConfigRoot = filepath.Join(c.Root, "config")
globalConfig.SocketRoot = filepath.Join(c.Root, "socket")
globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt")
globalConfig.PrefetchRoot = filepath.Join(c.Root, "prefetch")

globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig

Expand Down
15 changes: 15 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/internal/constant"
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
)

// Build runtime nydusd daemon object, which might be persisted later
Expand All @@ -31,6 +32,20 @@ func WithSocketDir(dir string) NewDaemonOpt {
}
}

func WithPrefetchDir(dir, imageID string) NewDaemonOpt {
return func(d *Daemon) error {
s := filepath.Join(dir, d.ID())
prefetchDir, err := prefetch.GetPrefetchList(s, imageID)
if err != nil && !errors.Is(err, prefetch.ErrUds) {
return errors.Wrapf(err, "failed to get prefetchList for image %s in path %s", imageID, s)
}
if prefetchDir != "" {
d.States.PrefetchDir = prefetchDir
}
return nil
}
}

func WithRef(ref int32) NewDaemonOpt {
return func(d *Daemon) error {
d.ref = ref
Expand Down
3 changes: 2 additions & 1 deletion pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type ConfigState struct {
SupervisorPath string
ThreadNum int
// Where the configuration file resides, all rafs instances share the same configuration template
ConfigDir string
ConfigDir string
PrefetchDir string
}

// TODO: Record queried nydusd state
Expand Down
10 changes: 7 additions & 3 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
if err != nil {
return err
}
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0)
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0, imageID)
// if daemon already exists for snapshotID, just return
if err != nil && !errdefs.IsAlreadyExists(err) {
return err
Expand Down Expand Up @@ -578,7 +578,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
return errors.Errorf("got null mountpoint for fsDriver %s", fsManager.FsDriver)
}

d, err := fs.createDaemon(fsManager, daemonMode, mp, 0)
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0, "")
if err != nil {
return errors.Wrap(err, "initialize shared daemon")
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {

// createDaemon create new nydus daemon by snapshotID and imageID
func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config.DaemonMode,
mountpoint string, ref int32) (d *daemon.Daemon, err error) {
mountpoint string, ref int32, imageID string) (d *daemon.Daemon, err error) {
opts := []daemon.NewDaemonOpt{
daemon.WithRef(ref),
daemon.WithSocketDir(config.GetSocketRoot()),
Expand All @@ -631,6 +631,10 @@ func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config
opts = append(opts, daemon.WithMountpoint(mountpoint))
}

if imageID != "" {
opts = append(opts, daemon.WithPrefetchDir(config.GetPrefetchRoot(), imageID))
}

d, err = daemon.NewDaemon(opts...)
if err != nil {
return nil, errors.Wrapf(err, "new daemon")
Expand Down
13 changes: 4 additions & 9 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
)

const endpointGetBackend string = "/api/v1/daemons/%s/backend"
Expand Down Expand Up @@ -122,7 +121,7 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
// Build commandline according to nydusd daemon configuration.
func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) (*exec.Cmd, error) {
var cmdOpts []command.Opt
var imageReference string
// var imageReference string

nydusdThreadNum := d.NydusdThreadNum()

Expand All @@ -148,7 +147,7 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}

imageReference = rafs.ImageID
// imageReference = rafs.ImageID

bootstrap, err := rafs.BootstrapFile()
if err != nil {
Expand Down Expand Up @@ -176,12 +175,8 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
command.WithID(d.ID()))
}

if imageReference != "" {
prefetchfiles := prefetch.Pm.GetPrefetchInfo(imageReference)
if prefetchfiles != "" {
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(prefetchfiles))
prefetch.Pm.DeleteFromPrefetchMap(imageReference)
}
if d.States.PrefetchDir != "" {
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchDir))
}

cmdOpts = append(cmdOpts,
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (m *Manager) cleanUpDaemonResources(d *daemon.Daemon) {
resource := []string{d.States.ConfigDir, d.States.LogDir}
if !d.IsSharedDaemon() {
socketDir := path.Dir(d.GetAPISock())
if d.States.PrefetchDir != "" {
prefetchDir := path.Dir(d.States.PrefetchDir)
resource = append(resource, prefetchDir)
}
resource = append(resource, socketDir)
}

Expand Down
125 changes: 95 additions & 30 deletions pkg/prefetch/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,119 @@
package prefetch

import (
"context"
"encoding/json"
"sync"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"strings"

"github.com/containerd/containerd/log"
"github.com/pkg/errors"
)

type prefetchInfo struct {
prefetchMap map[string]string
prefetchMutex sync.Mutex
type prefetchlist struct {
FilePaths []string `json:"files"`
}

var Pm prefetchInfo
const (
endpointPrefetch = "/api/v1/imagename"
udsSocket = "/tmp/prefetch.sock"
)

var ErrUds = errors.New("failed to connect unix domain socket")

func (p *prefetchInfo) SetPrefetchFiles(body []byte) error {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
func GetPrefetchList(prefetchDir, imageRepo string) (string, error) {
url := fmt.Sprintf("http://unix%s", endpointPrefetch)

var prefetchMsg []map[string]string
if err := json.Unmarshal(body, &prefetchMsg); err != nil {
return err
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(imageRepo))
if err != nil {
return "", err
}

if p.prefetchMap == nil {
p.prefetchMap = make(map[string]string)
client := &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", udsSocket)
},
},
}
for _, item := range prefetchMsg {
image := item["image"]
prefetchfiles := item["prefetch"]
p.prefetchMap[image] = prefetchfiles
resp, err := client.Do(req)
if err != nil {
log.L.Infof("failed to connect unix domain socket. Skipping prefetch for image: %s\n", imageRepo)
return "", ErrUds
}
defer resp.Body.Close()

log.L.Infof("received prefetch list from nri plugin: %v ", p.prefetchMap)
return nil
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to send data, status code: %v", resp.StatusCode)
}

func (p *prefetchInfo) GetPrefetchInfo(image string) string {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

if prefetchfiles, ok := p.prefetchMap[image]; ok {
return prefetchfiles
if strings.Contains(string(body), "CacheItem not found") {
log.L.Infof("Cache item not found for image: %s\n", imageRepo)
return "", nil
}
return ""

prefetchfilePath, err := storePrefetchList(prefetchDir, body)
if err != nil {
return "", err
}
return prefetchfilePath, nil
}

func (p *prefetchInfo) DeleteFromPrefetchMap(image string) {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
func storePrefetchList(prefetchDir string, list []byte) (string, error) {
if err := os.MkdirAll(prefetchDir, 0755); err != nil {
return "", errors.Wrapf(err, "create prefetch dir %s", prefetchDir)
}

filePath := filepath.Join(prefetchDir, "prefetchList")
jsonfilePath := filepath.Join(prefetchDir, "prefetchList.json")

file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
fmt.Println("Error opening file:", err)
return "", errors.Wrap(err, "error opening prefetch file")
}
defer file.Close()

var prefetchSlice []string
err = json.Unmarshal(list, &prefetchSlice)
if err != nil {
return "", errors.Wrap(err, "failed to parse prefetch list")
}

for _, path := range prefetchSlice {
content := path + "\n"
_, err := file.WriteString(content)
if err != nil {
return "", errors.Wrap(err, "error writing to prefetch file")
}
}

prefetchStruct := prefetchlist{FilePaths: prefetchSlice}
jsonByte, err := json.Marshal(prefetchStruct)
if err != nil {
return "", errors.Wrap(err, "failed to marshal to JSON")
}

jsonfile, err := os.Create(jsonfilePath)
if err != nil {
return "", errors.Wrapf(err, "failed to create file %s", jsonfilePath)
}
defer jsonfile.Close()

_, err = jsonfile.Write(jsonByte)
if err != nil {
return "", errors.Wrap(err, "error writing JSON to file")
}

delete(p.prefetchMap, image)
return filePath, nil
}
18 changes: 0 additions & 18 deletions pkg/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package system
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/filesystem"
"github.com/containerd/nydus-snapshotter/pkg/manager"
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
)

const (
Expand All @@ -41,7 +39,6 @@ const (
// it's very helpful to check daemon's record in database.
endpointDaemonRecords string = "/api/v1/daemons/records"
endpointDaemonsUpgrade string = "/api/v1/daemons/upgrade"
endpointPrefetch string = "/api/v1/prefetch"
// Provide backend information
endpointGetBackend string = "/api/v1/daemons/{id}/backend"
)
Expand Down Expand Up @@ -172,7 +169,6 @@ func (sc *Controller) registerRouter() {
sc.router.HandleFunc(endpointDaemons, sc.describeDaemons()).Methods(http.MethodGet)
sc.router.HandleFunc(endpointDaemonsUpgrade, sc.upgradeDaemons()).Methods(http.MethodPut)
sc.router.HandleFunc(endpointDaemonRecords, sc.getDaemonRecords()).Methods(http.MethodGet)
sc.router.HandleFunc(endpointPrefetch, sc.setPrefetchConfiguration()).Methods(http.MethodPut)
sc.router.HandleFunc(endpointGetBackend, sc.getBackend()).Methods(http.MethodGet)
}

Expand Down Expand Up @@ -216,20 +212,6 @@ func (sc *Controller) getBackend() func(w http.ResponseWriter, r *http.Request)
}
}

func (sc *Controller) setPrefetchConfiguration() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
log.L.Errorf("Failed to read prefetch list: %v", err)
return
}
if err = prefetch.Pm.SetPrefetchFiles(body); err != nil {
log.L.Errorf("Failed to parse request body: %v", err)
return
}
}
}

func (sc *Controller) describeDaemons() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
info := make([]daemonInfo, 0, 10)
Expand Down

0 comments on commit b96cf48

Please sign in to comment.