Pipes everywhere. Work in progress.
MVC-style JS frameworks (Backbone.js, others) encourage a proliferation of public methods and custom events. This makes apps difficult to understand and debug, pretty quickly. It also means that apps end up being imperative code all the way through - that is implementaion details all the way through.
This toolkit attempts to focus JS app design around the idea of composable pipes with a constrained API. If MVC encourages few nouns and many verbs, Plumber.js encourages many nouns and very few verbs. It also encourages composition and encapsulation, traceability and a declarative style that focuses on describing the relationship between your app's components rather than their implementation.
Inspiration for this library can be found in ZeroMQ, Node's streams interface and other JS libs such as Angular.js.
Structs hold data and emit change events when the data change. Nothing more. They are the base value objects that you pass around. You can define methods on them but you probably shouldn't. They are not models.
var mario = new Plumber.Struct({name: 'Mario'})
mario.on('change:name', function () {
console.log('Name changed!')
})
mario.set('name', 'Luigi') // triggers `change:name` event
mario.get('name') // "Luigi"
This is the core type in Plumber.js. You add and remove structs from pipes.
A pipe implements add
, remove
and pipe
. Pipes can be chained.
var p1 = new Plumber.Pipe()
var p2 = new Plumber.Pipe()
var p3 = new Plumber.Pipe()
p1.pipe(p2).pipe(p3)
p3.on('add', function (struct) {
console.log('p3 received data!', struct)
})
p1.add(mario) // forwards mario to other pipes downstream, including p2 and then p3
You can implement you own pipes with data filters.
var MarioFilter = Plumber.Pipe.extend({
filter: function (struct, filterPromise) {
if(struct.get('name') == 'Mario') filterPromise.resolve(struct)
else filterPromise.reject(struct)
}
})
var filter = new MarioFilter()
filter.pipe(p2).pipe(p3)
filter.add(new Plumber.Struct({name: 'Mario'})) // p2 and p3 get struct added
filter.add(new Plumber.Struct({name: 'Luigi'})) // filter is rejected. p2 and p3 DO NOT get struct added
Filters expect a struct and a promise (jQuery.Deferred
), so you can implement asynchronous filters (example: Ajax lookups)
var MarioFilter = Plumber.Pipe.extend({
filter: function (struct, filterPromise) {
$.get('http://some.api.com/valid_mario', {name: struct.get('name')}).then(
function () { filterPromise.resolve(struct) }, // success
function () { filterPromise.reject(struct) } // error
)
}
})
A pipe's public API of add
, remove
and pipe
is composed of methods that you can customize to get specialized behaviour. The add
lifecycle breaks down as follows:
add(struct)
filter(struct, filterPromise)
_add(struct, addPromise)
_formwardAdd(struct)
add
returns a promise, so multiple add() operations can be chained:
$.when(pipe1.add(struct), pipe2.add(struct), pipe3.add(struct)).then(...)
An index is a pipe that keeps track of structs added to it. From the outside, an index is just a pipe with add
, remove
and pipe
methods, but it also does:
- forward existing data onto newly piped pipes.
- when adding a struct already in the index, it updates its attributes and triggers change events in it instead of forwading it to other pipes.
- Wrap raw data in Structs if not already a struct.
var index = new Plumber.Index()
index.add(mario)
index.add(luigi)
index.pipe(another_pipe) // `another_pipe` will get mario and luigi added to it now
index.add(princess) // 'another_pipe' gets princess added to it.
index.add(mario) // forwards to other pipes
mario.set(age: 30)
index.add(mario) // does not forward.
index.add({name: 'Mario'}) // wraps data into Plumber.Struct before forwarding.
You can initialize an index with a custom Struct subclass to use as the data wrapper.
var index = new Plumber.Index(Person)
Plumber.Index can be subclassed as well. The following example implements a "capped index" that removes older structs over a limit of 10
var CappedIndex = Plumber.Index.extend({
limit: 10,
_add: function (item, promise) {
// remove first if limit reached
if(this._list.length > this.limit - 1) this.remove(this._list[0])
// add next
return Plumber.Index.prototype._add.call(this, item, promise)
}
})
In fact the last example is part of the built-in "devices" described below.
Devices are pipes that wrap other pipes and posibly implement routing strategies. You can build your own devices by reimplementing the internals of add
, remove
or pipe
.
Plumber.js ships with the following basic devices:
A "capped index" can hold a defined number of structs at a time. Older structs are removed as new ones are added. This is useful for building rolling views, infinite scrolling, real time charts and others where you only want to keep a limited number of data points in memory or view.
var index = new Plumber.Devices.CappedIndex(10) // holds up to 10 structs.
index.pipe(someViewPipe)
for(var i = 0; i < 100; i++) {
index.add(new Struct({name: i}))
}
The previous example will start remove
ing from the beginning of the list after struct number 10. Calls to remove
will be forwarded to any piped pipes.
A choke point adds to a set of pipes and waits for all of them to complete before forwarding to other pipes.
var p1 = new Plumber.Pipe()
var p2 = new SomeAjaxPipe()
var results = new Plumber.Pipe()
var choke = new Plumber.Devices.ChokePoint(p1, p2)
choke.pipe(results)
choke.add(struct) // won't pipe to `results` until both p1 and p2 have finished adding., which may be asynchronous.
Wraps multiple pipes and chains them using their pipe
method.
This pipeline is "leaky" because data added to pipes in the chain will be forwarded on to the other pipes in the chain.
var p1 = new Plumber.Pipe()
var p2 = new SomeAjaxPipe()
var results = new Plumber.Pipe()
var pipeline = new Plumber.Devices.LeakyPipeline(p1, p2)
pipeline.add(struct) // will forward struct to p1, p2 and finally pipe on to results pipe
p2.add(struct) // will also forward down to results pipe
Wraps multiple pipes, chain their #add and #remove calls sequentially while allowing for async execution.
This pipeline is "tight" because it can only be initiated by calling add
on the pipeline itself.
var p1 = new Plumber.Pipe()
var p2 = new SomeAjaxPipe()
var results = new Plumber.Pipe()
var pipeline = new Plumber.Devices.TightPipeline(p1, p2)
pipeline.add(struct) // will forward struct to p1, p2 and finally pipe on to results pipe
Two-way ventilator that can be use for fan-in and fan-out behaviour.
- It will pipe any messages added to pipes in the IN array to all pipes in the OUT array.
- Messages added to pipes in the IN array will also be piped normally through Ventilator#pipe()
var in1 = new Plumber.Pipe()
var in2 = new Plumber.Pipe()
var results = new Plumber.Pipe()
var ventilator = new Plumber.Devices.Ventilator([in1, in2])
ventilator.pipe(results)
in1.add(struct) // forwards struct to out1 and out2
var ventilator = new Plumber.Devices.Ventilator(null, [out1, out2])
ventilator.add(struct) // forwards struct to out1 and out2
var ventilator = new Plumber.Devices.Ventilator([in1, in2], [out1, out2])
in1.add(struct) // forwards struct to out1 and out2
in2.add(struct) // forwards struct to out1 and out2
Declare routing functions and pipe instances to route to.
var p1 = new Plumber.Pipe()
var p2 = new lumber.Pipe()
var default = new Plumber.Pipe()
var router = new Plumber.Devices.Router()
router
.route(p1, function (struct, promise) {
if(struct.get('name') == 'Joe') promise.resolve()
else promise.reject()
})
.route(p2, function (struct, promise) {
if(struct.get('name') == 'Jane') promise.resolve()
else promise.reject()
})
.default(default)
router.add(new Plumber.Struct({name: 'Joe'})) // forwards struct to p1
router.add(new Plumber.Struct({name: 'Jane'})) // forwards struct to p2
router.add(new Plumber.Struct({name: 'Paul'})) // forwards struct to default
router.pipe(other) // pipes all data to `other`.
Timed aggregator (or "reducer") that buffers structs as per user-provided delay and runs an aggregate function. After delay has passed it forwards aggregated struct to pipes downstream. Useful for building time-interval charts and widgets, aggregated logs, etc.
Examples:
// Define your custom aggregator
var Counter = Plumber.Devices.TimedAggregator.extend({
startValues: {
count: 0
},
aggregate: function (mem, struct) {
mem.set('count', mem.get('count') + 1)
}
})
// Use it.
var aggregator = new Counter(200)
var results = new Plumber.Pipe()
aggregator.pipe(results)
aggregator.add(new Plumber.Struct({count: 2})) // aggregates into "mem" struct.
aggregator.add(new Plumber.Struct({count: 5}))
// ... After 200 milliseconds, a "mem" struct is piped to results with a `count` value of 7
Implementing your own device or piping strategies by overriding any of the following hooks:
var CustomDevice = Plumber.Pipe.extend({
filter: function (struct, filterPromise) {...},
_add: function (struct, addPromise) {...},
_remove: function (struct, removePromise) {...},
_forwardAdd: function (struct) {...},
_forwardRemove: function (struct) {...}
})
A "repository" is used to initiate a data stream or data request. A repository is a pipe so it can be piped to other pipes, wrapped, decorated, etc.
An example Ajax repository:
// users.pipe(results).get('http://some.api.com/users')
var Users = Plumber.Pipe.extend({
// Get from remote API
get: function (url) {
var self = this
$.getJSON(url).then(function (data) {
data.forEach(function (user) {
self.add(user)
})
})
return this
}
})
A more advanced repository that keeps its own internal index
var Users = Plumber.Pipe.extend({
// A repository can have it's own index
initialize: function () {
this.__index = new Plumber.Index(UserStruct)
// pipe index to itself so repository can pipe to other pipes transparently
this.__index.pipe(this)
},
// Get from remote API
get: function (url) {
var self = this
$.getJSON(url).then(function (data) {
data.forEach(function (user) {
self.__index.add(user)
})
})
return this
}
})
As any other pipe, repositories can be composed into devices or piped to and from other pipes.
The following example implements saving data to a server by customising the _add
hook.
var Users = Plumber.Pipe.extend({
_add: function (struct, addPromise) {
// send data to server
$.ajax(this.url, {
type: 'post',
dataType: 'json',
data: struct.attributes
}).then(function (data) {
struct.set(data) // update struct with data coming from server
addPromise.resolve(struct)
})
}
})
This library uses my jbundle command line for help with bundling and minifying source files.
Tests are in the test directory.
Run the test server to serve bundled source files
$ jbundle s
Run Jasmine test suite by opening test/index.html
in your browser.
Source files are in the src directory.
Bundle manifest is in the JFile file.