From cb17c3c8a6180729b5c77307cb98930aaa445b5c Mon Sep 17 00:00:00 2001 From: Milas Bowman Date: Mon, 10 Jul 2023 12:16:20 -0400 Subject: [PATCH] watch: move sync logic into separate package Just moving some code around in preparation for an alternative sync implementation that can do bulk transfers by using `tar`. Signed-off-by: Milas Bowman --- internal/sync/docker_cp.go | 107 +++++++++++++++++++++++++++++++++++++ internal/sync/shared.go | 44 +++++++++++++++ pkg/compose/watch.go | 100 +++++++++++----------------------- pkg/compose/watch_test.go | 12 +++-- 4 files changed, 188 insertions(+), 75 deletions(-) create mode 100644 internal/sync/docker_cp.go create mode 100644 internal/sync/shared.go diff --git a/internal/sync/docker_cp.go b/internal/sync/docker_cp.go new file mode 100644 index 0000000000..8e91c94fa1 --- /dev/null +++ b/internal/sync/docker_cp.go @@ -0,0 +1,107 @@ +/* + Copyright 2023 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + "errors" + "fmt" + "io" + "io/fs" + "os" + + "github.com/compose-spec/compose-go/types" + "github.com/docker/compose/v2/pkg/api" + "github.com/sirupsen/logrus" +) + +type ComposeClient interface { + Exec(ctx context.Context, projectName string, options api.RunOptions) (int, error) + + Copy(ctx context.Context, projectName string, options api.CopyOptions) error +} + +type DockerCopy struct { + client ComposeClient + + projectName string + + infoWriter io.Writer +} + +var _ Syncer = &DockerCopy{} + +func NewDockerCopy(projectName string, client ComposeClient, infoWriter io.Writer) *DockerCopy { + return &DockerCopy{ + projectName: projectName, + client: client, + infoWriter: infoWriter, + } +} + +func (d *DockerCopy) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error { + var errs []error + for i := range paths { + if err := d.sync(ctx, service, paths[i]); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + +func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, pathMapping PathMapping) error { + scale := 1 + if service.Deploy != nil && service.Deploy.Replicas != nil { + scale = int(*service.Deploy.Replicas) + } + + if fi, statErr := os.Stat(pathMapping.HostPath); statErr == nil { + if fi.IsDir() { + for i := 1; i <= scale; i++ { + _, err := d.client.Exec(ctx, d.projectName, api.RunOptions{ + Service: pathMapping.Service, + Command: []string{"mkdir", "-p", pathMapping.ContainerPath}, + Index: i, + }) + if err != nil { + logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err) + } + } + fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath) + } else { + err := d.client.Copy(ctx, d.projectName, api.CopyOptions{ + Source: pathMapping.HostPath, + Destination: fmt.Sprintf("%s:%s", pathMapping.Service, pathMapping.ContainerPath), + }) + if err != nil { + return err + } + fmt.Fprintf(d.infoWriter, "%s updated\n", pathMapping.ContainerPath) + } + } else if errors.Is(statErr, fs.ErrNotExist) { + for i := 1; i <= scale; i++ { + _, err := d.client.Exec(ctx, d.projectName, api.RunOptions{ + Service: pathMapping.Service, + Command: []string{"rm", "-rf", pathMapping.ContainerPath}, + Index: i, + }) + if err != nil { + logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err) + } + } + fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath) + } + return nil +} diff --git a/internal/sync/shared.go b/internal/sync/shared.go new file mode 100644 index 0000000000..0fb15d49e4 --- /dev/null +++ b/internal/sync/shared.go @@ -0,0 +1,44 @@ +/* + Copyright 2023 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "context" + + "github.com/compose-spec/compose-go/types" +) + +// PathMapping contains the Compose service and modified host system path. +type PathMapping struct { + // Service that the file event is for. + Service string + // HostPath that was created/modified/deleted outside the container. + // + // This is the path as seen from the user's perspective, e.g. + // - C:\Users\moby\Documents\hello-world\main.go (file on Windows) + // - /Users/moby/Documents/hello-world (directory on macOS) + HostPath string + // ContainerPath for the target file inside the container (only populated + // for sync events, not rebuild). + // + // This is the path as used in Docker CLI commands, e.g. + // - /workdir/main.go + // - /workdir/subdir + ContainerPath string +} + +type Syncer interface { + Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error +} diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index 5de5bc12ab..77a5e13f99 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -1,6 +1,6 @@ /* - Copyright 2020 Docker Compose CLI authors + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -17,13 +17,13 @@ package compose import ( "context" "fmt" - "io/fs" - "os" "path" "path/filepath" "strings" "time" + "github.com/docker/compose/v2/internal/sync" + "github.com/compose-spec/compose-go/types" "github.com/jonboulle/clockwork" "github.com/mitchellh/mapstructure" @@ -54,11 +54,8 @@ type Trigger struct { const quietPeriod = 2 * time.Second -// fileMapping contains the Compose service and modified host system path. -// -// For file sync, the container path is also included. -// For rebuild, there is no container path, so it is always empty. -type fileMapping struct { +// fileEvent contains the Compose service and modified host system path. +type fileEvent struct { // Service that the file event is for. Service string // HostPath that was created/modified/deleted outside the container. @@ -67,17 +64,11 @@ type fileMapping struct { // - C:\Users\moby\Documents\hello-world\main.go // - /Users/moby/Documents/hello-world/main.go HostPath string - // ContainerPath for the target file inside the container (only populated - // for sync events, not rebuild). - // - // This is the path as used in Docker CLI commands, e.g. - // - /workdir/main.go - ContainerPath string } func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint: gocyclo - needRebuild := make(chan fileMapping) - needSync := make(chan fileMapping) + needRebuild := make(chan fileEvent) + needSync := make(chan sync.PathMapping) _, err := s.prepareProjectForBuild(project, nil) if err != nil { @@ -175,7 +166,7 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv return eg.Wait() } -func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan fileMapping, needRebuild chan fileMapping) error { +func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan sync.PathMapping, needRebuild chan fileEvent) error { ignores := make([]watch.PathMatcher, len(triggers)) for i, trigger := range triggers { ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore) @@ -209,11 +200,6 @@ WATCH: fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath) - f := fileMapping{ - HostPath: hostPath, - Service: name, - } - switch trigger.Action { case WatchActionSync: logrus.Debugf("modified file %s triggered sync", hostPath) @@ -221,12 +207,18 @@ WATCH: if err != nil { return err } - // always use Unix-style paths for inside the container - f.ContainerPath = path.Join(trigger.Target, rel) - needSync <- f + needSync <- sync.PathMapping{ + Service: name, + HostPath: hostPath, + // always use Unix-style paths for inside the container + ContainerPath: path.Join(trigger.Target, rel), + } case WatchActionRebuild: logrus.Debugf("modified file %s requires image to be rebuilt", hostPath) - needRebuild <- f + needRebuild <- fileEvent{ + HostPath: hostPath, + Service: name, + } default: return fmt.Errorf("watch action %q is not supported", trigger) } @@ -304,57 +296,25 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje } } -func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error { +func (s *composeService) makeSyncFn( + ctx context.Context, + project *types.Project, + needSync <-chan sync.PathMapping, +) func() error { + syncer := sync.NewDockerCopy(project.Name, s, s.stdinfo()) + return func() error { for { select { case <-ctx.Done(): return nil - case opt := <-needSync: - service, err := project.GetService(opt.Service) + case pathMapping := <-needSync: + service, err := project.GetService(pathMapping.Service) if err != nil { return err } - scale := 1 - if service.Deploy != nil && service.Deploy.Replicas != nil { - scale = int(*service.Deploy.Replicas) - } - - if fi, statErr := os.Stat(opt.HostPath); statErr == nil { - if fi.IsDir() { - for i := 1; i <= scale; i++ { - _, err := s.Exec(ctx, project.Name, api.RunOptions{ - Service: opt.Service, - Command: []string{"mkdir", "-p", opt.ContainerPath}, - Index: i, - }) - if err != nil { - logrus.Warnf("failed to create %q from %s: %v", opt.ContainerPath, opt.Service, err) - } - } - fmt.Fprintf(s.stdinfo(), "%s created\n", opt.ContainerPath) - } else { - err := s.Copy(ctx, project.Name, api.CopyOptions{ - Source: opt.HostPath, - Destination: fmt.Sprintf("%s:%s", opt.Service, opt.ContainerPath), - }) - if err != nil { - return err - } - fmt.Fprintf(s.stdinfo(), "%s updated\n", opt.ContainerPath) - } - } else if errors.Is(statErr, fs.ErrNotExist) { - for i := 1; i <= scale; i++ { - _, err := s.Exec(ctx, project.Name, api.RunOptions{ - Service: opt.Service, - Command: []string{"rm", "-rf", opt.ContainerPath}, - Index: i, - }) - if err != nil { - logrus.Warnf("failed to delete %q from %s: %v", opt.ContainerPath, opt.Service, err) - } - } - fmt.Fprintf(s.stdinfo(), "%s deleted from service\n", opt.ContainerPath) + if err := syncer.Sync(ctx, service, []sync.PathMapping{pathMapping}); err != nil { + return err } } } @@ -363,7 +323,7 @@ func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, type rebuildServices map[string]utils.Set[string] -func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) { +func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent, fn func(services rebuildServices)) { services := make(rebuildServices) t := clock.NewTimer(delay) defer t.Stop() diff --git a/pkg/compose/watch_test.go b/pkg/compose/watch_test.go index 05d5ec8f0c..3f359f7d41 100644 --- a/pkg/compose/watch_test.go +++ b/pkg/compose/watch_test.go @@ -19,6 +19,8 @@ import ( "testing" "time" + "github.com/docker/compose/v2/internal/sync" + "github.com/docker/cli/cli/command" "github.com/docker/compose/v2/pkg/watch" "github.com/jonboulle/clockwork" @@ -27,7 +29,7 @@ import ( ) func Test_debounce(t *testing.T) { - ch := make(chan fileMapping) + ch := make(chan fileEvent) var ( ran int got []string @@ -47,7 +49,7 @@ func Test_debounce(t *testing.T) { return nil }) for i := 0; i < 100; i++ { - ch <- fileMapping{Service: "test"} + ch <- fileEvent{Service: "test"} } assert.Equal(t, ran, 0) clock.Advance(quietPeriod) @@ -79,8 +81,8 @@ func (t testWatcher) Errors() chan error { } func Test_sync(t *testing.T) { - needSync := make(chan fileMapping) - needRebuild := make(chan fileMapping) + needSync := make(chan sync.PathMapping) + needRebuild := make(chan fileEvent) ctx, cancelFunc := context.WithCancel(context.TODO()) defer cancelFunc() @@ -119,7 +121,7 @@ func Test_sync(t *testing.T) { watcher.Events() <- watch.NewFileEvent("/src/changed") select { case actual := <-needSync: - assert.DeepEqual(t, fileMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual) + assert.DeepEqual(t, sync.PathMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual) case <-time.After(100 * time.Millisecond): t.Error("timeout") }