LanguageExt.Streaming


The Streaming library of language-ext is all about compositional streams. There are two key types of streaming functionality: closed streams and open streams.

Closed streams

Closed streams are facilitated by the Pipes system. The types in the Pipes system are compositional monad-transformers that 'fuse' together to produce an EffectT<M, A>. This effect is a closed system, meaning that there is no way (from the API) to directly interact with the effect from the outside: it can be executed and will return a result if it terminates.

The pipeline components are ProducerT<OUT, M, A>, PipeT<IN, OUT, M, A>, and ConsumerT<IN, M, A>.

These are the components that fuse together (using the | operator) to make an EffectT<M, A>. The types are monad-transformers that support lifting monads with the MonadIO trait only (which constrains M). This makes sense: otherwise, the closed system would have no effect other than heating up the CPU.

There are also more specialised versions of the above that only support the lifting of the Eff<RT, A> effect-monad: Producer<RT, OUT, A>, Pipe<RT, IN, OUT, A>, and Consumer<RT, IN, A>.

They all fuse together into an Effect<RT, A>.

Pipes are especially useful if you want to build reusable streaming components that you can glue together ad infinitum. Pipes are, arguably, less useful for day-to-day stream processing, like handling events, but your mileage may vary.

More details on the Pipes page.

Open streams

Open streams are closer to what most C# devs have used classically. They are like events or IObservable streams. They yield values and (under certain circumstances) accept inputs.

I'm calling these 'open streams' because we can Post values to a Sink/SinkT and we can Reduce values yielded by Source/SourceT. So, they are 'open' for public manipulation, unlike Pipes which fuse the public access away.

Source

Source<A> is the 'classic stream': you can lift any of the following types into it: System.Threading.Channels.Channel<A>, IEnumerable<A>, IAsyncEnumerable<A>, or singleton values. To process a stream, you need to use one of the Reduce or ReduceAsync variants. These take Reducer delegates as arguments. They are essentially a fold over the stream of values, which results in an aggregated state once the stream has completed. These reducers can be seen to play a similar role to Subscribe in IObservable streams, but are more principled because they return a value (which we can leverage to carry state for the duration of the stream).
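To make the 'reduce is a fold' idea concrete, here is a minimal sketch that uses only System.Threading.Channels from the .NET base class library. The Reduce helper and the ReduceSketch class are hypothetical names invented for this illustration; they are not the language-ext API, and the real Reduce/ReduceAsync signatures may well differ.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReduceSketch
{
    // Hypothetical helper (NOT the library's Reduce): folds every value read
    // from the channel into a state and returns the final state once the
    // channel has been completed and drained.
    public static async Task<S> Reduce<A, S>(
        ChannelReader<A> reader, S state, Func<S, A, S> reducer)
    {
        await foreach (var value in reader.ReadAllAsync())
        {
            state = reducer(state, value);
        }
        return state;
    }

    // Writes 1..n into an unbounded channel, completes it, then sums the values.
    public static async Task<int> SumTo(int n)
    {
        var channel = Channel.CreateUnbounded<int>();
        for (var i = 1; i <= n; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        // The reducer carries the running total as its state.
        return await Reduce(channel.Reader, 0, (total, x) => total + x);
    }
}
```

Unlike a Subscribe callback, the reducer hands back the next state on every step, so the aggregated result falls out of the fold when the stream completes.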

Source also supports some built-in reducers: Last, Iter, and Collect.

SourceT

SourceT<M, A> is the classic-stream embellished - it turns the stream into a monad-transformer that can lift any MonadIO-enabled monad (M), allowing side effects to be embedded into the stream in a principled way.

So, for example, to use the IO<A> monad with SourceT, simply use: SourceT<IO, A>. Then you can use one of the following static methods on the SourceT type to lift IO<A> effects into a stream:

Obviously, when lifting non-IO monads, the types above change.

SourceT also supports the same built-in convenience reducers as Source (Last, Iter, Collect).

Sink

Sink<A> provides a way to accept many input values. The values are buffered until consumed. The sink can be thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to manipulate the values being posted to the buffer just before they are stored.

This manipulation is possible because the Sink is a CoFunctor (contravariant functor). This is the dual of Functor: we can think of Functor.Map as converting a value from A -> B, whereas CoFunctor.Comap converts from B -> A.

So, to manipulate values coming into the Sink, use Comap. It will give you a new Sink with the manipulation 'built-in'.
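The direction of Comap can be sketched with a toy type built on a channel. MiniSink<A> and ComapSketch are hypothetical stand-ins invented for this example, not the library's Sink<A>; the sketch only assumes that a sink is 'something that accepts values'.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical stand-in for a sink: it only knows how to accept a value.
public sealed class MiniSink<A>
{
    readonly Func<A, bool> post;

    public MiniSink(Func<A, bool> post) => this.post = post;

    public bool Post(A value) => post(value);

    // Contravariant map: given f : B -> A, a sink of A becomes a sink of B.
    // The function runs on each value just before it reaches the buffer.
    public MiniSink<B> Comap<B>(Func<B, A> f) =>
        new MiniSink<B>(b => post(f(b)));
}

public static class ComapSketch
{
    public static List<int> Run()
    {
        var channel = Channel.CreateUnbounded<int>();
        var ints = new MiniSink<int>(channel.Writer.TryWrite);

        // A sink that accepts strings but stores their lengths.
        MiniSink<string> strings = ints.Comap<string>(s => s.Length);
        strings.Post("a");
        strings.Post("four");
        channel.Writer.Complete();

        // Drain what was actually stored in the buffer.
        var stored = new List<int>();
        while (channel.Reader.TryRead(out var n))
        {
            stored.Add(n);
        }
        return stored;
    }
}
```

Note that the function passed to Comap goes from the new input type to the old one (string -> int), which is the reverse of what Map would take.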

SinkT

SinkT<M, A> provides a way to accept many input values. The values are buffered until consumed. The sink can be thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to manipulate the values being posted to the buffer just before they are stored.

This manipulation is possible because the SinkT is a CoFunctor (contravariant functor). This is the dual of Functor: we can think of Functor.Map as converting a value from A -> B, whereas CoFunctor.Comap converts from B -> A.

So, to manipulate values coming into the SinkT, use Comap. It will give you a new SinkT with the manipulation 'built-in'.

SinkT is also a transformer that lifts types of K<M, A>.

Conduit

Conduit<A, B> can be pictured like so:

+----------------------------------------------------------------+
|                                                                |
|  A --> Transducer --> X --> Buffer --> X --> Transducer --> B  |
|                                                                |
+----------------------------------------------------------------+

So the input and output transducers allow for pre- and post-processing of values as they flow through the conduit.
Conduit is a CoFunctor: call Comap to manipulate the pre-processing transducer. Conduit is also a Functor: call Map to manipulate the post-processing transducer. There are other common behaviours that are not trait-based, like FoldWhile, Filter, Skip, Take, etc.

Conduit supports access to a Sink and a Source for more advanced processing.
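As a rough illustration of the picture above, the sketch below models a conduit as a write function, a buffer, and a drain function. MiniConduit<A, B> and ConduitSketch are hypothetical names made up for this example; the real Conduit<A, B> is built on transducers and exposes a Sink and a Source rather than a Drain method.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical model: 'write' is the input side, 'drain' is the output side.
public sealed class MiniConduit<A, B>
{
    readonly Func<A, bool> write;
    readonly Func<List<B>> drain;

    public MiniConduit(Func<A, bool> write, Func<List<B>> drain)
    {
        this.write = write;
        this.drain = drain;
    }

    public bool Post(A value) => write(value);
    public List<B> Drain() => drain();

    // Contravariant side: transform values before they are buffered.
    public MiniConduit<Z, B> Comap<Z>(Func<Z, A> f) =>
        new MiniConduit<Z, B>(z => write(f(z)), drain);

    // Covariant side: transform values after they leave the buffer.
    public MiniConduit<A, C> Map<C>(Func<B, C> f) =>
        new MiniConduit<A, C>(write, () => drain().ConvertAll(b => f(b)));
}

public static class ConduitSketch
{
    // An identity conduit over an unbounded channel.
    public static MiniConduit<X, X> Make<X>()
    {
        var channel = Channel.CreateUnbounded<X>();
        return new MiniConduit<X, X>(
            channel.Writer.TryWrite,
            () =>
            {
                var items = new List<X>();
                while (channel.Reader.TryRead(out var item)) items.Add(item);
                return items;
            });
    }

    public static List<string> Run()
    {
        var conduit = Make<int>()
            .Comap<string>(s => s.Length)   // string -> int before buffering
            .Map(n => $"len={n}");          // int -> string after buffering

        conduit.Post("ab");
        conduit.Post("abcd");
        return conduit.Drain();
    }
}
```

The buffer itself only ever holds int values here; Comap and Map change what the outside world sees on each end without touching what is stored.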

ConduitT

ConduitT<M, A, B> can be pictured like so:

+------------------------------------------------------------------------------------------+
|                                                                                          |
|  K<M, A> --> TransducerM --> K<M, X> --> Buffer --> K<M, X> --> TransducerM --> K<M, B>  |
|                                                                                          |
+------------------------------------------------------------------------------------------+

So the input and output transducers allow for pre- and post-processing of values as they flow through the conduit.
ConduitT is a CoFunctor: call Comap to manipulate the pre-processing transducer. ConduitT is also a Functor: call Map to manipulate the post-processing transducer. There are other common behaviours that are not trait-based, like FoldWhile, Filter, Skip, Take, etc.

ConduitT supports access to a SinkT and a SourceT for more advanced processing.

Open to closed streams

Clearly, even for 'closed systems' like the Pipes system, it would be beneficial to be able to post values into the streams from the outside. And so, the open-stream components can all be converted into Pipes components like ProducerT and ConsumerT.

This allows for the ultimate flexibility in your choice of streaming effect. It also allows for efficient concurrency in the more abstract and compositional world of the pipes. In fact, ProducerT.merge, which merges many streams into one, uses ConduitT internally to collect the values and to merge them into a single ProducerT.

Contents

Sub modules

Conduit
ConduitT
Pipes
Sink
SinkT
Source
SourceT
Transducers

record Buffer <A>

Settings for Conduit channels

Parameters

type A

Bound value type

Fields

field Buffer<A> Unbounded

Store an unbounded number of messages in a FIFO queue

field Buffer<A> Single

Store at most one message (a buffer bounded to a size of 1)

field Buffer<A> New

Equivalent to Newest(1)

Methods

method Buffer<A> Bounded (uint size)

Store a bounded number of messages, specified by the 'size' argument

Parameters

param size

Bounded size of the buffer

method Buffer<A> Latest (A value)

Only store the 'Latest' message, beginning with an initial value

'Latest' is never empty nor full.

method Buffer<A> Newest (uint size)

Like Bounded, but Post never fails (the buffer is never full). Instead, old elements are discarded to make room for new elements

Parameters

param size

Size of the buffer
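The difference between Bounded and Newest can be demonstrated with plain System.Threading.Channels. The mapping below (Bounded to BoundedChannelFullMode.Wait, Newest to BoundedChannelFullMode.DropOldest) is an assumption made for illustration, not a statement about how the library implements these settings; BufferSketch and its methods are hypothetical names.

```csharp
using System;
using System.Threading.Channels;

public static class BufferSketch
{
    // Assumed analogue of Bounded(size): once the buffer is full,
    // TryWrite fails (and asynchronous writers would wait).
    public static Channel<A> Bounded<A>(int size) =>
        Channel.CreateBounded<A>(
            new BoundedChannelOptions(size) { FullMode = BoundedChannelFullMode.Wait });

    // Assumed analogue of Newest(size): writes always succeed and the
    // oldest buffered item is discarded to make room.
    public static Channel<A> Newest<A>(int size) =>
        Channel.CreateBounded<A>(
            new BoundedChannelOptions(size) { FullMode = BoundedChannelFullMode.DropOldest });

    // Writes 1, 2, 3 and reports whether the third write was accepted,
    // plus the first item that can then be read back.
    public static (bool ThirdAccepted, int First) Demo(Channel<int> channel)
    {
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);
        var thirdAccepted = channel.Writer.TryWrite(3);
        channel.Reader.TryRead(out var first);
        return (thirdAccepted, first);
    }
}
```

With a size of 2, Demo(BufferSketch.Bounded<int>(2)) rejects the third write and reads back 1, while Demo(BufferSketch.Newest<int>(2)) accepts it and reads back 2 because 1 was discarded.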

record BoundedBuffer <A> (uint Size)

record SingleBuffer <A>

record LatestBuffer <A> (A Value)

record NewestBuffer <A> (uint Size)

record NewBuffer <A>