diff --git a/Makefile b/Makefile
index 9a38154127..8c4fe9f820 100644
--- a/Makefile
+++ b/Makefile
@@ -102,6 +102,7 @@ WITH_OPENCONTRAIL?=true
 WITH_LIBVIRT_GO?=true
 WITH_EBPF_DOCKER_BUILDER?=false
 WITH_VPP?=false
+WITH_DOCKER_VPP?=false
 
 export PATH:=$(BUILD_TOOLS):$(PATH)
 
@@ -203,6 +204,10 @@ ifeq ($(WITH_VPP), true)
   AGENT_TEST_EXTRA_PROBES+=vpp
 endif
 
+ifeq ($(WITH_DOCKER_VPP), true)
+  BUILD_TAGS+=docker_vpp
+endif
+
 ifeq (${DEBUG}, true)
   GOFLAGS=-gcflags='-N -l'
   GO_BINDATA_FLAGS+=-debug
diff --git a/agent/probes.go b/agent/probes.go
index 46dee2a32d..e1fe1844d2 100644
--- a/agent/probes.go
+++ b/agent/probes.go
@@ -26,6 +26,8 @@ import (
 	"github.com/skydive-project/skydive/logging"
 	"github.com/skydive-project/skydive/probe"
 	"github.com/skydive-project/skydive/topology/probes/docker"
+	"github.com/skydive-project/skydive/topology/probes/docker/subprobes"
+	docker_vpp "github.com/skydive-project/skydive/topology/probes/docker/subprobes/vpp"
 	"github.com/skydive-project/skydive/topology/probes/libvirt"
 	"github.com/skydive-project/skydive/topology/probes/lldp"
 	"github.com/skydive-project/skydive/topology/probes/lxd"
@@ -86,7 +88,8 @@ func NewTopologyProbeBundleFromConfig(g *graph.Graph, hostNode *graph.Node) (*pr
 	case "docker":
 		dockerURL := config.GetString("agent.topology.docker.url")
 		netnsRunPath := config.GetString("agent.topology.docker.netns.run_path")
-		dockerProbe, err := docker.NewProbe(nsProbe, dockerURL, netnsRunPath)
+		subprobes := dockerSubprobes(nsProbe)
+		dockerProbe, err := docker.NewProbe(nsProbe, dockerURL, netnsRunPath, subprobes)
 		if err != nil {
 			return nil, fmt.Errorf("Failed to initialize Docker probe: %s", err)
 		}
@@ -137,3 +140,14 @@ func NewTopologyProbeBundleFromConfig(g *graph.Graph, hostNode *graph.Node) (*pr
 
 	return bundle, nil
 }
+
+// dockerSubprobes creates all Docker-related subprobes
+func dockerSubprobes(nsProbe *netns.Probe) []subprobes.Subprobe {
+	subprobes := make([]subprobes.Subprobe, 0)
+	if vpp, err := docker_vpp.NewSubprobe(nsProbe); err != nil {
+		logging.GetLogger().Warningf("VPP subprobe in docker probe will be disabled because its creation failed: %v", err)
+	} else {
+		subprobes = append(subprobes, vpp)
+	}
+	return subprobes
+}
diff --git a/config/config.go b/config/config.go
index ad7787fc42..22776f79f3 100644
--- a/config/config.go
+++ b/config/config.go
@@ -91,6 +91,8 @@ func init() {
 	cfg.SetDefault("agent.topology.probes", []string{"ovsdb"})
 	cfg.SetDefault("agent.topology.docker.url", "unix:///var/run/docker.sock")
 	cfg.SetDefault("agent.topology.docker.netns.run_path", "/var/run/docker/netns")
+	cfg.SetDefault("agent.topology.docker.vpp.cliconnect", "")
+	cfg.SetDefault("agent.topology.docker.vpp.probeinterval", 5) // interval is in seconds
 	cfg.SetDefault("agent.topology.netlink.metrics_update", 30)
 	cfg.SetDefault("agent.topology.netns.run_path", "/var/run/netns")
 	cfg.SetDefault("agent.topology.neutron.domain_name", "Default")
diff --git a/etc/skydive.yml.default b/etc/skydive.yml.default
index c43d22797c..f6be63ab5d 100644
--- a/etc/skydive.yml.default
+++ b/etc/skydive.yml.default
@@ -211,6 +211,20 @@ agent:
       netns:
         # allow to specify where the docker probe is watching network namespaces
         # run_path: /var/run/docker/netns
+      vpp:
+        # configuration for the vpp subprobe
+        cliconnect:
+          # Path to the VPP CLI socket file, or a VPP CLI IP address with port.
+          # The default "" uses the default VPP CLI socket file (/run/vpp/cli.sock).
+          # This setting is used when retrieving information from VPP in docker, e.g.
+          # to get the VPP version, the docker client executes the command
+          # "vppctl -s <cliconnect> show version" in the given container
+        probeinterval:
+          # Interval (in seconds) between regular probings of all docker containers
+          # for VPP topology changes. The default value is 5; the minimum is 1 (lower
+          # values are raised to 1). Probing can easily become CPU-intensive as the
+          # count of docker containers increases; you can mitigate the CPU load by
+          # setting longer probe intervals.
 
       netlink:
         # delay in seconds between two metric updates
diff --git a/scripts/ci/jobs/jobs.yml b/scripts/ci/jobs/jobs.yml
index 1087f2d609..de68e4c2f5 100644
--- a/scripts/ci/jobs/jobs.yml
+++ b/scripts/ci/jobs/jobs.yml
@@ -368,7 +368,7 @@ builders:
     - skydive-cleanup
     - skydive-test:
-        test: BACKEND=orientdb scripts/ci/run-functional-tests.sh
+        test: BACKEND=orientdb WITH_DOCKER_VPP=true scripts/ci/run-functional-tests.sh
   publishers:
     - junit:
         results: tests.xml
diff --git a/scripts/ci/run-compile-tests.sh b/scripts/ci/run-compile-tests.sh
index 53f332dd63..b41b5dca6e 100755
--- a/scripts/ci/run-compile-tests.sh
+++ b/scripts/ci/run-compile-tests.sh
@@ -27,7 +27,7 @@ make
 make test.functionals.compile TAGS=${TAGS}
 
 # Compile with all supported build options enabled
-make WITH_DPDK=true WITH_EBPF=true WITH_VPP=true WITH_EBPF_DOCKER_BUILDER=true WITH_K8S=true WITH_ISTIO=true WITH_HELM=true VERBOSE=true
+make WITH_DPDK=true WITH_EBPF=true WITH_VPP=true WITH_DOCKER_VPP=true WITH_EBPF_DOCKER_BUILDER=true WITH_K8S=true WITH_ISTIO=true WITH_HELM=true VERBOSE=true
 
 # Compile Skydive for Windows
 GOOS=windows GOARCH=amd64 govendor build github.com/skydive-project/skydive
@@ -39,7 +39,7 @@ GOOS=darwin GOARCH=amd64 govendor build github.com/skydive-project/skydive
 make WITH_PROF=true VERBOSE=true
 
 # Compile all tests
-make test.functionals.compile TAGS=${TAGS} WITH_NEUTRON=true WITH_SELENIUM=true WITH_CDD=true WITH_SCALE=true WITH_EBPF=true WITH_VPP=true WITH_K8S=true WITH_ISTIO=true WITH_HELM=true WITH_EBPF_DOCKER_BUILDER=true
+make test.functionals.compile TAGS=${TAGS} WITH_NEUTRON=true WITH_SELENIUM=true WITH_CDD=true WITH_SCALE=true WITH_EBPF=true WITH_VPP=true WITH_DOCKER_VPP=true WITH_K8S=true WITH_ISTIO=true WITH_HELM=true WITH_EBPF_DOCKER_BUILDER=true
 
 # Compile static
 make static
diff --git a/tests/docker_vpp_test.go b/tests/docker_vpp_test.go
new file mode 100644
index 0000000000..5dd8fc738a
--- /dev/null
+++ b/tests/docker_vpp_test.go
@@ -0,0 +1,145 @@
+// +build docker_vpp
+
+// Copyright (c) 2019 PANTHEON.tech s.r.o.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
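Aside on the build wiring above (editor's illustration, not part of the patch): WITH_DOCKER_VPP does nothing more than append the docker_vpp build tag, and the vpp subprobe package below ships a real and a dummy implementation selected by that tag (vpp.go vs. no_vpp.go). A minimal two-file sketch of this compile-time switch pattern, with hypothetical names feature_on.go/feature_off.go and package feature:

```go
// feature_on.go — compiled only with: go build -tags docker_vpp
// +build docker_vpp

package feature

// Enabled reports whether the docker_vpp feature was compiled in.
const Enabled = true
```

```go
// feature_off.go — compiled when the docker_vpp tag is absent
// +build !docker_vpp

package feature

const Enabled = false
```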
+
+package tests
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/skydive-project/skydive/gremlin"
+)
+
+const (
+	dockerImageWithRunningVPP = "ligato/vpp-base:19.04"
+	vppWaitScript             = "sh -c 'retry=%d;until docker exec %s vppctl sh version || [ $retry -eq 0 ]; do retry=$(( retry-1 ));sleep 0.5s;echo \"VPP not ready-retries left \"$retry;done'"
+)
+
+func TestRunningVPPInDocker(t *testing.T) {
+	test := &Test{
+		setupCmds: []Cmd{
+			{fmt.Sprintf("docker run -d -t -i --name test-skydive-docker-running-vpp %s", dockerImageWithRunningVPP), false},
+			{fmt.Sprintf(vppWaitScript, 10, "test-skydive-docker-running-vpp"), true},
+		},
+
+		tearDownCmds: []Cmd{
+			{"docker rm -f test-skydive-docker-running-vpp", false},
+		},
+
+		mode: Replay,
+
+		checks: []CheckFunction{func(c *CheckContext) error {
+			return assertOneEndNode(c, c.gremlin.V().Has("Type", "netns", "Manager", "docker").
+				Out("Type", "vpp", "Manager", "docker"))
+		}},
+	}
+
+	RunTest(t, test)
+}
+
+func TestDockerVPPConnectingToVeth(t *testing.T) {
+	test := &Test{
+		setupCmds: []Cmd{
+			{fmt.Sprintf("docker run -d -t -i --privileged --name test-skydive-docker-vpp-to-veth %s", dockerImageWithRunningVPP), false},
+			{fmt.Sprintf(vppWaitScript, 40, "test-skydive-docker-vpp-to-veth"), true},
+			{"docker exec test-skydive-docker-vpp-to-veth ip link add name veth-container type veth peer name veth-host", true}, // creating a veth pair (that could be used to tunnel between the docker container and the docker host)
+			{"docker exec test-skydive-docker-vpp-to-veth ip link set dev veth-container up", true},
+			{"docker exec test-skydive-docker-vpp-to-veth ip link set dev veth-host up", true}, // this test doesn't need to actually push veth-host into the network namespace of the docker host OS
+			{"docker exec test-skydive-docker-vpp-to-veth vppctl create host-interface name veth-container", true}, // grabbing and using the veth-container end of the pair in VPP
+			{"docker exec test-skydive-docker-vpp-to-veth vppctl set int state host-veth-container up", true},
+		},
+
+		tearDownCmds: []Cmd{
+			{"docker rm -f test-skydive-docker-vpp-to-veth", false},
+		},
+
+		mode: Replay,
+
+		checks: []CheckFunction{func(c *CheckContext) error {
+			return assertOneEndNode(c, c.gremlin.V().Has("Type", "vpp", "Manager", "docker", "Container", "test-skydive-docker-vpp-to-veth").
+				Out("Type", "veth", "Name", "veth-container"))
+		}},
+	}
+
+	RunTest(t, test)
+}
+
+func TestTwoVPPsConnectedUsingMemifTunnel(t *testing.T) {
+	vpp1Container := "test-skydive-docker-vpp1-with-memif-tunnel"
+	vpp2Container := "test-skydive-docker-vpp2-with-memif-tunnel"
+	test := &Test{
+		setupCmds: []Cmd{
+			// prepare the container-shared folder (docker would create it automatically, but creating it now, with the user running the test, avoids permission problems in teardown)
+			{"mkdir /tmp/skydivetests-dockervpp-sockets", false},
+
+			// starting the docker containers
+			{fmt.Sprintf("docker run -d -t -i -v /tmp/skydivetests-dockervpp-sockets/:/run/othersockets/ --name %s %s", vpp1Container, dockerImageWithRunningVPP), false},
+			{fmt.Sprintf("docker run -d -t -i -v /tmp/skydivetests-dockervpp-sockets/:/run/othersockets/ --name %s %s", vpp2Container, dockerImageWithRunningVPP), false},
+
+			// waiting for the VPPs to start inside the containers
+			{fmt.Sprintf(vppWaitScript, 10, vpp1Container), true},
+			{fmt.Sprintf(vppWaitScript, 10, vpp2Container), true},
+
+			// creating the memif tunnel
+			{fmt.Sprintf("docker exec %s vppctl create memif socket id 1 filename /run/othersockets/another-memif.sock", vpp1Container), true},
+			{fmt.Sprintf("docker exec %s vppctl create interface memif socket-id 1 id 0 master", vpp1Container), true},
+			{fmt.Sprintf("docker exec %s vppctl set int state memif1/0 up", vpp1Container), true},
+			{fmt.Sprintf("docker exec %s vppctl create memif socket id 1 filename /run/othersockets/another-memif.sock", vpp2Container), true},
+			{fmt.Sprintf("docker exec %s vppctl create interface memif socket-id 1 id 0 slave", vpp2Container), true},
+			{fmt.Sprintf("docker exec %s vppctl set int state memif1/0 up", vpp2Container), true},
+		},
+
+		tearDownCmds: []Cmd{
+			// removing the memif socket file (it was created by VPP, but deleting it in VPP doesn't remove the physical
+			// file -> removing the references from the VPPs, then removing the file at the docker container level, to prevent permission problems)
+			{fmt.Sprintf("docker exec %s vppctl delete interface memif memif1/0", vpp1Container), true},
+			{fmt.Sprintf("docker exec %s vppctl delete interface memif memif1/0", vpp2Container), true},
+			{fmt.Sprintf("docker exec %s vppctl delete memif socket id 1", vpp1Container), true},
+			{fmt.Sprintf("docker exec %s vppctl delete memif socket id 1", vpp2Container), true},
+			{fmt.Sprintf("docker exec %s rm -rf /run/othersockets/another-memif.sock", vpp1Container), true},
+
+			// removing the docker containers
+			{fmt.Sprintf("docker rm -f %s", vpp1Container), false},
+			{fmt.Sprintf("docker rm -f %s", vpp2Container), false},
+
+			// removing the container-shared folder for the memif socket file
+			{"rm -rf /tmp/skydivetests-dockervpp-sockets", true},
+		},
+
+		mode: Replay,
+
+		checks: []CheckFunction{func(c *CheckContext) error {
+			return assertOneEndNode(c, c.gremlin.V().Has("Type", "vpp", "Manager", "docker", "Container", vpp1Container).
+				Out("Type", "intf", "Name", "memif1/0", "Manager", "docker").
+				Both("Type", "intf", "Name", "memif1/0", "Manager", "docker").
+				In("Type", "vpp", "Manager", "docker", "Container", vpp2Container))
+		}},
+	}
+
+	RunTest(t, test)
+}
+
+func assertOneEndNode(c *CheckContext, queryString gremlin.QueryString) error {
+	nodes, err := c.gh.GetNodes(queryString)
+	if err != nil {
+		return err
+	}
+
+	if len(nodes) != 1 {
+		return fmt.Errorf("expected 1 end node, got %+v", nodes)
+	}
+
+	return nil
+}
diff --git a/topology/probes/docker/docker.go b/topology/probes/docker/docker.go
index e5bbd03643..a9d1148d8c 100644
--- a/topology/probes/docker/docker.go
+++ b/topology/probes/docker/docker.go
@@ -36,6 +36,7 @@ import (
 	"github.com/skydive-project/skydive/graffiti/graph"
 	"github.com/skydive-project/skydive/logging"
 	"github.com/skydive-project/skydive/topology"
+	"github.com/skydive-project/skydive/topology/probes/docker/subprobes"
 	ns "github.com/skydive-project/skydive/topology/probes/netns"
 	sversion "github.com/skydive-project/skydive/version"
 )
@@ -60,6 +61,7 @@ type Probe struct {
 	wg           sync.WaitGroup
 	hostNs       netns.NsHandle
 	containerMap map[string]containerInfo
+	subprobes    []subprobes.Subprobe
 }
 
 func (probe *Probe) containerNamespace(pid int) string {
@@ -144,6 +146,17 @@ func (probe *Probe) registerContainer(id string) {
 		Pid:  info.State.Pid,
 		Node: containerNode,
 	}
+
+	regData := &subprobes.ContainerRegistrationData{
+		Info:     info,
+		Node:     containerNode,
+		NSRootID: n.ID,
+	}
+	for _, sp := range probe.subprobes {
+		if err := sp.RegisterContainer(regData); err != nil {
+			logging.GetLogger().Errorf("Subprobe %T failed during container registration: %v", sp, err)
+		}
+	}
 }
 
 func (probe *Probe) unregisterContainer(id string) {
@@ -155,6 +168,15 @@ func (probe *Probe) unregisterContainer(id string) {
 		return
 	}
 
+	unregData := &subprobes.ContainerUnregistrationData{
+		Node: infos.Node,
+	}
+	for _, sp := range probe.subprobes {
+		if err := sp.UnregisterContainer(unregData); err != nil {
+			logging.GetLogger().Errorf("Subprobe %T failed during container unregistration: %v", sp, err)
+		}
+	}
+
 	probe.Graph.Lock()
 	if err := probe.Graph.DelNode(infos.Node); err != nil {
 		probe.Graph.Unlock()
@@ -269,6 +291,10 @@ func (probe *Probe) Start() {
 			probe.wg.Wait()
 		}
 	}()
+
+	for _, sp := range probe.subprobes {
+		sp.Start()
+	}
 }
 
 // Stop the probe
@@ -281,17 +307,21 @@ func (probe *Probe) Stop() {
 		probe.cancel()
 		probe.wg.Wait()
 	}
+	for _, sp := range probe.subprobes {
+		sp.Stop()
+	}
 	atomic.StoreInt64(&probe.state, common.StoppedState)
 }
 
 // NewProbe creates a new topology Docker probe
-func NewProbe(nsProbe *ns.Probe, dockerURL, netnsRunPath string) (*Probe, error) {
+func NewProbe(nsProbe *ns.Probe, dockerURL, netnsRunPath string, subprobes []subprobes.Subprobe) (*Probe, error) {
 	probe := &Probe{
 		Probe:        nsProbe,
 		url:          dockerURL,
 		containerMap: make(map[string]containerInfo),
 		state:        common.StoppedState,
+		subprobes:    subprobes,
 	}
 
 	if netnsRunPath != "" {
diff --git a/topology/probes/docker/no_docker.go b/topology/probes/docker/no_docker.go
index b3e5ca6e0e..8ad7462e57 100644
--- a/topology/probes/docker/no_docker.go
+++ b/topology/probes/docker/no_docker.go
@@ -21,6 +21,7 @@ package docker
 
 import (
 	"github.com/skydive-project/skydive/common"
+	"github.com/skydive-project/skydive/topology/probes/docker/subprobes"
 	ns "github.com/skydive-project/skydive/topology/probes/netns"
 )
 
@@ -37,6 +38,6 @@ func (probe *Probe) Stop() {
 }
 
 // NewProbe creates a new topology Docker probe
-func NewProbe(nsProbe *ns.Probe, dockerURL, netnsRunPath string) (*Probe, error) {
+func NewProbe(nsProbe *ns.Probe, dockerURL, netnsRunPath string, subprobes []subprobes.Subprobe) (*Probe, error) {
 	return nil, common.ErrNotImplemented
 }
diff --git a/topology/probes/docker/subprobes/api.go b/topology/probes/docker/subprobes/api.go
new file mode 100644
index 0000000000..01e939cfa4
--- /dev/null
+++ b/topology/probes/docker/subprobes/api.go
@@ -0,0 +1,50 @@
+// Copyright (c) 2019 PANTHEON.tech s.r.o.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package subprobes contains the API for probes extending the topology of the docker probe. These probes are handled by the docker probe, hence called subprobes.
+package subprobes
+
+import (
+	"github.com/docker/docker/api/types"
+	"github.com/skydive-project/skydive/graffiti/graph"
+	"github.com/skydive-project/skydive/probe"
+)
+
+// Subprobe is a probe attached to the docker probe that extends the docker graph with additional nodes/edges/metadata.
+// The lifecycle of a subprobe is handled by the docker probe. The docker probe provides subprobes with additional
+// information/events that they can build upon (e.g. to extend the docker container graph)
+type Subprobe interface {
+	probe.Probe
+
+	// RegisterContainer is called by the docker probe to notify the subprobe that a container addition was detected (registration)
+	RegisterContainer(*ContainerRegistrationData) error
+	// UnregisterContainer is called by the docker probe to notify the subprobe that a container removal was detected (unregistration)
+	UnregisterContainer(*ContainerUnregistrationData) error
+}
+
+// ContainerRegistrationData holds the information about a detected container addition that the docker probe passes to its subprobes
+type ContainerRegistrationData struct {
+	// Info is the container information retrieved by container inspect
+	Info types.ContainerJSON
+	// Node is the graph node of the container
+	Node *graph.Node
+	// NSRootID is the graph ID of the container namespace root in the graph
+	NSRootID graph.Identifier
+}
+
+// ContainerUnregistrationData holds the information about a detected container removal that the docker probe passes to its subprobes
+type ContainerUnregistrationData struct {
+	// Node is the graph node of the container
+	Node *graph.Node
+}
diff --git a/topology/probes/docker/subprobes/vpp/docker.go b/topology/probes/docker/subprobes/vpp/docker.go
new file mode 100644
index 0000000000..b44504e549
--- /dev/null
+++ b/topology/probes/docker/subprobes/vpp/docker.go
@@ -0,0 +1,124 @@
+// +build docker_vpp,linux
+
+// Copyright (c) 2019 PANTHEON.tech s.r.o.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
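For orientation, a custom subprobe only has to satisfy the Subprobe interface defined above. A minimal do-nothing skeleton might look like the following (assuming probe.Probe is the usual Skydive Start()/Stop() pair; NoopSubprobe and package example are hypothetical names, not part of the patch):

```go
package example

import (
	"github.com/skydive-project/skydive/logging"
	"github.com/skydive-project/skydive/topology/probes/docker/subprobes"
)

// NoopSubprobe is a do-nothing skeleton of a docker subprobe.
type NoopSubprobe struct{}

// Start and Stop satisfy the embedded probe.Probe interface.
func (s *NoopSubprobe) Start() {}
func (s *NoopSubprobe) Stop()  {}

// RegisterContainer receives every detected container; a real subprobe would inspect data.Info here.
func (s *NoopSubprobe) RegisterContainer(data *subprobes.ContainerRegistrationData) error {
	logging.GetLogger().Debugf("container node %s registered", data.Node.ID)
	return nil
}

// UnregisterContainer receives every removed container.
func (s *NoopSubprobe) UnregisterContainer(data *subprobes.ContainerUnregistrationData) error {
	logging.GetLogger().Debugf("container node %s unregistered", data.Node.ID)
	return nil
}

// compile-time check that the interface is satisfied
var _ subprobes.Subprobe = (*NoopSubprobe)(nil)
```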
+
+package vpp
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/client"
+	"github.com/skydive-project/skydive/logging"
+)
+
+// ClientAPIVersion is the Docker client API version used
+const ClientAPIVersion = "1.18"
+
+// newDockerClient creates a new Docker API client
+func newDockerClient() (*client.Client, error) {
+	dockerURL := "unix:///var/run/docker.sock"
+	defaultHeaders := map[string]string{"User-Agent": "skydive-test"}
+	dockerClient, err := client.NewClient(dockerURL, ClientAPIVersion, nil, defaultHeaders)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create client to Docker daemon: %s", err)
+	}
+	if _, err := dockerClient.ServerVersion(context.Background()); err != nil {
+		return nil, fmt.Errorf("failed to connect to Docker daemon: %s", err)
+	}
+	return dockerClient, nil
+}
+
+// dockerExec performs the docker API equivalent of the "docker exec" CLI command. The exec action is performed on the
+// given container, with the command passed as a slice of base command plus parameters.
+func dockerExec(containerName string, cmd []string) (string, error) {
+	return dockerExecTimeouted(containerName, cmd, 1000000*time.Hour) // effectively no timeout
+}
+
+// dockerExecTimeouted does the same as dockerExec, but the docker exec run is abandoned after the given timeout.
+func dockerExecTimeouted(containerName string, cmd []string, timeout time.Duration) (string, error) {
+	// get the docker client
+	dockerClient, err := newDockerClient()
+	if err != nil {
+		return "", fmt.Errorf("can't create docker client due to: %v", err)
+	}
+	defer func() {
+		if closeErr := dockerClient.Close(); closeErr != nil {
+			logging.GetLogger().Errorf("can't close docker client due to: %v", closeErr)
+		}
+	}()
+
+	// create the exec instance
+	execConfig := types.ExecConfig{
+		AttachStdin:  false,
+		AttachStdout: true,
+		AttachStderr: true,
+		Tty:          true,
+		Cmd:          cmd,
+	}
+	execInstance, err := dockerClient.ContainerExecCreate(context.Background(), containerName, execConfig)
+	if err != nil {
+		return "", fmt.Errorf("can't create exec instance for command %v in container %v due to: %v", cmd, containerName, err)
+	}
+
+	// attach to the created exec instance to be able to read the output of the started exec instance
+	att, err := dockerClient.ContainerExecAttach(context.Background(), execInstance.ID, execConfig)
+	if err != nil {
+		return "", fmt.Errorf("can't attach to created exec instance (command %v) in container %v due to: %v", cmd, containerName, err)
+	}
+	defer att.Close()
+	execStartConfig := types.ExecStartCheck{
+		Detach: false,
+		Tty:    false,
+	}
+
+	// start execution of the exec instance
+	err = dockerClient.ContainerExecStart(context.Background(), execInstance.ID, execStartConfig)
+	if err != nil {
+		return "", fmt.Errorf("can't start created exec instance (command %v) in container %v due to: %v", cmd, containerName, err)
+	}
+
+	// wait until the execution finishes (or the timeout fires)
+	timeoutChan := time.After(timeout)
+	var execInfo types.ContainerExecInspect
+	for {
+		execInfo, err = dockerClient.ContainerExecInspect(context.Background(), execInstance.ID)
+		if err != nil {
+			logging.GetLogger().Debugf("can't check ending of exec instance (command %v) due to: %v", cmd, err)
+		}
+		if !execInfo.Running || len(timeoutChan) > 0 {
+			break
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
+
+	// get the output of the finished exec instance
+	buf := new(bytes.Buffer)
+	_, err = buf.ReadFrom(att.Reader)
+	if err != nil {
+		return "", fmt.Errorf("can't read output of started exec instance (command %v) in container %v due to: %v",
+			cmd, containerName, err)
+	}
+	content := buf.String()
+
+	// check for remote command failure
+	if execInfo.ExitCode != 0 {
+		return "", fmt.Errorf("exit code %v", execInfo.ExitCode)
+	}
+
+	return content, nil
+}
diff --git a/topology/probes/docker/subprobes/vpp/inforetriever.go b/topology/probes/docker/subprobes/vpp/inforetriever.go
new file mode 100644
index 0000000000..c9ab729751
--- /dev/null
+++ b/topology/probes/docker/subprobes/vpp/inforetriever.go
@@ -0,0 +1,155 @@
+// +build docker_vpp,linux
+
+// Copyright (c) 2019 PANTHEON.tech s.r.o.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vpp
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/skydive-project/skydive/config"
+	"github.com/skydive-project/skydive/logging"
+)
+
+const (
+	vppCLIShowVersion  = "show version"
+	vppCLIShowHardware = "show hardware"
+	vppCLIShowMemif    = "show memif"
+	vppInfoSeparator   = "###"
+)
+
+// dockerDataRetrieval retrieves all needed data about the VPPs in all containers. The data is returned in a map where
+// map[container name][vpp CLI command] = vpp CLI command output. The purpose of this method is to optimize data
+// retrieval (retrieving in parallel and with as few docker exec calls as possible, since their overhead becomes massive
+// as the container count rises)
+func (p *Subprobe) dockerDataRetrieval(containerNames map[string]*containerInfo) map[string]map[string]string {
+	type dockerExecResult struct {
+		containerName string
+		vppCLICommand string
+		output        string
+	}
+	dataChan := make(chan dockerExecResult, 100)
+
+	// launch a goroutine that merges the results from all worker goroutines
+	dockerExecResults := make(map[string]map[string]string) // [containerName][vpp CLI command] = vpp CLI command output
+	ctx, cancelMerger := context.WithCancel(context.Background())
+	var mergerWG sync.WaitGroup
+	mergerWG.Add(1)
+	go func(results *map[string]map[string]string) {
+		defer mergerWG.Done()
+		addResult := func(data dockerExecResult) {
+			if dockerExecResults[data.containerName] == nil {
+				dockerExecResults[data.containerName] = make(map[string]string)
+			}
+			dockerExecResults[data.containerName][data.vppCLICommand] = data.output
+		}
+		for {
+			select {
+			case data := <-dataChan:
+				addResult(data)
+			case <-ctx.Done():
+				for i := 0; i < len(dataChan); i++ {
+					addResult(<-dataChan)
+				}
+				return
+			}
+		}
+	}(&dockerExecResults)
+
+	// launch parallel workers that retrieve all data from all VPPs in all docker containers
+	var workersWG sync.WaitGroup
+	for containerName := range containerNames {
+		workersWG.Add(1)
+		go func(containerName string) {
+			defer workersWG.Done()
+			// get all VPP info in one docker exec call, split it and forward it to the merger goroutine
+			// (the big bottleneck is the overhead of a docker exec call (the cmd executed inside the container is relatively cheap) -> minimize docker exec calls)
+			// NOTE: for unknown reasons the go docker client is 3 times slower than the Linux cmd line "docker exec" (tested with parallel execution across 11 containers),
+			// but we can't use the Linux cmd line directly, because the Skydive agent itself also runs in a container -> it can't reach out of it (that would be totally against
+			// container principles -> hard/impossible to do)
+			vppInfoStr, err := dockerExec(containerName, p.vppScriptCmd())
+			if err != nil {
+				logging.GetLogger().Debugf("failed to retrieve vpp information from container %v due to: %v\n", containerName, err)
+			} else { // running VPP detected
+				logging.GetLogger().Debugf("VPP Probe: container %v, running VPP detected\n", containerName)
+
+				// extract each data item from the string and send it to the merger goroutine
+				infoOrder := []string{vppCLIShowVersion, vppCLIShowHardware, vppCLIShowMemif}
+				infoData := strings.Split(vppInfoStr, vppInfoSeparator)
+				if len(infoData) != len(infoOrder) {
+					logging.GetLogger().Errorf("Count of data items retrieved from container %v doesn't correspond to the expected "+
+						"data item count (expected=%v, got=%v, full data string=%v)", containerName, len(infoOrder), len(infoData), vppInfoStr)
+					return
+				}
+				for i, data := range infoData {
+					if len(strings.TrimPrefix(strings.TrimSpace(data), "\n")) != 0 { // only relevant data is used (empty or whitespace-only data items are ignored)
+						// send the docker exec result to the merger goroutine
+						dataChan <- dockerExecResult{
+							containerName: containerName,
+							vppCLICommand: infoOrder[i],
+							output:        data,
+						}
+					}
+				}
+			}
+		}(containerName)
+	}
+
+	workersWG.Wait() // wait for all docker exec commands to finish
+	cancelMerger()   // signal the merger to process what is left in the channel and stop
+	mergerWG.Wait()  // wait for the merging of all results from the docker exec commands
+
+	return dockerExecResults
+}
+
+// vppScriptCmd creates a shell script extracting all VPP information from a locally installed VPP (the script should run inside the container where VPP is installed)
+func (p *Subprobe) vppScriptCmd() []string {
+	script := `
+version=` + "`" + p.vppCommandFromOSLevel(vppCLIShowVersion) + "`" + `
+retVal=$?
+if [ $retVal -eq 0 ]; then
+	echo "$version"
+	echo "` + vppInfoSeparator + `"
+	hardware=` + "`" + p.vppCommandFromOSLevel(vppCLIShowHardware) + "`" + `
+	hwretVal=$?
+	if [ $hwretVal -eq 0 ]; then
+		echo "$hardware"
+	fi
+	echo "` + vppInfoSeparator + `"
+	memif=` + "`" + p.vppCommandFromOSLevel(vppCLIShowMemif) + "`" + `
+	miretVal=$?
+	if [ $miretVal -eq 0 ]; then
+		echo "$memif"
+	fi
+else
+	echo "` + vppInfoSeparator + `"
+	echo "` + vppInfoSeparator + `"
+fi
+exit $retVal
+`
+	return []string{"sh", "-c", script}
+}
+
+// vppCommandFromOSLevel translates a VPP CLI command into the OS shell command to run in the OS where VPP runs as a process.
+func (p *Subprobe) vppCommandFromOSLevel(cliCommand string) string {
+	socket := config.GetString("agent.topology.docker.vpp.cliconnect")
+	if socket == "" { // use the default VPP CLI socket (/run/vpp/cli.sock)
+		return fmt.Sprintf("vppctl %s", cliCommand)
+	}
+	return fmt.Sprintf("vppctl -s %s %s", socket, cliCommand)
+}
diff --git a/topology/probes/docker/subprobes/vpp/internaldata.go b/topology/probes/docker/subprobes/vpp/internaldata.go
new file mode 100644
index 0000000000..1fcf34624b
--- /dev/null
+++ b/topology/probes/docker/subprobes/vpp/internaldata.go
@@ -0,0 +1,87 @@
+// +build docker_vpp,linux
+
+// Copyright (c) 2019 PANTHEON.tech s.r.o.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vpp
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/docker/docker/api/types"
+	"github.com/skydive-project/skydive/graffiti/graph"
+	"github.com/skydive-project/skydive/topology/probes/docker"
+)
+
+// vppData is a container for data extracted from VPP using the CLI (or the binary API)
+type vppData struct {
+	version           string              // CLI: show version
+	linkedHardware    map[string]struct{} // CLI: show hardware (provided HARDWARE column)
+	memifToSocketFile map[string]string   // map[memif name] = host OS full path to the memif socket; info taken from the docker container info + CLI: show hardware + CLI: show memif
+}
+
+// change types for the containerListChange struct
+const (
+	addContainer = iota
+	removeContainer
+)
+
+// containerListChange is a message struct informing about a change in the list of running containers. This message is
+// created from the information provided by the hooked docker probe events.
+type containerListChange struct {
+	containerInfo
+	changeType    int
+	containerName string
+}
+
+// containerInfo represents all the info retrieved over time for a given docker container
+type containerInfo struct {
+	containerDockerInfo
+	containerGraphInfo
+}
+
+// containerDockerInfo is the docker-related info retrieved over time for a given docker container
+type containerDockerInfo struct {
+	volumes map[string]string // host path to container path
+}
+
+// containerGraphInfo is the UI-graph-related info retrieved over time for a given docker container
+type containerGraphInfo struct {
+	containerNodeID            graph.Identifier
+	nsRootNodeID               graph.Identifier
+	vppNodeID                  graph.Identifier
+	linkedHardwareNodesToEdges map[graph.Identifier]graph.Identifier                      // last detected (and projected to graph) edges from VPP to hardware interfaces, map[hardware interface node id] = edge id
+	memifNametoNodeID          map[string]graph.Identifier                                // last detected (and projected to graph) memifs, map[memif name from vpp] = memif node id in graph
+	globalMemifEdges           map[graph.Identifier]map[graph.Identifier]graph.Identifier // last detected (and projected to graph) memif edges for the whole graph, not only this container's part of it, map[first memif node id]map[second memif node id] = id of the edge connecting these 2 memifs
+}
+
+// volumes extracts the volume information from types.ContainerJSON
+func (p *Subprobe) volumes(info *types.ContainerJSON) map[string]string {
+	volumes := make(map[string]string)
+	for _, mountCfg := range info.HostConfig.Binds {
+		mountPair := strings.Split(mountCfg, ":")
+		if len(mountPair) < 2 { // defensively skip malformed bind definitions
+			continue
+		}
+		volumes[mountPair[0]] = mountPair[1]
+	}
+	return volumes
+}
+
+// containerName extracts the container name from the metadata of the container root graph node
+func (p *Subprobe) containerName(node *graph.Node) (string, error) {
+	dockerInfo, ok := node.Metadata["Docker"].(docker.Metadata)
+	if !ok {
+		return "", fmt.Errorf("no Docker data found")
+	}
+	return dockerInfo.ContainerName, nil
+}
diff --git a/topology/probes/docker/subprobes/vpp/memif.go b/topology/probes/docker/subprobes/vpp/memif.go
new file mode 100644
index 0000000000..11ff4bb9f8
--- /dev/null
+++ b/topology/probes/docker/subprobes/vpp/memif.go
@@ -0,0 +1,298 @@
+// +build docker_vpp,linux
+
+// Copyright (c) 2019 PANTHEON.tech s.r.o.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vpp
+
+import (
+	"reflect"
+	"strings"
+
+	"github.com/skydive-project/skydive/graffiti/graph"
+	"github.com/skydive-project/skydive/logging"
+	"github.com/skydive-project/skydive/topology"
+)
+
+const memifTunnelType = "memif"
+
+// detectMemifs detects all memifs and returns whether the memifs changed compared to the previously detected memifs.
+func (p *Subprobe) detectMemifs(vpp *vppData, containerName string, containerInfo *containerInfo, dockerExecResults map[string]map[string]string) bool {
+	infoStr, exists := dockerExecResults[containerName][vppCLIShowMemif]
+	if !exists {
+		logging.GetLogger().Errorf("can't get output from %v, skipping memif detection", vppCLIShowMemif)
+		return false
+	}
+
+	// retrieve the mapping between a memif and the path to its socket (path on the host OS)
+	socketIDToPath := make(map[string]string)
+	socketsPart := strings.Split(infoStr, "interface")[0]
+	socketsDataParts := strings.Split(socketsPart, "\n")[2:]
+	for _, line := range socketsDataParts {
+		trimmed := strings.TrimSpace(line)
+		if len(trimmed) > 0 {
+			data := strings.Split(trimmed, " ")
+			socketIDToPath[data[0]] = data[len(data)-1]
+		}
+	}
+	memifToSocketFile := make(map[string]string)
+	for hw := range vpp.linkedHardware {
+		if strings.HasPrefix(hw, "memif") {
+			ids := strings.Split(strings.TrimPrefix(hw, "memif"), "/")
+			if len(ids) != 2 {
+				logging.GetLogger().Errorf("unexpected memif name: %v, expected memif[socket-id]/[id]", hw)
+				return false
+			}
+			vppSocketPath := socketIDToPath[ids[0]]
+			for hostPath, vppPath := range containerInfo.volumes {
+				if strings.HasPrefix(vppSocketPath, vppPath) {
+					memifToSocketFile[hw] = hostPath + strings.TrimPrefix(vppSocketPath, vppPath)
+					continue
+				}
+			}
+		}
+	}
+	if !reflect.DeepEqual(vpp.memifToSocketFile, memifToSocketFile) {
+		vpp.memifToSocketFile = memifToSocketFile
+		return true
+	}
+	return false
+}
+
+func (p *Subprobe) removeCachedMemifEdgesForContainer(graphInfo *containerGraphInfo) {
+	// create a helper data structure to ask directly whether a memifNodeID should be removed
+	removalNodeIDs := make(map[graph.Identifier]struct{})
+	for _, memifNodeID := range graphInfo.memifNametoNodeID {
+		removalNodeIDs[memifNodeID] = struct{}{}
+	}
+
+	// update the global values (every containerInfo.containerGraphInfo.globalMemifEdges references the same global instance -> it needs to be updated only once, through any reference to it)
+	for node1ID, inner := range graphInfo.globalMemifEdges {
+		if _, contains := removalNodeIDs[node1ID]; contains {
+			delete(graphInfo.globalMemifEdges, node1ID)
+			continue
+		}
+		for node2ID := range inner {
+			if _, contains := removalNodeIDs[node2ID]; contains {
+				delete(graphInfo.globalMemifEdges[node1ID], node2ID)
+			}
+		}
+	}
+}
+
+func (p *Subprobe) deleteAllMemifsForVPPFromGraph(containerName string, containerGraphInfo *containerGraphInfo) {
+	for memif, nodeID := range containerGraphInfo.memifNametoNodeID {
+		p.deleteMemifFromGraph(nodeID, memif, containerName)
+	}
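+	// reset the memif bookkeeping; all memif nodes for this VPP have just been removed from the graph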
+	containerGraphInfo.memifNametoNodeID = make(map[string]graph.Identifier)
+}
+
+// updateMemifs updates the memifs and related edges in the graph. This method should be called only if a memif change was detected.
+func (p *Subprobe) updateMemifs(containerNames map[string]*containerInfo, detectedVPPs map[string]*vppData) {
+	p.addMissingMemifs(containerNames, detectedVPPs)
+	memifEdges := p.computeMemifEdgesGraphInfo(containerNames, detectedVPPs)
+	p.addMemIfTunnelEdgesToGraph(memifEdges)
+	p.saveMemifEdgesGraphInfo(containerNames, memifEdges)
+	p.removeObsoleteMemifs(containerNames, detectedVPPs)
+}
+
+// addMissingMemifs adds memifs that were detected in VPP but are not in the graph yet
+func (p *Subprobe) addMissingMemifs(containerNames map[string]*containerInfo, detectedVPPs map[string]*vppData) {
+	for containerName, containerInfo := range containerNames {
+		if vpp, existsVpp := detectedVPPs[containerName]; existsVpp {
+			for memif := range vpp.memifToSocketFile {
+				if containerInfo.containerGraphInfo.memifNametoNodeID == nil {
+					containerInfo.containerGraphInfo.memifNametoNodeID = make(map[string]graph.Identifier)
+				}
+				if _, contains := containerInfo.containerGraphInfo.memifNametoNodeID[memif]; !contains {
+					p.addMemifToGraph(memif, &containerInfo.containerGraphInfo, containerName)
+				}
+			}
+		}
+	}
+}
+
+// computeMemifEdgesGraphInfo computes the new edges for memif tunnels based on the detected vpp info. The edge IDs are of
+// course not computed here (they are created during edge creation), but their places in the map are created here; during
+// edge creation the IDs are filled in at the right places of the data structure computed here.
+func (p *Subprobe) computeMemifEdgesGraphInfo(containerNames map[string]*containerInfo, detectedVPPs map[string]*vppData) map[graph.Identifier]map[graph.Identifier]graph.Identifier {
+	graphInfo := p.firstContainerGraphInfo(containerNames)
+	if graphInfo == nil {
+		logging.GetLogger().Errorf("can't get first container graph info -> can't get global memif edges info -> skipping memif/memif edges update")
+	}
+	memifEdges := make(map[graph.Identifier]map[graph.Identifier]graph.Identifier) // map[first memif node id]map[second memif node id] = id of the edge connecting these 2 memifs
+	socketFileToMemifs := p.computeSocketFileMemifMapping(containerNames, detectedVPPs)
+
+	// compute all edges, copying the edge id already used in the graph when the edge already exists (this could also be done with 5 nested for loops, but the computational complexity would explode with a higher count of containers/memifs)
+	for containerName, containerInfo := range containerNames {
+		if vpp, existsVpp := detectedVPPs[containerName]; existsVpp {
+			for memif, hostSocketFile := range vpp.memifToSocketFile {
+				memif1NodeID := containerInfo.containerGraphInfo.memifNametoNodeID[memif]
+				for _, memif2NodeID := range p.otherEndsOfMemifTunnel(containerName, memif, hostSocketFile, socketFileToMemifs) {
+					if memif1NodeID < memif2NodeID { // filter duplicate edges (each edge is detected twice -> use only one detection by introducing an artificial condition that holds for exactly one of the 2 detections)
+						if memifEdges[memif1NodeID] == nil {
+							memifEdges[memif1NodeID] = make(map[graph.Identifier]graph.Identifier)
+						}
+						memifEdges[memif1NodeID][memif2NodeID] = graphInfo.globalMemifEdges[memif1NodeID][memif2NodeID] // if not in graphInfo.globalMemifEdges, the default value ("") is filled in; later an edge id will be created and filled in here (new-edge detection condition: memifEdges[x][y] == "")
+						if graphInfo.globalMemifEdges[memif1NodeID][memif2NodeID] != "" {
+							graphInfo.globalMemifEdges[memif1NodeID][memif2NodeID] = "" // needed later for finding edges to remove
+						}
+					}
+				}
+			}
+		}
+	}
+	return memifEdges
+}
+
+func (p *Subprobe) firstContainerGraphInfo(containerNames map[string]*containerInfo) *containerGraphInfo {
+	for _, containerInfo := range containerNames {
+		return &containerInfo.containerGraphInfo
+	}
+	return nil
+}
+
+// otherEndsOfMemifTunnel computes the node IDs of the memif nodes that represent the other end points of the tunnels
+// starting at the given memif (identified by container name and memif name).
+func (p *Subprobe) otherEndsOfMemifTunnel(oneEndContainerName string, oneEndMemif string, hostSocketFile string,
+	socketFileToMemifs map[string][]string) []graph.Identifier {
+	ends := make([]graph.Identifier, 0)
+	for _, pairMemIfStr := range socketFileToMemifs[hostSocketFile] {
+		pairMemIf := strings.Split(pairMemIfStr, "#")
+		if !(pairMemIf[0] == oneEndContainerName && pairMemIf[1] == oneEndMemif) { // not the current memif => another memif connected to the current memif
+			ends = append(ends, graph.Identifier(pairMemIf[2]))
+		}
+	}
+	return ends
+}
+
+// computeSocketFileMemifMapping computes the mapping from a host socket file to the memif identification tuple (containerName, memif, memifNodeID)
+func (p *Subprobe) computeSocketFileMemifMapping(containerNames map[string]*containerInfo, detectedVPPs map[string]*vppData) map[string][]string {
+	// create a helper mapping for connecting memifs (the reverse map of the remembered data)
+	socketFileToMemifs := make(map[string][]string)
+	for containerName, containerInfo := range containerNames {
+		if vpp, existsVpp := detectedVPPs[containerName]; existsVpp {
+			for memif, hostSocketFile := range vpp.memifToSocketFile {
+				memifNodeID := string(containerInfo.containerGraphInfo.memifNametoNodeID[memif]) // graph.Identifier = string
+				socketFileToMemifs[hostSocketFile] = append(socketFileToMemifs[hostSocketFile], containerName+"#"+memif+"#"+memifNodeID)
+			}
+		}
+	}
+	return socketFileToMemifs
+}
+
+func (p *Subprobe) addMemIfTunnelEdgesToGraph(memifEdges map[graph.Identifier]map[graph.Identifier]graph.Identifier) {
+	p.graph.Lock()
+	defer p.graph.Unlock()
+
+	for memif1NodeID, inner := range memifEdges {
+		for memif2NodeID, edgeID := range inner {
+			if edgeID == "" {
+				// get the nodes
+				memif1Node := p.graph.GetNode(memif1NodeID)
+				if memif1Node == nil {
+					logging.GetLogger().Errorf("can't find graph node (first memif node) with id %v", memif1NodeID)
+				}
+				memif2Node := p.graph.GetNode(memif2NodeID)
+				if memif2Node == nil {
+					logging.GetLogger().Errorf("can't find graph node (second memif node) with id %v", memif2NodeID)
+				}
+
+				memifEdges[memif1NodeID][memif2NodeID] = graph.GenID()
+				edge, err := p.graph.NewEdge(memifEdges[memif1NodeID][memif2NodeID], memif1Node, memif2Node, p.memIfTunnelEdgeMetadata())
+				if err != nil {
+					logging.GetLogger().Errorf("can't add graph edge between 2 memif nodes (%v <-> %v) due to: %v", memif1NodeID, memif2NodeID, err)
+				}
+				p.graph.AddEdge(edge)
+			}
+		}
+	}
+}
+
+func (p *Subprobe) memIfTunnelEdgeMetadata() graph.Metadata {
+	return graph.Metadata{"RelationType": topology.Layer2Link, "Type": memifTunnelType}
+}
+
+func (p *Subprobe) deleteMemifFromGraph(nodeID graph.Identifier, memif string, containerName string) {
+	p.graph.Lock()
+	defer p.graph.Unlock()
+
+	memifNode := p.graph.GetNode(nodeID)
+	if memifNode == nil {
+		logging.GetLogger().Errorf("can't delete memif: can't find the previously created memif (%v) node with id %v (container=%v)", memif, nodeID, containerName)
+		return
+	}
+
+	if err := p.graph.DelNode(memifNode); err != nil {
+		logging.GetLogger().Errorf("can't delete memif node with id %v due to: %v", nodeID, err)
+	}
+}
+
+func (p *Subprobe) addMemifToGraph(memif string, containerGraphInfo *containerGraphInfo, containerName string) {
+	p.graph.Lock()
+	defer p.graph.Unlock()
+
+	nsRootNode, err := p.nsRootNode(containerGraphInfo, containerName)
+	if err != nil {
+		logging.GetLogger().Errorf("can't add memif %v for vpp, because we can't find the graph node (nsRoot node) with id %v due to: %v", memif, containerGraphInfo.nsRootNodeID, err)
+		return
+	}
+
+	containerGraphInfo.memifNametoNodeID[memif] = graph.GenID()
+	metadata := graph.Metadata{
+		"Type":    "intf",
+		"Manager": "docker",
+		"Name":    memif,
+	}
+	memifNode, err := p.graph.NewNode(containerGraphInfo.memifNametoNodeID[memif], metadata)
+	if err != nil {
+		logging.GetLogger().Errorf("can't create memif (metadata=%+v) due to: %v", metadata, err)
+		return
+	}
+
+	topology.AddOwnershipLink(p.graph, nsRootNode, memifNode, nil)
+
+	vppNode := p.graph.GetNode(containerGraphInfo.vppNodeID)
+	if vppNode == nil {
+		logging.GetLogger().Errorf("can't link memif %v with the vpp node, because we can't find the graph node (vpp node) with id %v", memif, containerGraphInfo.vppNodeID)
+		return
+	}
+	edge, err := p.graph.NewEdge(graph.GenID(), vppNode, memifNode, topology.Layer2Metadata())
+	if err != nil {
+		logging.GetLogger().Errorf("can't link memif %v with vpp node %v due to: %v", memif, containerGraphInfo.vppNodeID, err)
+		return
+	}
+	p.graph.AddEdge(edge)
+}
+
+// saveMemifEdgesGraphInfo sets the global memif edge information. It must be set on all containers so that it won't be
+// lost when a container is removed. Removing the last container also removes this information.
+func (p *Subprobe) saveMemifEdgesGraphInfo(containerNames map[string]*containerInfo, memifEdges map[graph.Identifier]map[graph.Identifier]graph.Identifier) {
+	for _, containerInfo := range containerNames {
+		containerInfo.containerGraphInfo.globalMemifEdges = memifEdges
+	}
+}
+
+// removeObsoleteMemifs removes memif nodes that are in the graph but are no longer detected in vpp
+func (p *Subprobe) removeObsoleteMemifs(containerNames map[string]*containerInfo, detectedVPPs map[string]*vppData) {
+	for containerName, containerInfo := range containerNames {
+		if vpp, existsVpp := detectedVPPs[containerName]; existsVpp {
+			for memif, nodeID := range containerInfo.containerGraphInfo.memifNametoNodeID {
+				if _, contains := vpp.memifToSocketFile[memif]; !contains {
+					p.deleteMemifFromGraph(nodeID, memif, containerName)
+					delete(containerInfo.containerGraphInfo.memifNametoNodeID, memif)
+				}
+			}
+		}
+	}
+}
diff --git a/topology/probes/docker/subprobes/vpp/no_vpp.go b/topology/probes/docker/subprobes/vpp/no_vpp.go
new file mode 100644
index 0000000000..bd935f073c
--- /dev/null
+++ b/topology/probes/docker/subprobes/vpp/no_vpp.go
@@ -0,0 +1,52 @@
+// +build !linux !docker_vpp
+
+// Copyright (c) 2019 PANTHEON.tech s.r.o.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vpp
+
+import (
+	"github.com/skydive-project/skydive/logging"
+	"github.com/skydive-project/skydive/topology/probes/docker/subprobes"
+	"github.com/skydive-project/skydive/topology/probes/netns"
+)
+
+// DummySubprobe is a VPP subprobe that does nothing. It is used when the build tags prohibit using the real VPP subprobe
+type DummySubprobe struct{}
+
+// NewSubprobe creates a new topology Docker subprobe
+func NewSubprobe(dockerNSProbe *netns.Probe) (*DummySubprobe, error) {
+	logging.GetLogger().Debug("Dummy VPP probe creating...")
+	return &DummySubprobe{}, nil
+}
+
+// Start the dummy probe
+func (p *DummySubprobe) Start() {
+	logging.GetLogger().Debug("Dummy VPP subprobe starting...")
+}
+
+// Stop the dummy probe
+func (p *DummySubprobe) Stop() {
+	logging.GetLogger().Debug("Dummy VPP subprobe stopping...")
+}
+
+// RegisterContainer is called by the docker probe to notify the subprobe that a container addition was detected (registration)
+func (p *DummySubprobe) RegisterContainer(data *subprobes.ContainerRegistrationData) error {
+	return nil // dummy implementation
+}
+
+// UnregisterContainer is called by the docker probe to notify the subprobe that a container removal was detected (unregistration)
+func (p *DummySubprobe) UnregisterContainer(data *subprobes.ContainerUnregistrationData) error {
+	return nil // dummy implementation
+}
diff --git a/topology/probes/docker/subprobes/vpp/vpp.go b/topology/probes/docker/subprobes/vpp/vpp.go
new file mode 100644
index 0000000000..9504a46416
--- /dev/null
+++ b/topology/probes/docker/subprobes/vpp/vpp.go
@@ -0,0 +1,395 @@
+// +build docker_vpp,linux
+
+// Copyright (c) 2019 PANTHEON.tech s.r.o.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
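The vpp.go implementation that follows serializes all state through one goroutine: the Register/Unregister callbacks only enqueue messages on a buffered channel, and a single goroutine owns the container maps and re-probes on a ticker. A self-contained sketch of that actor-style pattern (all names here are illustrative, not from the patch):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type event struct {
	name string
	add  bool
}

// run owns all mutable state; callers never touch the map directly, they only send events.
func run(ctx context.Context, events <-chan event, interval time.Duration) {
	containers := make(map[string]bool)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case e := <-events: // immediate update on container add/remove
			if e.add {
				containers[e.name] = true
			} else {
				delete(containers, e.name)
			}
		case <-ticker.C: // periodic re-probe of everything still running
			fmt.Printf("re-probing %d containers\n", len(containers))
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	events := make(chan event, 100) // buffered, so producers never block on the probing work
	ctx, cancel := context.WithCancel(context.Background())
	go run(ctx, events, time.Second)
	events <- event{name: "c1", add: true}
	time.Sleep(2500 * time.Millisecond)
	cancel()
}
```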
+
+// Package vpp contains a docker subprobe that extends the docker topology graph with information about running VPPs
+// (https://wiki.fd.io/view/VPP) and related interfaces (memif - https://docs.fd.io/vpp/17.10/libmemif_doc.html, veth)
+package vpp
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/skydive-project/skydive/config"
+	"github.com/skydive-project/skydive/graffiti/graph"
+	"github.com/skydive-project/skydive/logging"
+	"github.com/skydive-project/skydive/topology"
+	"github.com/skydive-project/skydive/topology/probes/docker/subprobes"
+	"github.com/skydive-project/skydive/topology/probes/netns"
+)
+
+// Subprobe describes the VPP topology graph enhancements
+type Subprobe struct {
+	containerListChan chan containerListChange
+	graph             *graph.Graph
+	wg                sync.WaitGroup
+	cancel            context.CancelFunc
+}
+
+// NewSubprobe creates a new topology Docker subprobe
+func NewSubprobe(dockerNSProbe *netns.Probe) (*Subprobe, error) {
+	logging.GetLogger().Debug("VPP probe creating...")
+	dockerProbe := &Subprobe{
+		containerListChan: make(chan containerListChange, 100),
+		graph:             dockerNSProbe.Graph,
+	}
+
+	return dockerProbe, nil
+}
+
+// Start the probe (this is only relevant for checking a VPP that is installed natively, i.e. directly in the host linux rather than in docker)
+func (p *Subprobe) Start() {
+	logging.GetLogger().Debug("VPP subprobe starting...")
+	var ctx context.Context
+	ctx, p.cancel = context.WithCancel(context.Background())
+	p.wg.Add(1)
+	go p.handleTopologyUpdates(ctx)
+}
+
+// Stop the probe (this is only relevant for checking a VPP that is installed natively, i.e. directly in the host linux rather than in docker)
+func (p *Subprobe) Stop() {
+	logging.GetLogger().Debug("VPP subprobe stopping...")
+	p.cancel()
+	p.wg.Wait()
+}
+
+// RegisterContainer is called by the docker probe to notify the subprobe that a container addition was detected (registration)
+func (p *Subprobe) RegisterContainer(data *subprobes.ContainerRegistrationData) error {
+	logging.GetLogger().Debugf("VPP subprobe caught registration of docker container: %+v", data)
+	volumes := p.volumes(&data.Info)
+	containerName, err := p.containerName(data.Node)
+	if err != nil {
+		return fmt.Errorf("can't extract container name from the metadata of graph node %v due to: %v", data.Node, err)
+	}
+
+	p.containerListChan <- containerListChange{
+		changeType:    addContainer,
+		containerName: containerName,
+		containerInfo: containerInfo{
+			containerDockerInfo: containerDockerInfo{
+				volumes: volumes,
+			},
+			containerGraphInfo: containerGraphInfo{
+				containerNodeID: data.Node.ID,
+				nsRootNodeID:    data.NSRootID,
+			},
+		},
+	}
+	return nil
+}
+
+// UnregisterContainer is called by the docker probe to notify the subprobe that a container removal was detected (unregistration)
+func (p *Subprobe) UnregisterContainer(data *subprobes.ContainerUnregistrationData) error {
+	logging.GetLogger().Debugf("VPP subprobe caught unregistration of docker container: %+v", data)
+	containerName, err := p.containerName(data.Node)
+	if err != nil {
+		return fmt.Errorf("can't extract container name from the metadata of graph node %v", data.Node)
+	}
+
+	p.containerListChan <- containerListChange{
+		changeType:    removeContainer,
+		containerName: containerName,
+	}
+	return nil
+}
+
+// handleTopologyUpdates handles topology changes related to VPP and propagates them to the UI graph
+func (p *Subprobe) handleTopologyUpdates(ctx context.Context) {
+	defer p.wg.Done()
+	containerNames := make(map[string]*containerInfo)
+	detectedVPPs := make(map[string]*vppData) // detected VPPs identified by container name (simplification for now: a container can have at most 1 VPP)
+	ticker := time.NewTicker(p.probeInterval())
+	for {
+		select {
+		case msg := <-p.containerListChan: // topology changes caught from the docker probe
+			switch msg.changeType {
+			case addContainer:
+				containerNames[msg.containerName] = &msg.containerInfo
+				p.updateState(containerNames, detectedVPPs, nil)
+			case removeContainer:
+				cgi := containerNames[msg.containerName]
+				delete(containerNames, msg.containerName)
+				p.updateState(containerNames, detectedVPPs, &cgi.containerGraphInfo)
+			}
+		case <-ticker.C: // regular interval updates
+			p.updateState(containerNames, detectedVPPs, nil)
+		case <-ctx.Done():
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+// probeInterval computes the interval between probe checks based on the configuration
+func (p *Subprobe) probeInterval() time.Duration {
+	interval := config.GetInt("agent.topology.docker.vpp.probeinterval")
+	if interval < 1 { // a probe interval of 0 would be CPU-intensive and negative values don't make sense
+		interval = 1
+	}
+	return time.Duration(interval) * time.Second
+}
+
+// updateState detects changes (container up/down, VPP up/down, VPP state changes, memif changes, VPP node connection
+// changes) and updates the graph according to the new information.
+func (p *Subprobe) updateState(containerNames map[string]*containerInfo, detectedVPPs map[string]*vppData, removedContainerGraphInfo *containerGraphInfo) {
+	updateMemifs := p.updateStateForAllRunningContainers(containerNames, detectedVPPs)
+	p.updateStateForRemovedContainers(containerNames, detectedVPPs, removedContainerGraphInfo)
+	if updateMemifs {
+		p.updateMemifs(containerNames, detectedVPPs) // update memif interfaces in existing containers if needed (these are possibly cross-container changes in the graph -> all info must be updated first)
+	}
+}
+
+// updateStateForAllRunningContainers checks the VPP topology, and updates the graph if needed, for every currently running container
+func (p *Subprobe) updateStateForAllRunningContainers(containerNames map[string]*containerInfo, detectedVPPs map[string]*vppData) bool {
+	// retrieve current data about the VPPs
+	dockerExecResults := p.dockerDataRetrieval(containerNames)
+
+	// update the state of all running containers based on the newly detected data (dockerExecResults)
+	updateMemifs := false
+	for containerName, containerInfo := range containerNames {
+		// note: don't short-circuit with ||, every container must be processed
+		if p.updateStateForRunningContainer(containerName, containerInfo, detectedVPPs, dockerExecResults) {
+			updateMemifs = true
+		}
+	}
+	return updateMemifs
+}
+
+// updateStateForRunningContainer checks the VPP topology, and updates the graph if needed, for the currently running container.
+func (p *Subprobe) updateStateForRunningContainer(containerName string, containerInfo *containerInfo, detectedVPPs map[string]*vppData,
+	dockerExecResults map[string]map[string]string) bool {
+	vpp, previouslyDetected := detectedVPPs[containerName]
+	graphInfo := &containerInfo.containerGraphInfo
+	vppVersion, detectedVPP := dockerExecResults[containerName][vppCLIShowVersion] // VPP version successfully retrieved <=> dockerExecResults contains the result <=> a running VPP was detected
+
+	if detectedVPP && !previouslyDetected { // new VPP detected
+		vpp = &vppData{
+			version:           vppVersion,
+			linkedHardware:    make(map[string]struct{}),
+			memifToSocketFile: make(map[string]string),
+		}
+		detectedVPPs[containerName] = vpp
+		if err := p.addVPPToGraph(vpp, graphInfo, containerName); err != nil {
+			logging.GetLogger().Errorf("Can't add vpp from container %v to the graph (ignoring further graph additions "+
+				"for this vpp) due to: %v", containerName, err)
+		}
+		p.updateHardwareLinks(vpp, containerName, graphInfo, dockerExecResults)
+		return p.detectMemifs(vpp, containerName, containerInfo, dockerExecResults)
+	} else if detectedVPP && previouslyDetected { // update the previously detected VPP (metadata, connections to other nodes, etc.)
+		p.updateHardwareLinks(vpp, containerName, graphInfo, dockerExecResults)
+		return p.detectMemifs(vpp, containerName, containerInfo, dockerExecResults)
+	} else if !detectedVPP && previouslyDetected { // the previously detected VPP went down
+		p.removeCachedMemifEdgesForContainer(graphInfo)
+		p.deleteAllMemifsForVPPFromGraph(containerName, graphInfo)
+		p.deleteVPPFromGraph(vpp, graphInfo)
+		graphInfo.vppNodeID = "" // graphInfo cleanup of old data
+		delete(detectedVPPs, containerName)
+	}
+	return false
+}
+
+// updateStateForRemovedContainers updates the graph and the tracked state related to removed containers that could,
+// while they were running, have populated the graph/state with VPP topology nodes/data.
+func (p *Subprobe) updateStateForRemovedContainers(containerNames map[string]*containerInfo, detectedVPPs map[string]*vppData, removedContainerGraphInfo *containerGraphInfo) {
+	// clean up already detected VPPs if they were running in an already removed container (container removal use case)
+	for containerName, vpp := range detectedVPPs {
+		if _, found := containerNames[containerName]; !found {
+			p.removeCachedMemifEdgesForContainer(removedContainerGraphInfo)
+			p.deleteAllMemifsForVPPFromGraph(containerName, removedContainerGraphInfo)
+			p.deleteVPPFromGraph(vpp, removedContainerGraphInfo)
+			delete(detectedVPPs, containerName)
+		}
+	}
+}
+
+// updateHardwareLinks detects hardware links between VPP and the OS-level interfaces (the hardware VPP is running on) and adds/removes them to/from the graph.
+// Detection is done only from the vpp side, because a change on the OS side can be either
+// 1. an addition (vpp must add it as well, otherwise no link exists) or
+// 2. a removal (the edges disappear from the graph automatically),
+// and in either case the problem resolves itself just by watching the VPP side.
+// updateHardwareLinks detects hardware links between VPP and OS-level interfaces (the hardware VPP runs on) and
+// adds/removes them in the graph. Detection is done only from the VPP side, because a change on the OS side is either
+// 1. an addition (VPP must add it as well, otherwise there is no link) or
+// 2. a removal (the edge disappears from the graph automatically),
+// and in both cases the graph converges just by watching the VPP side.
+func (p *Subprobe) updateHardwareLinks(vpp *vppData, containerName string, graphInfo *containerGraphInfo, dockerExecResults map[string]map[string]string) {
+	diff, err := p.detectHardwareLinks(vpp, containerName, dockerExecResults)
+	if err != nil {
+		logging.GetLogger().Errorf("Can't detect VPP links to OS hardware interfaces in container %v due to: %v", containerName, err)
+	} else if diff {
+		if err := p.updateVPPHardwareLinksToGraph(vpp, graphInfo); err != nil {
+			logging.GetLogger().Errorf("Can't update VPP links to OS hardware interfaces in container %v due to: %v", containerName, err)
+		}
+	}
+}
+
+// updateVPPHardwareLinksToGraph updates the edges between the VPP node and hardware nodes (interfaces like memif, veth, ...)
+func (p *Subprobe) updateVPPHardwareLinksToGraph(vpp *vppData, graphInfo *containerGraphInfo) error {
+	p.graph.Lock()
+	defer p.graph.Unlock()
+
+	// initialize graph info if it was never used before
+	if graphInfo.linkedHardwareNodesToEdges == nil {
+		graphInfo.linkedHardwareNodesToEdges = make(map[graph.Identifier]graph.Identifier)
+	}
+
+	// get nodes
+	vppNode := p.graph.GetNode(graphInfo.vppNodeID)
+	if vppNode == nil {
+		return fmt.Errorf("can't find graph node (vpp node) with id %v", graphInfo.vppNodeID)
+	}
+	nsRootNode := p.graph.GetNode(graphInfo.nsRootNodeID)
+	if nsRootNode == nil {
+		return fmt.Errorf("can't find graph node (docker container ns root node) with id %v", graphInfo.nsRootNodeID)
+	}
+	linkNodes := p.linkedHardwareNodes(nsRootNode, vpp)
+
+	// create missing links (if they do not exist yet)
+	for linkNodeID, linkNode := range linkNodes {
+		if _, contains := graphInfo.linkedHardwareNodesToEdges[linkNodeID]; !contains {
+			graphInfo.linkedHardwareNodesToEdges[linkNodeID] = graph.GenID()
+			edge, err := p.graph.NewEdge(graphInfo.linkedHardwareNodesToEdges[linkNodeID], vppNode, linkNode, topology.Layer2Metadata())
+			if err != nil {
+				return fmt.Errorf("can't create edge between hardware node (id=%v) and vpp node (id=%v) due to: %v", linkNodeID, vppNode.ID, err)
+			}
+			p.graph.AddEdge(edge)
+		}
+	}
+
+	// remove stale edges that were detected and created in previous updates
+	for oldLinkNodeID, oldLinkEdgeID := range graphInfo.linkedHardwareNodesToEdges {
+		if _, contains := linkNodes[oldLinkNodeID]; !contains {
+			delete(graphInfo.linkedHardwareNodesToEdges, oldLinkNodeID)
+			e := p.graph.GetEdge(oldLinkEdgeID)
+			if e == nil {
+				logging.GetLogger().Warningf("can't remove VPP hardware link edge, because it can't be found in the graph by id (id=%v)", oldLinkEdgeID)
+				continue
+			}
+			if err := p.graph.DelEdge(e); err != nil {
+				logging.GetLogger().Warningf("can't remove VPP hardware link edge (id=%v) due to: %v", oldLinkEdgeID, err)
+			}
+		}
+	}
+
+	return nil
+}
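
Aside (not part of the patch): the edge update above is a two-pass set reconciliation — add edges for newly detected links, then drop cached edges whose link disappeared. Stripped of the graph API, the shape looks like this (illustrative names, not the subprobe's):

package main

import "fmt"

// reconcileLinks compares the cached link set (link name -> edge id) with the
// freshly detected link set and returns which links to add and which to drop,
// mirroring the two loops of updateVPPHardwareLinksToGraph.
func reconcileLinks(cached map[string]string, detected map[string]struct{}) (toAdd, toDrop []string) {
	for name := range detected {
		if _, ok := cached[name]; !ok {
			toAdd = append(toAdd, name) // missing edge: create it
		}
	}
	for name := range cached {
		if _, ok := detected[name]; !ok {
			toDrop = append(toDrop, name) // stale edge: delete it
		}
	}
	return toAdd, toDrop
}

func main() {
	cached := map[string]string{"veth0": "edge-1", "veth1": "edge-2"}
	detected := map[string]struct{}{"veth0": {}, "veth2": {}}
	add, drop := reconcileLinks(cached, detected)
	fmt.Println(add, drop) // [veth2] [veth1]
}
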
+// linkedHardwareNodes returns all currently existing graph nodes that represent hardware directly linked to VPP
+// (as reported by the VPP CLI command "show hardware")
+func (p *Subprobe) linkedHardwareNodes(nsRootNode *graph.Node, vpp *vppData) map[graph.Identifier]*graph.Node {
+	linkNodes := make(map[graph.Identifier]*graph.Node)
+	for _, rootEdge := range p.graph.GetNodeEdges(nsRootNode, topology.OwnershipMetadata()) {
+		directChild := p.graph.GetNode(rootEdge.Child)
+		if directChild == nil {
+			logging.GetLogger().Warningf("can't find direct child (id %v) of nsRoot node, ignoring it", rootEdge.Child)
+			continue
+		}
+
+		if t, ok := directChild.Metadata["Type"].(string); ok && t == "veth" {
+			if name, ok := directChild.Metadata["Name"].(string); ok {
+				if _, shouldBeLink := vpp.linkedHardware[name]; shouldBeLink {
+					linkNodes[directChild.ID] = directChild
+				}
+			}
+		}
+	}
+	return linkNodes
+}
+
+// detectHardwareLinks retrieves from the VPP CLI information about used hardware links (links to physical hardware,
+// e.g. the eth0 interface) and returns whether these links changed compared to the previous state check.
+func (p *Subprobe) detectHardwareLinks(vpp *vppData, containerName string, dockerExecResults map[string]map[string]string) (bool, error) {
+	vppHardware, exists := dockerExecResults[containerName][vppCLIShowHardware]
+	if !exists {
+		return false, fmt.Errorf("can't get output from %v, skipping hardware links additions", vppCLIShowHardware)
+	}
+	re, err := regexp.Compile(`\s*up\s*(\S*)\s`)
+	if err != nil {
+		return false, fmt.Errorf("can't compile regular expression for parsing vpp hardware listing output: %v", err)
+	}
+	linkedHardware := make(map[string]struct{})
+	for _, line := range strings.Split(vppHardware, "\n") {
+		if matches := re.FindStringSubmatch(line); matches != nil {
+			hardwareName := strings.TrimPrefix(matches[1], "host-") // remove the "host-" prefix of veth links
+			linkedHardware[hardwareName] = struct{}{}
+		}
+	}
+	if !reflect.DeepEqual(vpp.linkedHardware, linkedHardware) {
+		vpp.linkedHardware = linkedHardware
+		return true, nil
+	}
+	return false, nil
+}
+
+// addVPPToGraph adds a graph node representing a running VPP
+func (p *Subprobe) addVPPToGraph(vpp *vppData, graphInfo *containerGraphInfo, containerName string) error {
+	p.graph.Lock()
+	defer p.graph.Unlock()
+
+	// get container node
+	nsRootNode, err := p.nsRootNode(graphInfo, containerName)
+	if err != nil {
+		return fmt.Errorf("can't add vpp node to graph due to: %v", err)
+	}
+
+	// add vpp node
+	metadata := graph.Metadata{
+		"Type":      "vpp",
+		"Manager":   "docker",
+		"Name":      "VPP",
+		"Container": containerName,
+		"version":   vpp.version,
+	}
+	graphInfo.vppNodeID = graph.GenID()
+	vppNode, err := p.graph.NewNode(graphInfo.vppNodeID, metadata)
+	if err != nil {
+		return fmt.Errorf("can't create new VPP node in graph (container %v) due to: %v", containerName, err)
+	}
+	topology.AddOwnershipLink(p.graph, nsRootNode, vppNode, nil)
+	return nil
+}
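
Aside (not part of the patch): detectHardwareLinks extracts the interface name following an "up" link state and strips the "host-" prefix that VPP gives to veth interfaces. A runnable sketch of that parsing against a fabricated sample in the rough shape of "vppctl show hardware" output (real output has more columns); the regex is compiled once at package level instead of on every probe pass:

package main

import (
	"fmt"
	"regexp"
	"strings"
)

// upIfaceRe captures the token following an "up" link state on a hardware line.
var upIfaceRe = regexp.MustCompile(`\s*up\s*(\S*)\s`)

func main() {
	// fabricated sample; note the trailing whitespace the regex relies on
	sample := "              Name      Idx   Link  Hardware\n" +
		"host-veth0               1     up   host-veth0  \n" +
		"memif0/0                 2    down  memif0/0  \n"
	linked := map[string]struct{}{}
	for _, line := range strings.Split(sample, "\n") {
		if m := upIfaceRe.FindStringSubmatch(line); m != nil {
			linked[strings.TrimPrefix(m[1], "host-")] = struct{}{}
		}
	}
	fmt.Println(linked) // map[veth0:{}]
}
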
+// nsRootNode retrieves the graph node for the container namespace root (nsRoot) by its cached graph.Identifier. It
+// also contains a failsafe that finds the nsRoot node by metadata search in case the identifier changed (the nsRoot
+// node is managed by the docker probe).
+func (p *Subprobe) nsRootNode(graphInfo *containerGraphInfo, containerName string) (*graph.Node, error) {
+	nsRootNode := p.graph.GetNode(graphInfo.nsRootNodeID)
+	if nsRootNode == nil {
+		// reacquire the nsRootNode by metadata search (reason: the namespace root node was observed to change
+		// occasionally, so its node id is not stable)
+		nsRootNodes := p.graph.GetNodes(graph.Metadata{"Type": "netns", "Name": containerName})
+		if len(nsRootNodes) > 0 {
+			nsRootNode = nsRootNodes[0]
+			graphInfo.nsRootNodeID = nsRootNode.ID
+		} else {
+			return nil, fmt.Errorf("can't find graph node (docker container ns root node) with id %v", graphInfo.nsRootNodeID)
+		}
+	}
+	return nsRootNode, nil
+}
+
+// deleteVPPFromGraph removes the graph node representing a running VPP
+func (p *Subprobe) deleteVPPFromGraph(vpp *vppData, graphInfo *containerGraphInfo) {
+	p.graph.Lock()
+	defer p.graph.Unlock()
+
+	vppNode := p.graph.GetNode(graphInfo.vppNodeID)
+	if vppNode == nil {
+		logging.GetLogger().Errorf("can't find previously created vpp node with id %v", graphInfo.vppNodeID)
+		return
+	}
+
+	if err := p.graph.DelNode(vppNode); err != nil {
+		logging.GetLogger().Errorf("can't delete vpp node with id %v due to: %v", graphInfo.vppNodeID, err)
+		return
+	}
+}
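
Aside (not part of the patch): the failsafe in nsRootNode generalizes to a cached-identifier-with-fallback lookup — try the remembered node id first, fall back to a metadata search, and refresh the cache on success. A reduced stand-alone sketch (the node/store types are made up for illustration):

package main

import "fmt"

// node is a stand-in for graph.Node, reduced to what the lookup needs.
type node struct {
	id       string
	metadata map[string]string
}

// store is a stand-in for the graph: nodes indexed by id plus a full list.
type store struct {
	byID map[string]*node
	all  []*node
}

// lookup tries the cached id first and, when the node was recreated under a
// new id, falls back to a metadata search and refreshes the cache, just like
// nsRootNode does for the namespace root node.
func (s *store) lookup(cachedID *string, typ, name string) (*node, error) {
	if n, ok := s.byID[*cachedID]; ok {
		return n, nil
	}
	for _, n := range s.all {
		if n.metadata["Type"] == typ && n.metadata["Name"] == name {
			*cachedID = n.id // remember the new id for the next call
			return n, nil
		}
	}
	return nil, fmt.Errorf("no node with Type=%q and Name=%q", typ, name)
}

func main() {
	ns := &node{id: "id-2", metadata: map[string]string{"Type": "netns", "Name": "c1"}}
	s := &store{byID: map[string]*node{"id-2": ns}, all: []*node{ns}}
	cached := "id-1" // stale id from an earlier run
	n, err := s.lookup(&cached, "netns", "c1")
	fmt.Println(n.id, cached, err) // id-2 id-2 <nil>
}
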