The Streaming library of language-ext is all about compositional streams. There are two key types of streaming functionality: closed-streams and open-streams.
Closed streams
Closed streams are facilitated by the Pipes system. The types in the Pipes system are compositional monad-transformers that 'fuse' together to produce an EffectT<M, A>. This effect is a closed system, meaning that there is no way (from the API) to directly interact with the effect from the outside: it can be executed and will return a result if it terminates.
The pipeline components are:
These are the components that fuse together (using the | operator) to make an EffectT<M, A>. The types are monad-transformers that support lifting monads with the MonadIO trait only (which constrains M). This makes sense, otherwise the closed-system would have no effect other than heating up the CPU.
There are also more specialised versions of the above that only support the lifting of the Eff<RT, A> effect-monad:
They all fuse together into an Effect<RT, A>.
Pipes are especially useful if you want to build reusable streaming components that you can glue together ad infinitum. Pipes are, arguably, less useful for day-to-day stream processing, like handling events, but your mileage may vary.
More details on the Pipes page.
Open streams
Open streams are closer to what most C# devs have used classically. They are like events or IObservable streams. They yield values and (under certain circumstances) accept inputs.
- Source and SourceT yield values synchronously or asynchronously depending on their construction.
- Sink and SinkT receive values and propagate them through the channel they're attached to.
- Conduit and ConduitT provide an input transducer (acts like a Sink), an internal buffer, and an output transducer (acts like a Source).
I'm calling these 'open streams' because we can Post values to a Sink/SinkT and we can Reduce values yielded by Source/SourceT. So, they are 'open' for public manipulation, unlike Pipes, which fuse the public access away.
Source
Source<A> is the 'classic stream': you can lift any of the following types into it: System.Threading.Channels.Channel<A>, IEnumerable<A>, IAsyncEnumerable<A>, or singleton values. To process a stream, you need to use one of the Reduce or ReduceAsync variants. These take Reducer delegates as arguments. They are essentially a fold over the stream of values, which results in an aggregated state once the stream has completed. These reducers can be seen to play a similar role to Subscribe in IObservable streams, but are more principled because they return a value (which we can leverage to carry state for the duration of the stream).
Source also supports some built-in reducers:
- Last - aggregates no state, simply returns the last item yielded.
- Iter - this forces evaluation of the stream, aggregating no state, and ignoring all yielded values.
- Collect - adds all yielded values to a Seq<A>, which is then returned upon stream completion.
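To make the 'reducers are a fold' idea concrete, here is a minimal sketch that uses only the .NET base class library. The Reduce helper below is a hypothetical stand-in written for this example; it is not the language-ext Reduce method or its Reducer delegate, whose exact signatures are not shown on this page. It only illustrates how Last, Iter, and Collect can each be seen as a fold with a different state.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not the language-ext API): fold 'step' over 'items',
// threading a state value through every element of the stream.
static S Reduce<A, S>(IEnumerable<A> items, S seed, Func<S, A, S> step)
{
    var state = seed;
    foreach (var item in items) state = step(state, item);
    return state;
}

var stream = new[] { 1, 2, 3, 4 };

// A general fold: the state carried through the stream is a running total
var sum = Reduce<int, int>(stream, 0, (total, x) => total + x);

// 'Last' as a fold: discard the previous state, keep the newest value
var last = Reduce<int, int?>(stream, null, (_, x) => x);

// 'Collect' as a fold: the state is a growing collection
var collected = Reduce<int, List<int>>(
    stream, new List<int>(), (list, x) => { list.Add(x); return list; });

// 'Iter' as a fold: force evaluation, keep no meaningful state
var visited = 0;
Reduce<int, int>(stream.Select(x => { visited++; return x; }), 0, (unit, _) => unit);

Console.WriteLine($"{sum} {last} {string.Join(",", collected)} {visited}");
```

The point of the sketch is the shape: every reducer receives the current state and the next value and returns the next state, which is what lets a stream carry state without external mutation.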
SourceT
SourceT<M, A> is the classic-stream embellished - it turns the stream into a monad-transformer that can lift any MonadIO-enabled monad (M), allowing side effects to be embedded into the stream in a principled way.
So, for example, to use the IO<A> monad with SourceT, simply use: SourceT<IO, A>. Then you can use one of the following static methods on the SourceT type to lift IO<A> effects into a stream:
- SourceT.liftM(IO<A> effect) creates a singleton-stream
- SourceT.foreverM(IO<A> effect) creates an infinite stream, repeating the same effect over and over
- SourceT.liftM(Channel<IO<A>> channel) lifts a System.Threading.Channels.Channel of effects
- SourceT.liftM(IEnumerable<IO<A>> effects) lifts an IEnumerable of effects
- SourceT.liftM(IAsyncEnumerable<IO<A>> effects) lifts an IAsyncEnumerable of effects
Obviously, when lifting non-IO monads, the types above change.
SourceT also supports the same built-in convenience reducers as Source (Last, Iter, Collect).
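As a rough analogy for a stream of effects, the sketch below uses a plain Func<A> as a stand-in for IO<A> and IEnumerable<Func<A>> as a stand-in for SourceT<IO, A>. Singleton and Forever are hypothetical helpers that mirror the singleton and repeat-forever constructions listed above; none of this is language-ext code, and the real transformer does considerably more. What it shows is that building the stream runs nothing: the effects only execute when the stream is consumed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for IO<int>: a deferred computation with a side effect when run
var ticks = 0;
Func<int> nextTick = () => ++ticks;

// Hypothetical analogue of lifting one effect: a stream holding a single effect
static IEnumerable<Func<A>> Singleton<A>(Func<A> effect)
{
    yield return effect;
}

// Hypothetical analogue of repeating an effect forever: an infinite stream
static IEnumerable<Func<A>> Forever<A>(Func<A> effect)
{
    while (true) yield return effect;
}

// Building the streams only describes the effects, nothing has run yet
var one = Singleton(nextTick);
var many = Forever(nextTick).Take(3);
var ticksBeforeRun = ticks;

// Consuming the stream is what runs each effect, in order
var results = one.Concat(many).Select(run => run()).ToList();

Console.WriteLine($"{ticksBeforeRun} -> {string.Join(",", results)}");
```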
Sink
Sink<A> provides a way to accept many input values. The values are buffered until consumed. The sink can be thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to manipulate the values being posted to the buffer just before they are stored.
This manipulation is possible because the Sink is a CoFunctor (contravariant functor). This is the dual of Functor: we can think of Functor.Map as converting a value from A -> B, whereas CoFunctor.Comap converts from B -> A.
So, to manipulate values coming into the Sink, use Comap. It will give you a new Sink with the manipulation 'built-in'.
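The direction of Comap can feel backwards at first, so here is a small sketch of the idea using only System.Threading.Channels. The 'sink' here is modelled as a plain Func<A, bool> that posts into a channel, and Comap is a hypothetical helper written for this example; neither is the language-ext Sink type. Given a function B -> A, a sink that accepts A becomes a sink that accepts B, because the function runs before the value is stored.

```csharp
using System;
using System.Threading.Channels;

// The buffer that collects the posted values
var channel = Channel.CreateUnbounded<int>();

// Hypothetical minimal sink of int: post a value, report whether it was stored
Func<int, bool> intSink = value => channel.Writer.TryWrite(value);

// Contravariant map: a function B -> A turns a sink of A into a sink of B
static Func<B, bool> Comap<A, B>(Func<A, bool> sink, Func<B, A> f) =>
    value => sink(f(value));

// A sink that accepts strings and stores their lengths in the same buffer
Func<string, bool> stringSink = Comap<int, string>(intSink, text => text.Length);

stringSink("hello");
stringSink("hi");

// The buffer holds the already-converted values, in posting order
channel.Reader.TryRead(out var first);
channel.Reader.TryRead(out var second);

Console.WriteLine($"{first} {second}");
```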
SinkT
SinkT<M, A> provides a way to accept many input values. The values are buffered until consumed. The sink can be thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to manipulate the values being posted to the buffer just before they are stored.
This manipulation is possible because the SinkT is a CoFunctor (contravariant functor). This is the dual of Functor: we can think of Functor.Map as converting a value from A -> B, whereas CoFunctor.Comap converts from B -> A.
So, to manipulate values coming into the SinkT, use Comap. It will give you a new SinkT with the manipulation 'built-in'.
SinkT is also a transformer that lifts types of K<M, A>.
Conduit
Conduit<A, B> can be pictured as so:
+----------------------------------------------------------------+
|                                                                |
|  A --> Transducer --> X --> Buffer --> X --> Transducer --> B  |
|                                                                |
+----------------------------------------------------------------+
- A value of A is posted to the Conduit (via Post)
- It flows through an input Transducer, mapping the A value to X (an internal type you can't see)
- The X value is then stored in the conduit's internal buffer (a System.Threading.Channels.Channel)
- Any invocation of Reduce will force the consumption of the values in the buffer
- Flowing each value X through the output Transducer
So the input and output transducers allow for pre and post-processing of values as they flow through the conduit.
Conduit is a CoFunctor: call Comap to manipulate the pre-processing transducer. Conduit is also a Functor: call Map to manipulate the post-processing transducer. There are other non-trait, but common behaviours, like FoldWhile, Filter, Skip, Take, etc.
Conduit supports access to a Sink and a Source for more advanced processing.
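The flow described above (post, input transducer, buffer, output transducer, reduce) can be sketched with a System.Threading.Channels.Channel and two ordinary functions. Everything in this example is a simplified stand-in: the transducers are plain Func delegates, Post and Reduce are local functions written for illustration, and this Reduce only drains what is already buffered rather than waiting on the stream. It is not the language-ext Conduit implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Input transducer A -> X and output transducer X -> B.
// Here A = string, X = int (hidden from callers), B = string.
Func<string, int> input = text => text.Length;
Func<int, string> output = n => new string('*', n);

// The internal buffer stores values of the intermediate type X
var buffer = Channel.CreateUnbounded<int>();

// 'Post': run the input transducer, then store the X in the buffer
bool Post(string value) => buffer.Writer.TryWrite(input(value));

// 'Reduce': drain the buffered X values, run the output transducer on each,
// and fold the resulting B values into a state
S Reduce<S>(S state, Func<S, string, S> reducer)
{
    while (buffer.Reader.TryRead(out var x)) state = reducer(state, output(x));
    return state;
}

Post("ab");
Post("abcd");

var bars = Reduce(new List<string>(), (list, bar) => { list.Add(bar); return list; });

Console.WriteLine(string.Join(" ", bars));
```

In this picture, a contravariant map would compose a new function in front of input, and a covariant map would compose a new function after output, leaving the buffer untouched.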
ConduitT
ConduitT<M, A, B> can be pictured as so:
+------------------------------------------------------------------------------------------+
|                                                                                          |
|  K<M, A> --> TransducerM --> K<M, X> --> Buffer --> K<M, X> --> TransducerM --> K<M, B>  |
|                                                                                          |
+------------------------------------------------------------------------------------------+
- A value of K<M, A> is posted to the ConduitT (via Post)
- It flows through an input TransducerM, mapping the K<M, A> value to K<M, X> (an internal type you can't see)
- The K<M, X> value is then stored in the conduit's internal buffer (a System.Threading.Channels.Channel)
- Any invocation of Reduce will force the consumption of the values in the buffer
- Flowing each value K<M, X> through the output TransducerM
So the input and output transducers allow for pre and post-processing of values as they flow through the conduit.
ConduitT is a CoFunctor: call Comap to manipulate the pre-processing transducer. ConduitT is also a Functor: call Map to manipulate the post-processing transducer. There are other non-trait, but common behaviours, like FoldWhile, Filter, Skip, Take, etc.
ConduitT supports access to a SinkT and a SourceT for more advanced processing.
Open to closed streams
Clearly, even for 'closed systems' like the Pipes system, it would be beneficial to be able to post values into the streams from the outside. And so, the open-stream components can all be converted into Pipes components like ProducerT and ConsumerT.
- Conduit and ConduitT support ToProducer, ToProducerT, ToConsumer, and ToConsumerT.
- Sink and SinkT support ToConsumer and ToConsumerT.
- Source and SourceT support ToProducer and ToProducerT.
This allows for the ultimate flexibility in your choice of streaming effect. It also allows for efficient concurrency in the more abstract and compositional world of the pipes. In fact ProducerT.merge, which merges many streams into one, uses ConduitT internally to collect the values and to merge them into a single ProducerT.
Contents
Sub modules
- Conduit
- ConduitT
- Pipes
- Sink
- SinkT
- Source
- SourceT
- Transducers
Settings for Conduit channels
Parameters
type | A | Bound value type
Fields
Methods
method Buffer<A> Bounded (uint size)
Store a bounded number of messages, specified by the 'size' argument
Parameters
param | size | Bounded size of the buffer |
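To illustrate what 'a bounded number of messages' means for a channel-backed buffer, here is a sketch using System.Threading.Channels directly. It assumes, without confirmation from this page, that a bounded buffer behaves like a bounded .NET channel; how language-ext configures the underlying channel, and what it does when the buffer is full, is not stated here. The capacity of 2 is an arbitrary value chosen for the example.

```csharp
using System;
using System.Threading.Channels;

// A channel that stores at most 2 messages (default full mode: writers wait)
var bounded = Channel.CreateBounded<int>(2);

var first = bounded.Writer.TryWrite(1);   // stored
var second = bounded.Writer.TryWrite(2);  // stored, the buffer is now full
var third = bounded.Writer.TryWrite(3);   // not stored: no free slot

// Consuming a value frees a slot, so a later write succeeds again
bounded.Reader.TryRead(out var taken);
var fourth = bounded.Writer.TryWrite(4);

Console.WriteLine($"{first} {second} {third} {taken} {fourth}");
```

With the default full mode, a non-blocking TryWrite simply reports failure when the buffer is full, while an awaited WriteAsync would wait for space instead.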
record UnboundedBuffer <A>
record BoundedBuffer <A> (uint Size)
record SingleBuffer <A>
record LatestBuffer <A> (A Value)
record NewestBuffer <A> (uint Size)