Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runproc sample #21

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,4 @@ func (c *Client) Get() interface{} {
}

// Scrub is an Anchor interface method, not applicable to the root-level anchor.
func (c *Client) Scrub() {}
func (c *Client) Scrub() {}
1 change: 1 addition & 0 deletions client/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Cmd struct {
// If Scrub is set, the process element will automatically be removed from its anchor
// when the process exits.
Scrub bool

}

func retypeProcStat(c proc.Cmd) Cmd {
Expand Down
2 changes: 1 addition & 1 deletion cmd/circuit/glob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ import (
func TestGlob(t *testing.T) {
walk, ellipses := parseGlob("/X/hola/petar/...")
log.Printf("w=%v ell=%v", walk, ellipses)
}
}
11 changes: 9 additions & 2 deletions cmd/circuit/hmac.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@ import (
"github.com/gocircuit/circuit/github.com/codegangsta/cli"
)

func keygen(c *cli.Context) {
func keygen(c *cli.Context) string {
rand.Seed(time.Now().UnixNano())
seed := make([]byte, 32)
for i, _ := range seed {
seed[i] = byte(rand.Int31())
}
key := sha512.Sum512(seed)
text := base64.StdEncoding.EncodeToString(key[:])
fmt.Println(text)

return text
}

func keygenPrint(c *cli.Context) {

fmt.Println(keygen(c))

}
14 changes: 13 additions & 1 deletion cmd/circuit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
{
Name: "keygen",
Usage: "Generate a new random HMAC key",
Action: keygen,
Action: keygenPrint,
},
{
Name: "ls",
Expand Down Expand Up @@ -219,6 +219,18 @@ func main() {
cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."},
},
},
{
Name: "runproc",
Usage: "Run a process element and return output on stdout",
Action: runproc,
Flags: []cli.Flag{
cli.StringFlag{"dial, d", "", "circuit member to dial into"},
cli.StringFlag{"discover", "228.8.8.8:8822", "Multicast address for peer server discovery"},
cli.BoolFlag{"tag", "tag each output line with the anchor names (hostnames)"},
cli.BoolFlag{"all", "run the command across all hosts"},
cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."},
},
},
{
Name: "signal",
Usage: "Send a signal to a running process",
Expand Down
113 changes: 112 additions & 1 deletion cmd/circuit/procdkr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
package main

import (
"bufio"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"

"github.com/gocircuit/circuit/client"
"github.com/gocircuit/circuit/client/docker"
"github.com/gocircuit/circuit/kit/iomisc"

"github.com/gocircuit/circuit/github.com/codegangsta/cli"
)
Expand Down Expand Up @@ -53,6 +56,112 @@ func mkproc(x *cli.Context) {
}
}

func doRun(x *cli.Context, c *client.Client, cmd client.Cmd, path string, done chan bool) {

w2, _ := parseGlob(path)
a2 := c.Walk(w2)
_runproc(x, c, a2, cmd, done)

}

func runproc(x *cli.Context) {
defer func() {
if r := recover(); r != nil {
fatalf("error, likely due to missing server or misspelled anchor: %v", r)
}
}()
c := dial(x)
args := x.Args()

if len(args) != 1 && !x.Bool("all") {
fatalf("runproc needs an anchor argument or use the --all flag to to execute on every host in the circuit")
}
buf, _ := ioutil.ReadAll(os.Stdin)
var cmd client.Cmd
if err := json.Unmarshal(buf, &cmd); err != nil {
fatalf("command json not parsing: %v", err)
}
cmd.Scrub = true

el := "/runproc/" + keygen(x)

done := make(chan bool, 10)
if x.Bool("all") {

w, _ := parseGlob("/")

anchor := c.Walk(w)

procs := 0

for _, a := range anchor.View() {

procs++

go func(x *cli.Context, cmd client.Cmd, a string, done chan bool) {

doRun(x, c, cmd, a, done)

}(x, cmd, a.Path()+el, done)

}

for ; procs > 0 ; procs-- {

select {
case <-done:
continue
}

}

} else {

doRun(x, c, cmd, args[0]+el, done)
<-done

}

}

func _runproc(x *cli.Context, c *client.Client, a client.Anchor, cmd client.Cmd, done chan bool) {

p, err := a.MakeProc(cmd)
if err != nil {
fatalf("mkproc error: %s", err)
}

stdin := p.Stdin()
if err := stdin.Close(); err != nil {
fatalf("error closing stdin: %v", err)
}

if x.Bool("tag") {

stdout := iomisc.PrefixReader(a.Addr() + " ", p.Stdout())
stderr := iomisc.PrefixReader(a.Addr() + " ", p.Stderr())

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use PrefixReader or PrefixWriter from package kit/iomisc instead.

stdoutScanner := bufio.NewScanner(stdout)
for stdoutScanner.Scan() {
fmt.Println(stdoutScanner.Text())
}

stderrScanner := bufio.NewScanner(stderr)
for stderrScanner.Scan() {
fmt.Println(stderrScanner.Text())
}

} else {

io.Copy(os.Stdout, p.Stdout())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about stderr?

io.Copy(os.Stderr, p.Stderr())

}
p.Wait()
done <- true

}

func mkdkr(x *cli.Context) {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -92,7 +201,9 @@ func sgnl(x *cli.Context) {
fatalf("signal needs an anchor and a signal name arguments")
}
w, _ := parseGlob(args[1])
u, ok := c.Walk(w).Get().(interface{Signal(string) error})
u, ok := c.Walk(w).Get().(interface {
Signal(string) error
})
if !ok {
fatalf("anchor is not a process or a docker container")
}
Expand Down