
Pipeline #7

Open · wants to merge 27 commits into master

Conversation

@jgosmann (Collaborator) commented Mar 1, 2016

This PR supersedes PR #6. Instead of using the actions from that PR, one can now define processing steps and connect them into processing pipelines. This allows for more flexibility, as steps can process multiple items and output multiple items (and this is not required to be a one-to-one mapping). It also makes it more explicit how things are connected (instead of the implicit dependencies), which I like more. (There are still a few things that aren't perfect, but it might be as good as it gets, as it's a complicated problem.) The API for writing Nengo benchmarks is still almost the same.

Processing steps

A processing step is defined by deriving a class from procstep.Step and implementing a process method that is a generator. For example, we could generate a sequence of numbers:

from ctn_benchmark.procstep import Step

class Producer(Step):
    def process(self):
        yield 0
        yield 1
        yield 2

Instances of Step can be iterated over:

for i in Producer():
    print(i)
# prints
# 0
# 1
# 2

There is also a method to gather all results into a list:

results = Producer().process_all()  # assigns [0, 1, 2] to results

Of course, we can chain two steps. For that, we define class attributes on a step that act as inputs which other steps can connect to:

from ctn_benchmark.procstep import Connector

class Square(Step):
    numbers = Connector('numbers')

    def process(self):
        for i in self.numbers:
            yield i * i

squaring_pipeline = Square(numbers=Producer())
print(squaring_pipeline.process_all())  # [0, 1, 4]

In the process method we can iterate over the inputs, which are evaluated lazily because they are essentially generators. When instantiating a step, we assign the inputs as keyword arguments.
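
To see the laziness in action, here is a small sketch using only the API shown above (the Verbose step is made up for illustration); input items are only produced when the downstream step pulls them:

class Verbose(Step):
    def process(self):
        for i in range(3):
            print('producing', i)
            yield i

it = iter(Square(numbers=Verbose()))
print(next(it))  # prints "producing 0" and then 0; nothing else is computed yet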

The same step instance can be used as input to multiple other steps. In that case, iterating over the step will cache all results so that they won't have to be recomputed (though this might take more memory). Note that process_all will not do this caching (maybe it should, but I am not sure; usually process_all should only be called on the endpoints of a pipeline, which do not have any outgoing connections).
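
As a small sketch of this sharing, using only the steps defined above: a single Producer instance can feed two consumers, so the first consumer to iterate fills the cache and the second is served from it:

producer = Producer()
a = Square(numbers=producer)
b = Square(numbers=producer)
print(a.process_all())  # [0, 1, 4]; iterating fills producer's cache
print(b.process_all())  # [0, 1, 4]; served from the cache, not recomputed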

Mapped steps

It is common that the same operation should be performed for every input item (or set of input items). For this, the class MappedStep exists, which will call the method process_item once for each set of input items. Note that process_item returns a value instead of being a generator yielding items. Here is an example multiplying two numbers:

from ctn_benchmark.procstep import MappedStep

class Multiply(MappedStep):
    a = Connector('a')
    b = Connector('b')

    def process_item(self, a, b, **kwargs):
        return a * b

results = Multiply(a=Producer(), b=Producer()).process_all()  # [0, 1, 4]

Parametrized steps

The ParametrizedMixin class can be added to Step classes. It allows the parameters of a step to be defined in the params method; in the process method, these can be accessed as self.p. This includes only the parameters of that single step. self.ap allows access to all parameters of the preceding steps and the current step.

from ctn_benchmark.procstep import ParametrizedMixin

class Scale(MappedStep, ParametrizedMixin):
    number = Connector('number')

    def process_item(self, number, **kwargs):
        return number * self.p.scale

    def params(self):
        self.p.add_default("scale", scale=2.)

results = Scale(number=Producer()).process_all()  # [0, 2, 4]
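
As a sketch of self.ap, assuming it offers the same attribute access as self.p (the Offset step is made up for illustration):

class Offset(MappedStep, ParametrizedMixin):
    number = Connector('number')

    def process_item(self, number, **kwargs):
        # self.p only knows 'offset'; 'scale' from the preceding Scale step
        # is reached through self.ap (assumed attribute access).
        return number + self.p.offset * self.ap.scale

    def params(self):
        self.p.add_default("offset", offset=1.)

results = Offset(number=Scale(number=Producer())).process_all()  # [2, 4, 6]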

There are two helper functions, all_params and set_params, to get all valid parameters of a pipeline created from processing steps and to set those parameters.
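
A usage sketch; only the existence and purpose of these helpers is stated here, so their exact signatures are an assumption:

from ctn_benchmark.procstep import all_params, set_params

pipeline = Scale(number=Producer())
print(all_params(pipeline))     # assumed to list e.g. the 'scale' parameter
set_params(pipeline, scale=3.)  # assumed signature
print(pipeline.process_all())   # [0, 3, 6]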

Creating steps from functions

Sometimes it is more convenient to create a processing step from a function than to create a new class. This can be done with FunctionMappedStep and ParametrizedFunctionMappedStep. The function's arguments will be used as input connectors. Note that ParametrizedFunctionMappedStep passes the parameter set as an additional positional argument (as self.p cannot be used in a plain function).
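
A sketch based on the ParametrizedFunctionMappedStep usage in the TCM example below; the FunctionMappedStep call pattern is assumed by analogy:

from ctn_benchmark.procstep import (
    FunctionMappedStep, ParametrizedFunctionMappedStep)

def add(a, b, **kwargs):
    return a + b

results = FunctionMappedStep(add, a=Producer(), b=Producer()).process_all()
# assumed to give [0, 2, 4]

def scale(p, number, **kwargs):  # p is the added parameter set argument
    return number * p.scale

def scale_params(ps):
    ps.add_default("scale factor", scale=2.)

results = ParametrizedFunctionMappedStep(
    scale, scale_params, number=Producer()).process_all()  # [0, 2, 4]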

Common steps

common_steps.py defines a number of more or less general processing steps.

Creating the PR to save what I typed so far, will continue writing later.

Pipelines

While I use "pipeline" as the term for chained processing steps, pipeline.py defines an actual Pipeline class. These classes put together complete pipelines and define entry points (called "actions") which can easily be invoked, also from the command line. communication2.py shows how NengoPipeline can be used. The API is pretty much as before. The biggest change is that recording of the speed is now done with a context manager.

I will soon post an example of how I use pipelines with my TCM-math model, which actually needs some more of the flexibility provided by this architecture.

Todo

  • More unit tests
  • Better documentation
  • Improve the command line help

@jgosmann (Collaborator, Author) commented Mar 2, 2016

Here is the promised example with my TCM model, which runs the same model with three different experimental protocols, plots the results into three figures together with human data, and saves or shows them all.

class TcmMathAccum2(pipeline.EvaluationAndPlottingPipeline):
    class Producer(procstep.Step, procstep.ParametrizedMixin):
        def process(self):
            yield evaluation.Evaluation(
                "HowaKaha99 Immed",
                experiments.HowaKaha99('../data/HowaKaha99/Immed.dat'),
                protocols.FreeRecall(self.p.n_items, pi=1., ipi=0., ri=0.))
            yield evaluation.Evaluation(
                "HowaKaha99 Delayed",
                experiments.HowaKaha99('../data/HowaKaha99/Ltr0.dat'),
                protocols.FreeRecall(self.p.n_items, pi=1.2, ipi=0., ri=16.))
            yield evaluation.Evaluation(
                "HowaKaha99 contdist",
                experiments.HowaKaha99('../data/HowaKaha99/Ltr3.dat'),
                protocols.FreeRecall(self.p.n_items, pi=1.2, ipi=16., ri=16.))

        def params(self):
            self.p.add_default("List length to remember", n_items=12)

    def run_trial(self, p, ev):
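        # Present the study list to the model, then let an accumulator-based
        # decision process recall items one at a time.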
        vocab = spa.Vocabulary(p.item_d)
        proto = ev.protocol
        model = TCM(p.item_d, p.context_d, p.beta, p.gamma)

        for x in proto.stimuli(distractor_rate=p.distractor_rate):
            model.present_item(vocab.parse(x).v)
            model.update_matrices(vocab.parse(x).v)

        decision_proc = AccumulatorDecisionProcess(
            model, vocab.create_subset(list(proto.recall_stimuli())),
            a_min=p.accum.a_min, threshold=p.accum.threshold, tau=p.accum.tau,
            kappa=p.accum.kappa, lambda_=p.accum.lambda_, sigma=p.accum.sigma,
            rng=np.random)

        recalled = []
        for i in range(ev.protocol.n_items):
            x = decision_proc.decide(None)
            recalled.append(x)
            if p.update_recall_context:
                model.update_context(
                    model.retrieve_context(vocab['V' + str(x)].v))

        return recalled

    def evaluate(self, p, ev, **kwargs):
        data = []
        with ProgressTracker(p.n_trials, p.progress) as progress:
            for trial in range(p.n_trials):
                data.append(self.run_trial(p, ev))
                progress.step()

        data = np.array(data)
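        # One column per recall position plus a trial index column; plot()
        # below melts this into long format for the analysis functions.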
        kw = {i: data[:, i] for i in range(ev.protocol.n_items)}
        kw.update({'trial': range(p.n_trials)})
        return kw

    def evaluate_params(self, ps):
        ps.add_default("Show progress bar.", progress=True)
        ps.add_default("Number of trials to average over.", n_trials=3000)
        ps.add_default("item dimensionality", item_d=256)
        ps.add_default("context dimensionality", context_d=256)
        ps.add_default("contextual drift rate", beta=0.4)
        ps.add_default("M_FT update ratio", gamma=1.)
        ps.add_default("distractor rate", distractor_rate=0.4)
        ps.add_default(
            "Update context during recall.", update_recall_context=True)

        ps.add_default("Accumulator parameters", accum=ParameterSet())
        ps.accum.add_default(
            "Minimum item activation for inclusion in C_V "
            "scaling.", a_min=.0001)
        ps.accum.add_default("Recall threshold", threshold=1.)
        ps.accum.add_default("Rate of growth at each time step.", tau=.5)
        ps.accum.add_default("Strength of recurrent inhibition.", kappa=.62)
        ps.accum.add_default("Strength of lateral inhibition.", lambda_=.62)
        ps.accum.add_default(
            "Standard deviation of accumulator noise.", sigma=.029934)

    def create_evaluate_step(self):
        self.produce_step = self.Producer()
        return procstep.ParametrizedFunctionMappedStep(
            self.evaluate, self.evaluate_params, ev=self.produce_step)

    def plot(self, p, data, ev, **kwargs):
        df = pd.DataFrame(data)
        df = pd.melt(
            df, id_vars=['trial'], var_name='pos',
            value_name='recalled_pos').set_index(['trial', 'pos'])

        fig = plt.figure()
        fig.suptitle(ev.name)
        for i, (ana, kwargs) in enumerate(ev.exp.analyses):
            ax = fig.add_subplot(1, len(ev.exp.analyses), i + 1)
            ana(ev.exp.data, **kwargs).plot(ax=ax)
            ana(df, **kwargs).plot(ax=ax)
            # ax.set_title(ana.__name__)
        return [fig]

    def create_plot_step(self, **kwargs):
        return procstep.ParametrizedFunctionMappedStep(
            self.plot, self.plot_params, ev=self.produce_step,
            data=self.get_data_step())
