Skip to content

Commit

Permalink
feat(kernel): add support for concurrency with ants pool (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 11, 2024
1 parent c1bf0db commit 41531e0
Show file tree
Hide file tree
Showing 32 changed files with 701 additions and 425 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"leaktest",
"linecomment",
"linters",
"lorax",
"mattn",
"nakedret",
"nolint",
Expand Down Expand Up @@ -77,6 +78,7 @@
"varcheck",
"watchv",
"watchvc",
"watchvi"
"watchvi",
"Wrapf"
]
}
13 changes: 7 additions & 6 deletions builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ type buildArtefacts struct {
}

type Builders struct {
ob optionsBuilder
nb navigatorBuilder
pb pluginsBuilder
options optionsBuilder
navigator navigatorBuilder
plugins pluginsBuilder
extent extent
}

func (bs *Builders) buildAll() (*buildArtefacts, error) {
o, optionsErr := bs.ob.build()
o, optionsErr := bs.options.build()
if optionsErr != nil {
had, _ := kernel.HadesNav(optionsErr)

Expand All @@ -32,7 +33,7 @@ func (bs *Builders) buildAll() (*buildArtefacts, error) {
}, optionsErr
}

nav, navErr := bs.nb.build(o)
nav, navErr := bs.navigator.build(o)
if navErr != nil {
had, _ := kernel.HadesNav(navErr)

Expand All @@ -42,7 +43,7 @@ func (bs *Builders) buildAll() (*buildArtefacts, error) {
}, navErr
}

plugins, pluginsErr := bs.pb.build(o)
plugins, pluginsErr := bs.plugins.build(o)
if pluginsErr != nil {
had, _ := kernel.HadesNav(pluginsErr)

Expand Down
57 changes: 57 additions & 0 deletions concurrent-defs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package tv

import (
"github.com/snivilised/lorax/boost"
"github.com/snivilised/traverse/core"
)

type (
// TraverseInput represents the type of inputs accepted by the worker pool
TraverseInput struct {
// Node represents the file system entity (file or folder) for which
// a job will execute.
Node *core.Node

// Handler is the client defined callback function that should be
// invoked for all eligible Nodes.
Handler core.Client
}

// TraverseJobStream represents the core channel type of the worker pool's
// input stream. The client owns this channel and is responsible for
// closing it when done or invoking Conclude directly on the pool (See
// boost for more details).
TraverseJobStream = boost.JobStream[TraverseInput]

// TraverseJobStreamR worker pool's read stream, pool reads from this channel.
TraverseJobStreamR = boost.JobStreamR[TraverseInput]

// TraverseJobStreamW worker pool's write stream, client writes to this channel.
TraverseJobStreamW = boost.JobStreamW[TraverseInput]

// TraverseOutput represents the output of a single job executed by the pool.
TraverseOutput struct {
// Node represents the file system entity (file or folder) from
// which this output was generated via the client defined handler.
Node *core.Node

// Error error result of client's handler.
Error error

// Data is a custom field reserved for the client
Data any
}

// TraverseOutputStream represents the core channel type of the worker pool's
// output stream. The pool owns this stream and will be closed only when
// safe to do so, which will be anytime after navigation is complete.
// The channel is only closed when there are no remaining outstanding jobs
// and all workers are idle.
TraverseOutputStream = boost.JobOutputStream[TraverseOutput]

// TraverseOutputStreamR worker pool's output stream read by the client.
TraverseOutputStreamR = boost.JobOutputStreamR[TraverseOutput]

// TraverseOutputStreamW worker pool's output stream written to by the pool.
TraverseOutputStreamW = boost.JobOutputStreamW[TraverseOutput]
)
10 changes: 7 additions & 3 deletions core/navigator.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package core

import (
"context"
)

type Navigator interface {
Navigate() (TraverseResult, error)
Navigate(ctx context.Context) (TraverseResult, error)
}

type NavigatorFunc func() (TraverseResult, error)
type Navigate func() (TraverseResult, error)

func (fn NavigatorFunc) Navigate() (TraverseResult, error) {
func (fn Navigate) Navigate() (TraverseResult, error) {
return fn()
}
82 changes: 77 additions & 5 deletions director-error_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package tv_test

import (
"context"
"fmt"
"sync"

"github.com/fortytw2/leaktest"
. "github.com/onsi/ginkgo/v2" //nolint:revive // ok
. "github.com/onsi/gomega" //nolint:revive // ok

tv "github.com/snivilised/traverse"
"github.com/snivilised/traverse/internal/services"
"github.com/snivilised/traverse/pref"
)

type traverseErrorTE struct {
given string
using *tv.Using
as *tv.Was
was *tv.Was
}

var _ = Describe("director error", Ordered, func() {
Expand All @@ -37,8 +41,8 @@ var _ = Describe("director error", Ordered, func() {
return
}

if entry.as != nil {
Expect(entry.as.Validate()).NotTo(Succeed())
if entry.was != nil {
Expect(entry.was.Validate()).NotTo(Succeed())

return
}
Expand Down Expand Up @@ -73,7 +77,7 @@ var _ = Describe("director error", Ordered, func() {

Entry(nil, &traverseErrorTE{
given: "as missing restore from path",
as: &tv.Was{
was: &tv.Was{
Using: tv.Using{
Root: "/root-traverse-path",
Subscription: tv.SubscribeFiles,
Expand All @@ -85,7 +89,7 @@ var _ = Describe("director error", Ordered, func() {

Entry(nil, &traverseErrorTE{
given: "as missing resume strategy",
as: &tv.Was{
was: &tv.Was{
Using: tv.Using{
Root: "/root-traverse-path",
Subscription: tv.SubscribeFiles,
Expand All @@ -95,4 +99,72 @@ var _ = Describe("director error", Ordered, func() {
},
}),
)

When("Prime with subscription error", func() {
It("🧪 should: fail", func(specCtx SpecContext) {
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(specCtx)
defer cancel()

_, err := tv.Walk().Configure().Extent(tv.Prime(
tv.Using{
Root: RootPath,
Handler: func(_ *tv.Node) error {
return nil
},
},
)).Navigate(ctx)

Expect(err).NotTo(Succeed())
})
})

When("Prime with options build error", func() {
It("🧪 should: fail", func(specCtx SpecContext) {
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(specCtx)
defer cancel()

_, err := tv.Walk().Configure().Extent(tv.Prime(
tv.Using{
Root: RootPath,
Subscription: tv.SubscribeFiles,
Handler: func(_ *tv.Node) error {
return nil
},
},
tv.WithSubscription(tv.SubscribeFiles),
func(_ *pref.Options) error {
return errBuildOptions
},
)).Navigate(ctx)

Expect(err).To(MatchError(errBuildOptions))
})
})

When("Prime with subscription error", func() {
It("🧪 should: fail", func(specCtx SpecContext) {
defer leaktest.Check(GinkgoT())()

ctx, cancel := context.WithCancel(specCtx)
defer cancel()

var wg sync.WaitGroup

_, err := tv.Run(&wg).Configure().Extent(tv.Prime(
tv.Using{
Root: RootPath,
Handler: func(_ *tv.Node) error {
return nil
},
},
)).Navigate(ctx)

wg.Wait()
Expect(err).NotTo(Succeed())
})
})
})
Loading

0 comments on commit 41531e0

Please sign in to comment.