Streams and Subscriptions
For every tool in the async/await
toolbox, there is an equilavent in
Effection. We were already introduced to the most important of these in
the introduction.
Async | Effection |
---|---|
Promise | Operation |
await | yield* |
async function | function* |
Likewise, there are also equivalents for the Async iteration protocols. They are:
Async | Effection |
---|---|
AsyncIterator | Subscription |
AsyncIterable | Stream |
for await | for yield* each |
If you're familiar with how to use these constructs, then you will be right at home working with sequences of data in Effection. In particular, streams are useful for modeling things like UI events and network connections, and in this guide we'll see how to create, transform, and consume them.
Subscription
The Effection equivalent of AsyncIterator
is the
Subscription
. Like its async counterpart, a
subscription acts like a queue that provides a sequence of values via a
next()
method. The only difference is that instead of returning a
Promise
that fulfills to an iterator result, it returns an
Operation
that yields an iterator result.
function* forEach(subscription) {
let next = yield* subscription.next();
while (!next.done) {
console.log(`value: ${next.value}`);
next = yield* subscription.next();
}
console.log(`done: ${next.value}`);
}
Subscriptions, like async iterators, are stateful objects, and calling
the next()
method on them alters their internal state. Crucially,
taking the next value from the subscription not only retrieves that
value, but also removes it, so that we don't iterate the same value twice.
Just as with async interators, subscriptions are low-level constructs that you don't interact with all that often, but it is good to know that they are there. Most of the time however, instead of working with subscriptions directly, you'll work with streams.
Stream
Streams are the Effection equivalent of async iterables,
and like them, are stateless objects that does not do anything by
themselves. Whereas a Subscription
represents hot, active state, a
Stream
on the other hand contains a recipe on how to create
a subscription.
A Stream can have any number of subscriptions, each of which receives
the same set of values. Effection ships with Channel
, which is
a very handy way to create streams and by proxy, subscriptions. Let's use this
to show the difference between them:
import { main, createChannel } from 'effection';
await main(function*() {
let channel = createChannel();
// the channel has no subscribers yet!
yield* channel.send('too early');
let subscription1 = yield* channel.subscribe();
let subscription2 = yield* channel.subscribe();
yield* send('hello');
yield* send('world');
console.log(yield* subscription1.next());
//=> { done: false, value: "hello" }
console.log(yield* subscription1.next());
//=> { done: false, value: "world" }
console.log(yield* subscription2.next());
//=> { done: false, value: "hello" }
console.log(yield* subscription2.next());
//=> { done: false, value: "world" }
});
As you can see, the channel can have multiple subscribers and sending a message to the channel adds it to each active subscription. If the channel does not have any active subscriptions, then sending a message to it does nothing.
Transforming Streams
Because streams are completely stateless, it means that they can be transformed
into other streams using nothing but simple functions. For example, we can apply
several stream transformations in sequence by using the built-in map()
and filter()
functions.
Let's look at an example of this in action by using map()
:
import { main, map, createChannel } from 'effection';
await main(function*() {
let channel = createChannel();
let textStream = map(value => value.text)(channel);
let uppercaseStream = map(value => value.toUpperCase())(textStream);
let subscription = yield* uppercaseStream.susbscribe();
yield* channel.send({ text: 'hello' });
yield* channel.send({ text: 'world' });
yield* subscription.next();
//=> { done: false, value: 'HELLO' }
yield* subscription.next();
//=> { done: false, value: 'WORLD' }
});
If we unpack this a bit, we can see that we're creating a new Stream
called
textStream
using the map
function on stream
. It derives its values by
pulling the text
property from each value it sees.
We then use map
again on textStream
to create uppercaseStream
, which
converts each value into uppercase.
When we subscribe to uppercaseStream
and send a value to the channel, we can
see that all of our transformations are applied.
The filter()
function can be used to restrict the values emitted by a
stream. The following uses a regular expression to select only those values
that include the string "ello" somewhere inside them.
import { main, filter, createChannel } from 'effection';
await main(function*() {
let channel = createChannel();
let elloStream = filter(value => value.match(/ello/))(channel);
let subscription = yield* elloStream.subscribe();
yield* channel.send('hello');
// our filtered stream skips over this value
yield* channel.send('world');
yield* channel.send('jello');
console.log(yield* subscription.next());
//=> { done: false, value: 'hello' }
console.log(yield* subscription.next());
//=> { done: false, value: 'jello' }
});
When it is run, it prints out "hello" and "jello", but skips "world".
Piping Streams
But what's up with that weird map(fn)(stream)
and
filter(fn)(stream)
syntax anyway? It might have even thrown you for
a loop the first time you saw it. Why don't we just pass the stream
that we want to transform as a second argument, like map(fn, stream)
?
The reason is so that we can use map()
, filter()
and functions like them to
very rapidly compose into streaming pipelines. For example, we could re-write
the text transformation above using the pipe()
utility function.
import { main, pipe, map, createChannel } from 'effection';
await main(function*() {
let channel = createChannel();
let messages = pipe(
channel,
map(value => value.text),
map(value => value.toUpperCase()),
);
let subscription = yield* messages.subscribe();
yield* channel.send({ text: 'hello' });
yield* channel.send({ text: 'world' });
yield* subscription.next();
//=> { done: false, value: 'HELLO' }
yield* subscription.next();
//=> { done: false, value: 'WORLD' }
});
We can do this because invoking map(fn)
returns a function that
takes a stream as its argument, and returns a stream. This allows us
to chain together any number of these stream-to-stream functions into
a pipeline to create new streams that apply each transformation in
sequence.
💡The
pipe()
function that comes with Effection is just a utility for function composition that is provided for convenience. You can just as easily usefp-ts#pipe
orlodash#flow
In fact, any function that takes a stream and returns a stream can be used in
such a way, not just map()
, and filter()
. This means you can write your own
stream combinators, just so long as they return a function that takes a stream
and returns as stream.
for yield* each
The entire point of having a stream is to consume values from it, and we
have already seen how we can use yield*
to subscribe to a Stream and turn
a Stream into a Subscription. But there is an easier way!
When working with an async iterator, you would not normally traverse it by
manually invoking the next()
method repeatedly. Instead, you would use the
for await of
syntax to conveniently focus on the
work to be done with each item instead of the mechanics of iteration.
It is the same with Effection streams.
The simplest way to consume a stream in Effection is with the for yield* each
loop. It is almost identical to the for await
loop from
async/await
. The biggest difference is that you must yield to the
each.next
operation as the last step of your loop.
import { main, createChannel, each, spawn, sleep } from 'effection';
await main(function*() {
let channel = createChannel();
yield* spawn(function*() {
yield* sleep(1000);
yield* channel.send('hello');
yield* sleep(1000);
yield* channel.send('world');
});
for (let value of yield* each(channel)) {
console.log('got value:', value);
yield* each.next();
}
Why do we need to use spawn()
here? We know that sending values to a
stream does nothing unless someone is subscribed to the stream, so we
cannot send any values before we begin our loop, but we also cannot
send any values after we begin our loop because the loop will never
complete until the stream closes (more about that later). So we need
to run both the loop and the sending of values concurrently, and as
we've already learned, when we need to do multiple things
concurrently, that's when we use spawn()
.
We could also flip this example around like this:
import { main, createChannel, next, spawn, sleep } from 'effection';
await main(function*() {
let channel = createChannel();
yield* spawn(function*() {
for (let value of yield* each(channel)) {
console.log('got value:', value);
yield* each.next();
}
});
yield* sleep(1000);
yield* channel.send('hello');
yield* sleep(1000);
yield* channel.send('world');
});
Another way of consuming values from a stream is to use the first
. This is
handy if you want to just the very next item from a stream of events.
import { main, first, createChannel, spawn, sleep } from 'effection';
await main(function*() {
let channel = createChannel();
yield* spawn(function*() {
yield sleep(1000);
yield* channel.send('hello');
yield* sleep(1000);
yield* channel.send('world');
});
let value = yield* first(channel);
console.log(value); // logs 'hello'
});
As you can see here, once we send any value to the Stream, first()
returns that value. Now you might be tempted to call first()
multiple times, like this:
// THIS IS NOT IDEAL
import { main, createChannel, spawn, sleep } from 'effection';
await main(function*() {
let channel = createChannel();
yield* spawn(function*() {
yield* sleep(1000);
yield* channel.send('hello');
yield* sleep(1000);
yield* channel.send('world');
});
let firstValue = yield* first(channel);
console.log(firstValue); // logs 'hello'
let secondValue = yield* first(channel);
console.log(secondValue); // logs 'world'
});
And while this works, it has a problem that becomes apparent if we slightly change the order we do things in:
// THIS IS NOT IDEAL
imchannel { main, createChannel, spawn, sleep } from 'effection';
await main(function*() {
let channel = createChannel();
yield* spawn(function*() {
yield* sleep(1000);
yield* channel.send('hello');
yield* channel.send('world');
});
let firstValue = yield* first(channel);
console.log(firstValue); // logs 'hello'
yield* sleep(1000);
// will block forever! We missed the message!
let secondValue = yield* first(channel);
// we never get here!
console.log(secondValue);
});
This makes it very clear why Subscriptions are necessary. It is because unlike listening for one-off events, a subscription guarantees that we can never miss any messages. Here we can see how Subscriptions are more resilient:
imchannel { main, createChannel, spawn, sleep } from 'effection';
await main(function*() {
let channel = createChannel();
let subscription = yield* channel.subscribe();
yield* spawn(function*() {
yield* sleep(1000);
yield* channel.send('hello');
yield* channel.send('world');
});
let { value: firstValue } = yield* subscription.next();
console.log(firstValue); // logs 'hello'
yield* sleep(1000);
let { value: secondValue } = yield subscription.next();
console.log(secondValue); // logs 'world'
});
Closing streams
All good things must come to an end, including a stream of data. In these cases
Effection streams and subscriptions optionally support passing a final piece of
data when there are no more values to be received. Usually, if present, this
piece of data represents a summary of the stream and something about its final
end state. For example, when a websocket is closed it emits a
close
event that contains a numeric code and a description of
why the socked was closed.
The value of this last piece of data is the value of the iterator result when
the done
attribute is true. Unsprisingly, this mirrors the behavior of an
async iterator exactly.
import { main, sleep, spawn, createChannel } from "effection";
await main(function*() {
let channel = createChannel();
let subscription = yield* channel.subscribe();
yield* spawn(function*() {
yield* channel.send('hello');
yield* sleep(1000);
yield* channel.send('hello');
yield* channel.close({ words: 2 });
});
let next = yield* subscription.next();
while (!next.done) {
console.log(next.value);
next = yield* subscription.next();
}
console.log(`${next.value} words total`);
});
//=> hello
//=> world
//=> 2 words total
Just like with an async iterator, if you want to access the very last iterator
result, you cannot use the for...of
. You must use a while-loop
. This is
because the body of the loop only executes for iterator result where the done
property is false
. It does not execute for the last iterator result where
done
is true.
Creating your own streams
Most of the examples thus far have shown how to consume streams that
are originated from within an operation, and so have used the
preferred mechanism for that which is the Channel. To do
this, we run the channel's send()
operation. But because send()
is
an operation, it means that in order to send a message to the stream,
we must already have to be inside an operation. However, this poses a
problem for events that originate from outside of an operation such
as an event listener. In this case, we need to be able to call a plain
vanilla javascript function which is not an operation, and still
notify any subscribers to a stream. For these cases there is the
Signal
interface which can be created with
createSignal
function.
In the following example, we use a signal as an event callback to log all of the events
import { createSignal, each } from "effection";
export function* logClicks(button) {
let clicks = createSignal();
try {
button.addEventListener("click", clicks.send);
for (let click of yield* each(clicks)) {
console.dir({ click });
yield* each.next();
}
} finally {
button.removeEventListener("click", clicks.send);
}
}
First, we create the signal, then attach its send()
function to the
event target's callback. It then loops over every click event received
in this way and logs out its contents to the console. Notice that,
like a well-behaved operation, we make sure to uninstall the listener
on the button so that we left it in the same state we found it.
This is a good demonstration of the concept, but it isn't as useful as it could be because any time we want to do something like the above to consume a stream of clicks, we'd first need to create a signal, attach the signal's callback, and then iterate over its content. For example, if we wanted to make an operation that disabled a button by preventing the default action every time, we would have to regurgitate the same boilerplate code as in our click logger:
import { createSignal, each } from "effection";
export function* cancelClicks(button) {
let clicks = createSignal();
try {
button.addEventListener("click", clicks.send);
for (let click of yield* each(clicks)) {
click.preventDefault();
yield* each.next();
}
} finally {
button.removeEventListener("click", clicks.send);
}
}
In other words, the problem with this method, is that you have to
define the stream and consume it in the same place. But what we'd like
to do instead is to define the stream in one place, but consume it in
another. That way, we could specify the setup and teardown logic once,
but utilize it multiple times. If we had a way to do that, say a clicksOn()
function that could take any html element and return a stream of clicks on that
element, then we could write something like the following:
function* logAndCancel(button) {
let clicks = clicksOn(button);
yield* spawn(function*() {
for (let click of yield* each(clicks)) {
console.log(click);
yield* each.next();
}
})
yield* spawn(function*() {
for (let click of yield* each(clicks)) {
click.preventDefault();
yield* each.next();
}
})
}
It turns out that resources are just what we need to make
this happen. If you recall, a Stream
is just has a
subscribe() Operation
that returns a
Subscription
. So the simplest way to implement such
an operation is as a resource that provides a subscription.
💡Most of the time, you won't need to implement your own streams, but when you do, a resource is the easiest way to do it.
Armed with resources, we can now implement our hypothetical clicksOn
utility.
import { resource } from "effection"
export function clicksOn(button) {
return {
subscribe: () => resource(function*(provide) {
let clicks = createSignal();
try {
button.addEventListener("click", clicks.send);
let subscription = yield* clicks.subscribe();
yield* provide(subscription);
} finally {
button.removeEventListener("click", send);
}
}),
};
}
We have now encapsulated the setup and teardown of our listener in a single place, but the actual consuming of the stream of clicks can happen anywhere.