/* vim:set ts=2 sw=2 sts=2 expandtab */
/*jshint asi: true undef: true es5: true node: true devel: true esnext: true
forin: false latedef: false */
/*global define: true */
!(typeof(define) !== "function" ? function($){ $(typeof(require) !== 'function' ? (function() { throw Error('require unsupported'); }) : require, typeof(exports) === 'undefined' ? this : exports); } : define)(function(require, exports) {
'use strict';
var unbind = Function.call.bind(Function.bind, Function.call)
// Convenience shortcut for Array.prototype.slice.call(args, n)
var slice = unbind(Array.prototype.slice)
// Convenience shortcut for Array.prototype.concat.apply([], [ [1], [2] ])
var unzip = Function.apply.bind(Array.prototype.concat, Array.prototype)
function reducer(f) {
/**
Takes `f` function and returns wrapped one instead that reduces arguments
to it via `f`. For example `reducer(f)(a, b, c, d)` is equivalent of
`f(a, f(b, f(c, d)))`.
**/
return function reduced(first) { return slice(arguments, 1).reduce(f, first) }
}
function pack(f) {
/**
Takes `f` function and returns a wrapper function that will pass all the
arguments starting from `f.length - 1` via array under last argument.
## Examples
var sum = pack(function(x, rest) {
return rest.reduce(function(a, b) { return a + b }, x)
})
**/
var arity = f.length - 1
return function packed() {
var args = slice(arguments, 0, arity)
args[arity] = slice(arguments, arity)
return f.apply(this, args)
}
}
exports.utils = { reducer: reducer, pack: pack }
function resolution(value) {
/**
Returns non-standard compliant (`then` does not returns a promise) promise
that resolves to a given `value`. Used just internally only.
**/
return { then: function then(resolve) { resolve(value) } }
}
function rejection(reason) {
/**
Returns non-standard compliant promise (`then` does not returns a promise)
that rejects with a given `reason`. This is used internally only.
**/
return { then: function then(resolve, reject) { reject(reason) } }
}
function attempt(f) {
/**
Returns wrapper function that delegates to `f`. If `f` throws then captures
error and returns promise that rejects with a thrown error. Otherwise returns
return value. (Internal utility)
**/
return function attempt(options) {
try { return f(options) }
catch(error) { return rejection(error) }
}
}
function isPromise(value) {
/**
Returns true if given `value` is promise. Value is assumed to be promise if
it implements `then` method.
**/
return value && typeof(value.then) === 'function'
}
function defer(prototype) {
/**
Returns object containing following properties:
- `promise` Eventual value representation implementing CommonJS [Promises/A]
(http://wiki.commonjs.org/wiki/Promises/A) API.
- `resolve` Single shot function that resolves returned `promise` with a given
`value` argument.
- `reject` Single shot function that rejects returned `promise` with a given
`reason` argument.
Given `prototype` argument is used as a prototype of the returned `promise`
allowing one to implement additional API. If prototype is not passed then
it falls back to `Object.prototype`.
## Examples
// Simple usage.
var deferred = defer()
deferred.promise.then(console.log, console.error)
deferred.resolve(value)
// Advanced usage
var prototype = {
get: function get(name) {
return this.then(function(value) {
return value[name];
})
}
}
var foo = defer(prototype)
deferred.promise.get('name').then(console.log)
deferred.resolve({ name: 'Foo' })
//=> 'Foo'
*/
var pending = [], result
prototype = (prototype || prototype === null) ? prototype : Object.prototype
// Create an object implementing promise API.
var promise = Object.create(prototype, {
then: { value: function then(resolve, reject) {
// create a new deferred using a same `prototype`.
var deferred = defer(prototype)
// If `resolve / reject` callbacks are not provided.
resolve = resolve ? attempt(resolve) : resolution
reject = reject ? attempt(reject) : rejection
// Create a listeners for a enclosed promise resolution / rejection that
// delegate to an actual callbacks and resolve / reject returned promise.
function resolved(value) { deferred.resolve(resolve(value)) }
function rejected(reason) { deferred.resolve(reject(reason)) }
// If promise is pending register listeners. Otherwise forward them to
// resulting resolution.
if (pending) pending.push([ resolved, rejected ])
else result.then(resolved, rejected)
return deferred.promise
}}
})
var deferred = {
promise: promise,
resolve: function resolve(value) {
/**
Resolves associated `promise` to a given `value`, unless it's already
resolved or rejected.
**/
if (pending) {
// store resolution `value` as a promise (`value` itself may be a
// promise), so that all subsequent listeners can be forwarded to it,
// which either resolves immediately or forwards if `value` is
// a promise.
result = isPromise(value) ? value : resolution(value)
// forward all pending observers.
while (pending.length) result.then.apply(result, pending.shift())
// mark promise as resolved.
pending = null
}
},
reject: function reject(reason) {
/**
Rejects associated `promise` with a given `reason`, unless it's already
resolved / rejected.
**/
deferred.resolve(rejection(reason))
}
}
return deferred
}
exports.defer = defer
function promise(value, prototype) {
/**
Returns a promise resolved to a given `value`. Optionally second `prototype`
arguments my be provided to be used as a prototype for a returned promise.
**/
var deferred = defer(prototype)
deferred.resolve(value)
return deferred.promise
}
exports.promise = promise
function future(f, options, prototype) {
/**
Returned a promise that immediately resolves to `f(options)` or
rejects on exception. If third argument optional `prototype` is
provided it will be used as prototype for a return promise.
**/
return promise(options, prototype).then(f)
}
exports.future = future
function lazy(f, options, prototype) {
/**
This is just like future with a difference that it will call `f` on demand
deferring this until (if ever) `then` of the returned promise is called.
**/
var result
prototype = (prototype || prototype === null) ? prototype : Object.prototype
return Object.create(prototype, {
then: { value: function then(resolve, reject) {
result = result || future(f, options)
return result.then(resolve, reject)
}}
})
}
exports.lazy = lazy
exports.run = run
function run(task) {
/**
(fabjs) like API for chaining stream operations into more readable forms.
## Examples
!((run)
(Stream.of, 1, 2, 3, 4)
(filter, function(x) { return x % 2 })
(map, function(x) { return x * x })
(print)) //
**/
var result, index, then, fab
fab = pack(function runner(task, params) {
index = run.index in task ? task[run.index] : params.length
if (then) params.splice(index, 0, { then: then })
result = task.apply(null, params)
fab.then = then = result && result.then
return fab
})
return fab.apply(null, arguments)
}
run.on = function runon(stream) {
/**
API similar to `run` with a difference that first argument passed to it will
be used as a target to which all following operations will be performed.
## Examples
!((run.on)
(Stream.of(1, 2, 3, 4))
(filter, function(x) { return x % 2 })
(map, function(x) { return x * x })
(print)) //
**/
return run(function task() { return stream })
}
run.index = ':this-index'
exports.Stream = Stream
function Stream(head, tail) {
/**
Returns stream that has given `head` and `tail`. If `tail` is not a stream
then it's assumed to be a function that returns `tail` stream once called.
## examples
var one2four = Stream(1, Stream(2, Stream(3, Stream(4))))
one2four.print() //
// Lazy
var ones = Stream(1, function() { return this })
print(take(5, ones)) //
**/
tail = tail || Stream.empty
var stream = { head: head }
stream.tail = isPromise(tail) ? tail : lazy(tail, stream)
return promise(stream)
}
Stream.error = function error(reason) {
/**
Returns a stream that will error with a given `reason`.
## Examples
var boom = Stream.error('Boom!')
print(append(Stream.of(1, 2, 3), boom)) //
**/
var deferred = defer()
deferred.reject(reason)
return deferred.promise
}
/**
Empty stream. Empty stream resolves to `null`.
**/
Stream.empty = promise(null)
exports.repeat = repeat
function repeat(value) {
/**
Returns an infinite stream of a given `value`.
## Examples
var ones = Stream.repeat(1)
print(take(5, ones)) //
print(take(11, ones)) //
**/
return Stream(value, function rest(stream) { return stream })
}
exports.iterate = iterate
function iterate(f, value) {
/**
Returns an infinite stream of `value, fn(value), fn(fn(value)), ....`.
(`fn` must be free of side-effects).
## Examples
var numbers = Stream.iterate(function(n) { return n + 1 }, 0)
print(take(5, numbers) //
print(take(15, numbers)) //
**/
return Stream(value, function rest(stream) {
return iterate(f, f(stream.head))
})
}
Stream.from = function from(value) {
/**
Creates stream from the given array, string or arguments object.
## Examples
print(Stream.from([ 1, 2, 3, 4 ])) //
print(Stream.from('hello')) //
**/
return !value.length ? Stream.empty : Stream(value[0], function rest() {
return Stream.from(slice(value, 1))
})
}
Stream.of = function of() {
/**
Returns stream of given arguments.
## Examples
Stream.of('a', 2, {}) //
**/
return Stream.from(arguments)
}
exports.capture = exports['catch'] = capture
function capture(f, stream) {
/**
Returns new stream created from the given `stream` by lazily handling it's
each item until an error occurs, in which case it's passed to given `f`
handler that is expected to return a substitution stream containing items
from that point on or `null` to stop a stream.
## Examples
var source = capture(function(error) {
// Swap error with -1
return Stream.of(-1)
}, append(Stream.of(1, 2, 3, 4), Stream.error('Boom!')))
print(source) //
**/
return lazy(function() {
return stream.then(function(stream) {
return stream && Stream(stream.head, capture(f, stream.tail))
}, f)
})
}
exports.finalize = finalize
function finalize(f, stream) {
/**
Returns new stream that contains all items at of the given `stream` followed
by all items of a stream returned by a `f` function if it returns anything.
If given `stream` has an error, it's substituted with an items of a stream
retuned by the `f` and followed by the same error. This makes it a perfect
fit for a cleanup tasks without capturing original errors.
**/
return append(capture(function(error) {
return append(future(f), Stream.error(error))
}, stream), lazy(f))
}
exports.alter = alter
function alter(f, stream) {
/**
Returns new stream created from the given `stream` by lazily applying given
`f` to each element resolution (`[head, tail]` pair or `null` in the end).
Each element resolution (including `null` identifying an end) is passed to
`f` function that **must** return substitution. Which is either stream or
`null` (identifying end). Please note that even though `alter` returns result
immediately, `stream` is still altered on demand.
## Examples
function power(n, stream) {
return alter(function(stream) {
console.log('!')
// If not an end substitute head and tail with power of `n`. Otherwise
// return an end.
return stream && Stream(Math.pow(stream.head, n), power(n, stream.tail))
}, stream)
}
var powered = power(2, Stream.of(1, 2, 3, 4))
// Notice that only one `!` logged. That's because one element is processed.
print(take(1, powered)) // !
function append(a, b) {
return alter(function(stream) {
// If not an end then append `b` to a tail, otherwise substitute `null`
// with `b`.
return stream ? Stream(stream.head, append(stream.tail, b)) : b
}, a)
}
var ab = append(Stream.of(1, 2, 3), Stream.of(4, 5, 6, 7))
print(ab) //
**/
return lazy(function() { return stream.then(f) })
}
exports.edit = edit
function edit(f, stream) {
/**
Returns new edited form of the given `stream` by lazily applying given `f`
function to each element resolution except end `null`. This is function is
just like alter with only difference that stream end `null` propagates to the
resulting stream bypassing `f` (This simplifies `f` interface, since it's
guaranteed to be called only with an objects that contain `head` and `tail`
properties).
## Examples
function power(n, stream) {
return edit(function(stream) {
return Stream(Math.pow(stream.head, n), power(n, stream.tail))
}, stream)
}
var powered = power(2, Stream.of(1, 2, 3, 4))
print(powered) // !
**/
return alter(function(stream) {
return stream && f(stream)
}, stream)
}
exports.print = (function(fallback) {
// `print` may be passed a writer function but if not (common case) then it
// should print with existing facilities. On node use `process.stdout.write`
// to avoid line breaks that `console.log` uses. If there is no `process`
// then fallback to `console.log`.
fallback = typeof(process) !== 'undefined' ? function write() {
process.stdout.write(slice(arguments).join(' '))
} : console.log.bind(console)
return function print(stream, write, continuation) {
/**
Utility method for printing streams. Optionally print may be passed a
`write` function that will be used for writing. If `write` not passed it
will fallback to `process.stdout.write` on node or to `console.log` if not
on node.
@param {Function} [write]
**/
write = write || fallback
if (!continuation) setTimeout(write, 1, '')
if (stream) print(stream.tail, write, true)
}, function(reason) {
setTimeout(write, 1, '/', reason, '>')
})
}
})()
exports.print[run.index] = 0
exports.take = take
function take(n, stream) {
/**
Returns stream containing first `n` (or all if has less) items of `this`
stream. For more generic API see `take.while`.
@param {Number} n
Number of items to take.
## Examples
var numbers = Stream.of(10, 23, 2, 7, 17)
print(take(2, numbers)) //
print(take(100, numbers)) //
print(take(Infinity, numbers)) //
print(take(0, numbers)) //
**/
return n <= 0 ? Stream.empty : edit(function(stream) {
return Stream(stream.head, take(n - 1, stream.tail))
}, stream)
}
// Note, that we quote 'while` & provide `until` alias since use of keywords
// like `while` is forbidden in older JS engines.
take['while'] = take.until = function takewhile(f, stream) {
/**
Returns stream containing only first `n` items on which given `f` predicate
returns `true`. Since older JS engines do not allow keywords as properties,
this function is also exposed via `take.until` function.
## Examples
var numbers = Stream.iterate(function(n) { return n + 1 }, 0)
var digits = take.while(function(n) {
return n <= 9
}, numbers)
print(digits) //
**/
return edit(function(stream) {
return f(stream.head) ? Stream(stream.head, takewhile(f, stream.tail)) : null
}, stream)
}
exports.drop = drop
function drop(n, stream) {
/**
Returns stream of this items except first `n` ones. Returns empty stream
has less than `n` items.
@param {Number} n
Number of items to drop.
## Examples
var numbers = Stream.of(10, 23, 2, 7, 17)
print(drop(3, numbers)) //
print(drop(100, numbers)) //
print(drop(0, numbers)) //
**/
return n <= 0 ? stream : edit(function(stream) {
return drop(n - 1, stream.tail)
}, stream)
}
// Note, that we quote 'while` & provide `until` alias since use of keywords
// like `while` is forbidden in older JS engines.
drop['while'] = drop.until = function dropwhile(f, stream) {
/**
Returns stream containing all except first `n` items on which given `f`
predicate returns `true`. Since older JS engines do not allow keywords as
properties, this function is also exposed via `drop.until` function.
## Examples
var numbers = Stream.iterate(function(n) { return n + 1 }, -10)
var positives = drop.while(function(n) {
return n < 0
}, numbers)
print(take(5, positives)) //
**/
return edit(function(stream) {
return f(stream.head) ? dropwhile(f, stream.tail) : stream
}, stream)
}
exports.head = head
exports.first = head // alias for people from clojure.
function head(stream) {
/**
Returns stream that contains only first item of the given stream.
**/
return edit(function(stream) {
return Stream(stream.head, Stream.empty)
}, stream)
}
exports.tail = tail
exports.rest = tail // alias for people from clojure.
function tail(stream) {
/**
Returns stream that contains all element of the given stream except the first
one.
**/
return edit(function(stream) {
return stream.tail
}, stream)
}
exports.map = map
function map(f, stream) {
/**
Returns a stream consisting of the result of applying `f` to the
items of `this` stream.
@param {Function} fn
function that maps each value
## Examples
var objects = Stream.of({ name: 'foo' }, { name: 'bar' })
var names = map(function($) { return $.name }, objects)
print(names) //
var numbers = Stream.of(1, 2, 3)
var doubles = map(function onEach(number) { return number * 2 }, numbers)
print(doubles) //
**/
return edit(function(stream) {
return Stream(f(stream.head), map(f, stream.tail))
}, stream)
}
map.all = function mapall(f) {
return map(function(zipped) {
return f.apply(null, zipped)
}, zip.all.apply(null, slice(arguments, 1)))
}
map.all[run.index] = 1
exports.filter = filter
function filter(f, stream) {
/**
Returns a stream of items from the given `stream` on which `f` predicate
returns `true`.
@param {Function} f
predicate function
@param {Stream}
stream to filter
## Examples
var numbers = Stream.of(10, 23, 2, 7, 17)
var digits = filter(function(value) {
return value >= 0 && value <= 9
}, numbers)
print(digits) //
**/
return edit(function(stream) {
return f(stream.head) ? Stream(stream.head, filter(f, stream.tail))
: filter(f, stream.tail)
}, stream)
}
exports.reduce = reduce
function reduce(f, stream, initial) {
/**
returns stream containing result of applying `f` to the `initial`
and first item of stream, then applying `f` to that result and the
2nd item, etc.. If stream has no items then stream containing
`initial` is retuned.
## Examples
var sum = reduce(function(x, y) {
return x + y
}, Stream.of(1, 2, 3), 0) //
**/
return lazy(function(result) {
var deferred = defer()
function accumulate(stream) {
if (!stream) return deferred.resolve(Stream.of(result))
result = f(result, stream.head)
stream.tail.then(accumulate, deferred.reject)
}
stream.then(accumulate, deferred.reject)
return deferred.promise
}, initial)
}
reduce[run.index] = 1
exports.zip = zip
function zip(first, second) {
/**
This function returns stream of tuples, where the n-th tuple contains the
n-th element from each of the argument streams. The returned stream is
truncated in length to the length of the shortest argument stream.
@params {Function}
source steams to be combined
@examples
var a = list([ 'a', 'b', 'c' ])
var b = list([ 1, 2, 3, 4 ])
var c = list([ '!', '@', '#', '$', '%' ])
var abc = zip(a, b, c)
abs(console.log)
// [ 'a', 1, '!' ]
// [ 'b', 2, '@' ]
// [ 'c', 3, '#' ]
**/
return lazy(function() {
var future = second.then()
return capture(function(reason) {
return alter(function(stream) {
return stream && Stream.error(reason)
}, future)
}, edit(function(first) {
return first && edit(function(second) {
return second && Stream([ first.head, second.head ],
zip(first.tail, second.tail))
}, second)
}, first))
})
}
zip[run.index] = 0
zip.all = reducer(function zipall(first, rest) {
return map(unzip, zip(first, rest))
})
zip.all[run.index] = 0
exports.append = append
function append(first, rest) {
/**
Returns a stream consisting of all items of `first` stream followed by
all items of `rest` stream. All errors will propagate to the resulting
stream. To append more than two streams use `append.all(first, second, ...)`
instead.
## Examples
print(append(Stream.of(1, 2), Stream.of('a', 'b'))) //
print(append.all(Stream.of(1), Stream.of(2), Stream.of(3))) //
**/
rest = rest || Stream.empty
return alter(function(stream) {
return stream ? Stream(stream.head, append(stream.tail, rest)) : rest
}, first)
}
append[run.index] = 0
append.all = reducer(append)
append.all[run.index] = 0
exports.flatten = flatten
function flatten(stream) {
/**
Takes `stream` of streams and returns stream consisting of items from each
stream in the given `stream` in order as they appear there. All errors
propagate up to the resulting stream.
## Examples
var stream = flatten(Stream.of(delay(Stream.of('async')), Stream.of(1, 2)))
print(stream) //
**/
return edit(function(stream) {
return append(stream.head, flatten(stream.tail))
}, stream)
}
function expand(f, stream) {
/**
Takes `stream` and expands each item in it using given `f` by returning a
stream of expansion elements. This is just a convenience shortcut for
`flatten(map(f, stream))`.
**/
return flatten(map(f, stream))
}
exports.expand = expand;
exports.mix = mix
function mix(source, rest) {
/**
Returns a stream consisting of all items from `source` and `rest` stream in
order of their accumulation. This is somewhat parallel version of `append`,
since it starts reading from both streams simultaneously and yields head that
comes in first. If streams are synchronous, first come firs serve makes no
real sense, in which case, resulting stream contains first items of both
streams, followed by second items of both streams, etc.. All errors
propagate to the resulting. In order to `mix` more than two streams use
`mix.all(a, b, c, ...)` instead.
## Examples
var stream = mix(Stream.of(1, 2), (Stream.of('a', 'b'))
print(stream) //
print(mix(delay(Stream.of(1, 2)), Stream.of(3, 4))) //
**/
rest = rest || Stream.empty
return lazy(function() {
var pending = [ defer(), defer() ]
var first = pending[0].promise
var last = pending[1].promise
function resolve(value) { pending.shift().resolve(value) }
function reject(reason) { pending.shift().reject(reason) }
source.then(resolve, reject)
rest.then(resolve, reject)
return alter(function(stream) {
return stream ? Stream(stream.head, mix(last, stream.tail)) : last
}, first)
})
}
mix[run.index] = 0
mix.all = reducer(mix)
mix.all[run.index] = 0
exports.merge = merge
function merge(stream) {
/**
Takes `stream` of streams and returns stream consisting of all items of each
item stream in the order of their accumulation. This is somewhat parallel
version of `flatten`, as it starts reading from all item streams
simultaneously and yields head that comes first. If streams are synchronous,
first come first serve makes no real sense, in which case, this function
will behave as flatten. All errors will propagate to the resulting stream.
## Examples
var async = delay(Stream.of('async', 'stream'))
var stream = Stream.of(async, Stream.of(1, 2, 3))
print(stream) //
**/
return edit(function(stream) {
return mix(stream.head, merge(stream.tail))
}, stream)
}
exports.delay = delay
function delay(ms, stream) {
/**
Takes a `source` stream and return stream of it's items, such that each
element yield is delayed with a given `time` (defaults to 1) in milliseconds.
**/
return stream ? edit(function(stream) {
var deferred = defer()
setTimeout(deferred.resolve, ms, Stream(stream.head, delay(ms, stream.tail)))
return deferred.promise
}, stream) : delay(1, ms)
}
exports.each = each
function each(f, e, stream) {
/**
Takes `f` and optionally `e` functions and a `stream` as arguments. Calls `f`
with each item of the given `stream`. If optional `e` is passed it's called
whenever given `stream` reaches it's end or error occurs.
**/
if (!stream) return each(f, null, e)
stream.then(function onEach(stream) {
if (!stream) e && e(null)
else if (false !== f(stream.head)) each(f, e, stream.tail)
}, e)
}
});