Skip to content

Commit

Permalink
unawareness prefetch implementation on snapshotter side
Browse files Browse the repository at this point in the history
1. send post request to optimizer
2. store prefetchlist
3. add prefetchlist in nydusd
  • Loading branch information
billie60 committed Mar 4, 2024
1 parent ee4d5f0 commit 34bd531
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 68 deletions.
6 changes: 6 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type GlobalConfig struct {
DaemonThreadsNum int
CacheGCPeriod time.Duration
MirrorsConfig MirrorsConfig
PrefetchRoot string
}

func IsFusedevSharedModeEnabled() bool {
Expand All @@ -64,6 +65,10 @@ func GetConfigRoot() string {
return globalConfig.ConfigRoot
}

func GetPrefetchRoot() string {
return globalConfig.PrefetchRoot
}

func GetMirrorsConfigDir() string {
return globalConfig.MirrorsConfig.Dir
}
Expand Down Expand Up @@ -181,6 +186,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error {
globalConfig.ConfigRoot = filepath.Join(c.Root, "config")
globalConfig.SocketRoot = filepath.Join(c.Root, "socket")
globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt")
globalConfig.PrefetchRoot = filepath.Join(c.Root, "prefetch")

globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig

Expand Down
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ require (
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
)

require (
github.com/labstack/gommon v0.4.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
)

require (
github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20221215162035-5330a85ea652 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
Expand Down Expand Up @@ -117,6 +111,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand All @@ -138,6 +133,8 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/vbatts/tar-split v0.11.2 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1 // indirect
Expand Down
15 changes: 15 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/internal/constant"
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
)

// Build runtime nydusd daemon object, which might be persisted later
Expand All @@ -31,6 +32,20 @@ func WithSocketDir(dir string) NewDaemonOpt {
}
}

func WithPrefetchDir(dir, imageID string) NewDaemonOpt {
return func(d *Daemon) error {
s := filepath.Join(dir, d.ID())
prefetchDir, err := prefetch.GetPrefetchList(s, imageID)
if err != nil && !errors.Is(err, prefetch.ErrUds) {
return errors.Wrapf(err, "failed to get prefetchList for image %s in path %s", imageID, s)
}
if prefetchDir != "" {
d.States.PrefetchDir = prefetchDir
}
return nil
}
}

func WithRef(ref int32) NewDaemonOpt {
return func(d *Daemon) error {
d.ref = ref
Expand Down
3 changes: 2 additions & 1 deletion pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type ConfigState struct {
SupervisorPath string
ThreadNum int
// Where the configuration file resides, all rafs instances share the same configuration template
ConfigDir string
ConfigDir string
PrefetchDir string
}

// TODO: Record queried nydusd state
Expand Down
10 changes: 7 additions & 3 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
if err != nil {
return err
}
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0)
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0, imageID)
// if daemon already exists for snapshotID, just return
if err != nil && !errdefs.IsAlreadyExists(err) {
return err
Expand Down Expand Up @@ -578,7 +578,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
return errors.Errorf("got null mountpoint for fsDriver %s", fsManager.FsDriver)
}

d, err := fs.createDaemon(fsManager, daemonMode, mp, 0)
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0, "")
if err != nil {
return errors.Wrap(err, "initialize shared daemon")
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {

// createDaemon create new nydus daemon by snapshotID and imageID
func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config.DaemonMode,
mountpoint string, ref int32) (d *daemon.Daemon, err error) {
mountpoint string, ref int32, imageID string) (d *daemon.Daemon, err error) {
opts := []daemon.NewDaemonOpt{
daemon.WithRef(ref),
daemon.WithSocketDir(config.GetSocketRoot()),
Expand All @@ -631,6 +631,10 @@ func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config
opts = append(opts, daemon.WithMountpoint(mountpoint))
}

if imageID != "" {
opts = append(opts, daemon.WithPrefetchDir(config.GetPrefetchRoot(), imageID))
}

d, err = daemon.NewDaemon(opts...)
if err != nil {
return nil, errors.Wrapf(err, "new daemon")
Expand Down
12 changes: 2 additions & 10 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
)

const endpointGetBackend string = "/api/v1/daemons/%s/backend"
Expand Down Expand Up @@ -122,7 +121,6 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
// Build commandline according to nydusd daemon configuration.
func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) (*exec.Cmd, error) {
var cmdOpts []command.Opt
var imageReference string

nydusdThreadNum := d.NydusdThreadNum()

Expand All @@ -148,8 +146,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}

imageReference = rafs.ImageID

bootstrap, err := rafs.BootstrapFile()
if err != nil {
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
Expand All @@ -176,12 +172,8 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
command.WithID(d.ID()))
}

if imageReference != "" {
prefetchfiles := prefetch.Pm.GetPrefetchInfo(imageReference)
if prefetchfiles != "" {
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(prefetchfiles))
prefetch.Pm.DeleteFromPrefetchMap(imageReference)
}
if d.States.PrefetchDir != "" {
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchDir))
}

cmdOpts = append(cmdOpts,
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (m *Manager) cleanUpDaemonResources(d *daemon.Daemon) {
resource := []string{d.States.ConfigDir, d.States.LogDir}
if !d.IsSharedDaemon() {
socketDir := path.Dir(d.GetAPISock())
if d.States.PrefetchDir != "" {
prefetchDir := path.Dir(d.States.PrefetchDir)
resource = append(resource, prefetchDir)
}
resource = append(resource, socketDir)
}

Expand Down
125 changes: 95 additions & 30 deletions pkg/prefetch/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,119 @@
package prefetch

import (
"context"
"encoding/json"
"sync"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"strings"

"github.com/containerd/containerd/log"
"github.com/pkg/errors"
)

type prefetchInfo struct {
prefetchMap map[string]string
prefetchMutex sync.Mutex
type prefetchlist struct {
FilePaths []string `json:"files"`
}

var Pm prefetchInfo
const (
endpointPrefetch = "/api/v1/imagename"
udsSocket = "/run/optimizer/prefetch.sock"
)

var ErrUds = errors.New("failed to connect unix domain socket")

func (p *prefetchInfo) SetPrefetchFiles(body []byte) error {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
func GetPrefetchList(prefetchDir, imageRepo string) (string, error) {
url := fmt.Sprintf("http://unix%s", endpointPrefetch)

var prefetchMsg []map[string]string
if err := json.Unmarshal(body, &prefetchMsg); err != nil {
return err
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(imageRepo))
if err != nil {
return "", err
}

if p.prefetchMap == nil {
p.prefetchMap = make(map[string]string)
client := &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", udsSocket)
},
},
}
for _, item := range prefetchMsg {
image := item["image"]
prefetchfiles := item["prefetch"]
p.prefetchMap[image] = prefetchfiles
resp, err := client.Do(req)
if err != nil {
log.L.Infof("failed to connect unix domain socket. Skipping prefetch for image: %s\n", imageRepo)
return "", ErrUds
}
defer resp.Body.Close()

log.L.Infof("received prefetch list from nri plugin: %v ", p.prefetchMap)
return nil
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to send data, status code: %v", resp.StatusCode)
}

func (p *prefetchInfo) GetPrefetchInfo(image string) string {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

if prefetchfiles, ok := p.prefetchMap[image]; ok {
return prefetchfiles
if strings.Contains(string(body), "CacheItem not found") {
log.L.Infof("Cache item not found for image: %s\n", imageRepo)
return "", nil
}
return ""

prefetchfilePath, err := storePrefetchList(prefetchDir, body)
if err != nil {
return "", err
}
return prefetchfilePath, nil
}

func (p *prefetchInfo) DeleteFromPrefetchMap(image string) {
p.prefetchMutex.Lock()
defer p.prefetchMutex.Unlock()
func storePrefetchList(prefetchDir string, list []byte) (string, error) {
if err := os.MkdirAll(prefetchDir, 0755); err != nil {
return "", errors.Wrapf(err, "create prefetch dir %s", prefetchDir)
}

filePath := filepath.Join(prefetchDir, "prefetchList")
jsonfilePath := filepath.Join(prefetchDir, "prefetchList.json")

file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
fmt.Println("Error opening file:", err)
return "", errors.Wrap(err, "error opening prefetch file")
}
defer file.Close()

var prefetchSlice []string
err = json.Unmarshal(list, &prefetchSlice)
if err != nil {
return "", errors.Wrap(err, "failed to parse prefetch list")
}

for _, path := range prefetchSlice {
content := path + "\n"
_, err := file.WriteString(content)
if err != nil {
return "", errors.Wrap(err, "error writing to prefetch file")
}
}

prefetchStruct := prefetchlist{FilePaths: prefetchSlice}
jsonByte, err := json.Marshal(prefetchStruct)
if err != nil {
return "", errors.Wrap(err, "failed to marshal to JSON")
}

jsonfile, err := os.Create(jsonfilePath)
if err != nil {
return "", errors.Wrapf(err, "failed to create file %s", jsonfilePath)
}
defer jsonfile.Close()

_, err = jsonfile.Write(jsonByte)
if err != nil {
return "", errors.Wrap(err, "error writing JSON to file")
}

delete(p.prefetchMap, image)
return filePath, nil
}
18 changes: 0 additions & 18 deletions pkg/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package system
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/filesystem"
"github.com/containerd/nydus-snapshotter/pkg/manager"
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
)

const (
Expand All @@ -41,7 +39,6 @@ const (
// it's very helpful to check daemon's record in database.
endpointDaemonRecords string = "/api/v1/daemons/records"
endpointDaemonsUpgrade string = "/api/v1/daemons/upgrade"
endpointPrefetch string = "/api/v1/prefetch"
// Provide backend information
endpointGetBackend string = "/api/v1/daemons/{id}/backend"
)
Expand Down Expand Up @@ -172,7 +169,6 @@ func (sc *Controller) registerRouter() {
sc.router.HandleFunc(endpointDaemons, sc.describeDaemons()).Methods(http.MethodGet)
sc.router.HandleFunc(endpointDaemonsUpgrade, sc.upgradeDaemons()).Methods(http.MethodPut)
sc.router.HandleFunc(endpointDaemonRecords, sc.getDaemonRecords()).Methods(http.MethodGet)
sc.router.HandleFunc(endpointPrefetch, sc.setPrefetchConfiguration()).Methods(http.MethodPut)
sc.router.HandleFunc(endpointGetBackend, sc.getBackend()).Methods(http.MethodGet)
}

Expand Down Expand Up @@ -216,20 +212,6 @@ func (sc *Controller) getBackend() func(w http.ResponseWriter, r *http.Request)
}
}

func (sc *Controller) setPrefetchConfiguration() func(w http.ResponseWriter, r *http.Request) {
return func(_ http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
log.L.Errorf("Failed to read prefetch list: %v", err)
return
}
if err = prefetch.Pm.SetPrefetchFiles(body); err != nil {
log.L.Errorf("Failed to parse request body: %v", err)
return
}
}
}

func (sc *Controller) describeDaemons() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, _ *http.Request) {
info := make([]daemonInfo, 0, 10)
Expand Down

0 comments on commit 34bd531

Please sign in to comment.