Kotlin coroutines for OutputStream.
In this post, we consider a low-level implementation for specific
coroutines that are used to process streams in a server-side Java application.
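Suppose there is existing code that writes its output to a given OutputStream.
In Java, it can be modeled as a service, ExistingService, that writes its whole response into the OutputStream passed to it.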
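A minimal Kotlin sketch of that contract, for illustration only. The method name writeOutput and the toy helloService are hypothetical; the point is that the service pushes bytes into whatever OutputStream it is given and knows nothing about post-processing.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.OutputStream

// Hypothetical shape of the existing code: it writes its whole response
// into the OutputStream it receives.
fun interface ExistingService {
    fun writeOutput(os: OutputStream)
}

// A toy service used only to exercise the contract.
val helloService = ExistingService { os -> os.write("hello".toByteArray()) }

// Collects everything a service writes, as a caller would without any patching.
fun captureOutput(service: ExistingService): ByteArray {
    val buffer = ByteArrayOutputStream()
    service.writeOutput(buffer)
    return buffer.toByteArray()
}
```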
We need to intercept the output that is being written by the ExistingService.
For example, ExistingService may reply with HTML, and the goal is to extract some part of it, say, a specific fragment.
Or ExistingService may return a ZIP archive, and the goal can be to add a file to it.
There are many similar cases where post-processing of an
OutputStream is necessary.
ExistingService writes its output as a sequence of messages made of the following fields:
<SIZE> is a 4-byte message length.
<CHANNEL-ID> is a one-byte channel identifier.
<DATA> is a byte sequence of the given size.
This or a similar format is widely used to send several streams within one connection. For example, such framing is used in SSH-2, HTTP/2 and Git.
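To make the format concrete, here is a small Kotlin sketch that writes and parses such messages with DataOutputStream and DataInputStream. It assumes the fields appear in the order listed above, that <SIZE> counts only the <DATA> bytes, and that it is big-endian (the DataOutputStream convention); Frame, encodeFrames and decodeFrames are hypothetical names.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream

// One message: <SIZE> (4 bytes) <CHANNEL-ID> (1 byte) <DATA> (SIZE bytes).
// Assumption: SIZE is big-endian and counts only the DATA bytes.
class Frame(val channel: Int, val data: ByteArray)

fun writeFrame(out: DataOutputStream, frame: Frame) {
    out.writeInt(frame.data.size)   // <SIZE>
    out.writeByte(frame.channel)    // <CHANNEL-ID>, low 8 bits only
    out.write(frame.data)           // <DATA>
}

fun encodeFrames(frames: List<Frame>): ByteArray {
    val buffer = ByteArrayOutputStream()
    DataOutputStream(buffer).use { out -> frames.forEach { writeFrame(out, it) } }
    return buffer.toByteArray()
}

// Blocking parser over a complete byte array; a truncated message ends in EOFException.
fun decodeFrames(bytes: ByteArray): List<Frame> {
    val input = DataInputStream(ByteArrayInputStream(bytes))
    val frames = mutableListOf<Frame>()
    while (input.available() > 0) {
        val size = input.readInt()
        val channel = input.readUnsignedByte()
        val data = ByteArray(size)
        input.readFully(data)
        frames += Frame(channel, data)
    }
    return frames
}
```

Each message costs five header bytes on top of its payload, so an empty message is exactly five bytes long.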
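Our goal is to include additional messages in the stream on the fly, while it is being generated
by the call to the ExistingService.
We just need to implement a method that takes the target OutputStream and returns a patched OutputStream to pass to the ExistingService instead.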
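In Kotlin, such a method may look like the sketch below. The name patchStream is hypothetical, and this placeholder only forwards bytes unchanged; what matters is the wiring: the existing code only ever sees the wrapper, and closing the wrapper signals the end of the data.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.FilterOutputStream
import java.io.OutputStream

// Hypothetical signature: wrap the real target so the existing code writes
// through our stream. This placeholder forwards bytes unchanged.
fun patchStream(os: OutputStream): OutputStream = FilterOutputStream(os)

// How a caller would use it; close() marks the end of the data.
fun writePatched(writeOutput: (OutputStream) -> Unit): ByteArray {
    val target = ByteArrayOutputStream()
    patchStream(target).use { patched -> writeOutput(patched) }
    return target.toByteArray()
}
```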
A Trivial Java Implementation
In Java, we have piped stream classes, such as PipedInputStream, that can turn an OutputStream
into an InputStream. Then the processing is easy.
We have a thread that reads data from that InputStream, processes it and writes the result to the target stream.
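A minimal Kotlin sketch of this piped approach, using a PipedOutputStream connected to a PipedInputStream. It assumes the message layout above with a big-endian <SIZE> that counts only the <DATA> bytes. As a stand-in for "additional messages", it injects an empty message on a hypothetical channel EXTRA_CHANNEL after every original one; the function name patchStreamPiped is hypothetical as well. The caller writes into the returned stream, closes it and then joins the worker thread.

```kotlin
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.OutputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
import kotlin.concurrent.thread

val EXTRA_CHANNEL = 0x7F // hypothetical channel id for injected messages

// Returns the stream to hand to the existing code, plus the worker thread that
// reads the piped data and writes the patched result to `target`.
// Errors in the worker thread are not propagated to the caller in this sketch.
fun patchStreamPiped(target: OutputStream): Pair<OutputStream, Thread> {
    val input = PipedInputStream()
    val output = PipedOutputStream(input)
    val worker = thread(name = "stream-patcher") {
        val din = DataInputStream(input)
        val dout = DataOutputStream(target)
        while (true) {
            val first = din.read()
            if (first < 0) break // the writer closed the pipe between messages
            val size = (first shl 24) or (din.readUnsignedByte() shl 16) or
                (din.readUnsignedByte() shl 8) or din.readUnsignedByte()
            val channel = din.readUnsignedByte()
            val data = ByteArray(size).also { din.readFully(it) }
            dout.writeInt(size); dout.writeByte(channel); dout.write(data) // copy the message
            dout.writeInt(0); dout.writeByte(EXTRA_CHANNEL)                // inject an empty one
        }
        dout.flush()
    }
    return output to worker
}
```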
The cost here is high. We need an additional thread to implement that. Additionally,
all the data is copied into a buffer from which the
PipedInputStream reads.
Finally, the piped streams implementation uses synchronization to wait for
the next portion of data to arrive, which can be problematic under some loads.
A Smart Java Implementation
One can do a smarter implementation: implement the
OutputStream directly and do the processing
and patching of the output right there.
What we do here is add a state to our
OutputStream implementation. Each call to write
updates the state.
In other words, it is a state machine, and that is what the code from here on is about. It is up to the programmer whether to keep the state explicit or not; think about the maintainability of such code first.
The implementation can get quite complex. The simplest variant processes the data byte by byte.
In real cases, it can be much more efficient to implement the
write(byte[], int, int) method as well.
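A byte-by-byte Kotlin sketch of such a state machine. It assumes the message layout above with a big-endian <SIZE> that counts only the <DATA> bytes, and it injects an empty message on a hypothetical channel EXTRA_CHANNEL after each original message; the class name PatchingOutputStream is hypothetical too. Note how the position inside a message has to be tracked by hand in fields.

```kotlin
import java.io.DataOutputStream
import java.io.IOException
import java.io.OutputStream

val EXTRA_CHANNEL = 0x7F // hypothetical channel id for injected messages

// Explicit state machine: each write(Int) call advances the state by one byte.
class PatchingOutputStream(target: OutputStream) : OutputStream() {
    private enum class State { SIZE, CHANNEL, DATA }

    private val out = DataOutputStream(target)
    private var state = State.SIZE
    private var sizeBytes = 0 // how many <SIZE> bytes have been seen so far
    private var size = 0
    private var dataLeft = 0

    override fun write(b: Int) {
        val v = b and 0xFF
        when (state) {
            State.SIZE -> {
                size = (size shl 8) or v
                if (++sizeBytes == 4) state = State.CHANNEL
            }
            State.CHANNEL -> {
                if (size < 0) throw IOException("invalid message size $size")
                out.writeInt(size); out.writeByte(v) // copy the header
                dataLeft = size
                state = State.DATA
                if (dataLeft == 0) endOfMessage()
            }
            State.DATA -> {
                out.write(v) // copy one payload byte
                if (--dataLeft == 0) endOfMessage()
            }
        }
    }

    private fun endOfMessage() {
        out.writeInt(0); out.writeByte(EXTRA_CHANNEL) // inject an empty message
        size = 0; sizeBytes = 0; state = State.SIZE
    }

    override fun flush() = out.flush()

    override fun close() {
        if (state != State.SIZE || sizeBytes != 0) throw IOException("data ended inside a message")
        out.close()
    }
}
```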
The benefit here is that we no longer need an extra thread to pipe the output as input. Still, the code gets more complex.
A Coroutine State Machine
Kotlin 1.1 introduced coroutines. They offer an alternative way to implement a state machine with the help of the compiler, using far more readable, linear-looking code.
First, we need to declare an interface, DataReader, with
suspend functions for reading the incoming data.
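A minimal sketch of such an interface, under stated assumptions: the single primitive read() mirrors InputStream.read() (a byte as 0..255, or -1 at the end of the data), and readByte, readInt and readBytes are hypothetical helper extensions built on top of it. ArrayDataReader and runToCompletion exist only to exercise the interface without any real suspension.

```kotlin
import java.io.EOFException
import kotlin.coroutines.*

// Hypothetical DataReader: one suspending primitive plus helpers on top of it.
interface DataReader {
    /** Suspends until the next byte is available; returns 0..255, or -1 at the end of data. */
    suspend fun read(): Int
}

suspend fun DataReader.readByte(): Int =
    read().also { if (it < 0) throw EOFException("unexpected end of data") }

suspend fun DataReader.readInt(): Int = // big-endian, like DataInputStream.readInt()
    (readByte() shl 24) or (readByte() shl 16) or (readByte() shl 8) or readByte()

suspend fun DataReader.readBytes(size: Int): ByteArray {
    val result = ByteArray(size)
    for (i in 0 until size) result[i] = readByte().toByte()
    return result
}

// Test helpers: an array-backed reader never actually suspends, so the
// coroutine completes inside startCoroutine.
class ArrayDataReader(private val bytes: ByteArray) : DataReader {
    private var position = 0
    override suspend fun read(): Int =
        if (position < bytes.size) (bytes[position++].toInt() and 0xFF) else -1
}

fun <T> runToCompletion(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "the block suspended and did not complete" }.getOrThrow()
}
```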
We also need an OutputStream implementation that allows us to use the
DataReader; that is
inverseRead. We'll return to its implementation a bit later.
Finally, we are able to implement the data processing on top of the DataReader.
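A sketch of such linear processing code in Kotlin. It assumes a DataReader whose only suspending primitive, read(), returns a byte as 0..255 or -1 at the end of the data, a big-endian <SIZE> that counts only the <DATA> bytes, and, as the "additional message", an empty message on a hypothetical channel EXTRA_CHANNEL after each original one. The names copyWithExtraMessages, ArrayDataReader and patchBytes are hypothetical; the array-backed reader is only a test harness.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import java.io.EOFException
import kotlin.coroutines.*

val EXTRA_CHANNEL = 0x7F // hypothetical channel id for injected messages

interface DataReader {
    /** Suspends until the next byte is available; returns 0..255, or -1 at the end of data. */
    suspend fun read(): Int
}

suspend fun DataReader.readByte(): Int =
    read().also { if (it < 0) throw EOFException("message truncated") }

suspend fun DataReader.readBytes(size: Int): ByteArray {
    val result = ByteArray(size)
    for (i in 0 until size) result[i] = readByte().toByte()
    return result
}

// Linear decoding code: every read call may suspend until more bytes arrive.
suspend fun DataReader.copyWithExtraMessages(out: DataOutputStream) {
    while (true) {
        val first = read()
        if (first < 0) return // the data ended exactly between messages
        val size = (first shl 24) or (readByte() shl 16) or (readByte() shl 8) or readByte()
        val channel = readByte()
        val data = readBytes(size)
        out.writeInt(size); out.writeByte(channel); out.write(data) // copy the message
        out.writeInt(0); out.writeByte(EXTRA_CHANNEL)                // inject an empty one
    }
}

// Test harness only: an array-backed reader never really suspends.
class ArrayDataReader(private val bytes: ByteArray) : DataReader {
    private var position = 0
    override suspend fun read(): Int =
        if (position < bytes.size) (bytes[position++].toInt() and 0xFF) else -1
}

fun patchBytes(input: ByteArray): ByteArray {
    val buffer = ByteArrayOutputStream()
    val out = DataOutputStream(buffer)
    var failure: Throwable? = null
    val block: suspend () -> Unit = { ArrayDataReader(input).copyWithExtraMessages(out) }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { failure = it.exceptionOrNull() })
    failure?.let { throw it }
    return buffer.toByteArray()
}
```

The loop, the early return and the exception on a truncated message are ordinary control flow; no field records where in a message we are.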
This processing code looks quite similar to what we had in A Trivial Java Implementation, yet the difference
is huge. Every call to a DataReader
suspend function pauses the execution of the method, and control returns to the
write method implementation of
inverseRead. We are now able to write
linear-looking code that is executed piece by piece.
Implementation of the Coroutine
The only missing part is
inverseRead. We follow the coroutines documentation
to implement it; fortunately, it contains many examples, too.
We need the DataReader functions to suspend execution until enough data has been passed to the
OutputStream#write method. For that, we keep a state object, DataReaderState, that records what the suspended code is waiting for.
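A minimal Kotlin sketch of such a state object. It assumes a DataReader with a single suspending read() primitive, so there is just one waiting state; an interface with more suspend functions would need a waiting state per function. The subclass names are hypothetical.

```kotlin
import kotlin.coroutines.Continuation

// Hypothetical state object: what the processing coroutine is doing right now.
sealed class DataReaderState {
    /** The coroutine is running, or has not been started yet. */
    object Running : DataReaderState()

    /** Suspended in read(); resume the continuation with the next byte (0..255) or -1. */
    class WaitingForByte(val continuation: Continuation<Int>) : DataReaderState()

    /** The coroutine has completed; failure is null on success. */
    class Finished(val failure: Throwable?) : DataReaderState()
}
```

Keeping the continuation inside the state means the only thing the writing side must know is which state it is in and what value to resume with.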
Next, we are able to implement the
DataReader interface on top of that state.
Each function boils down to a call to Kotlin's
suspendCoroutine function, which does the actual suspension. For simplicity, I omitted
the defensive assertions that one may want in both
read function implementations.
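A minimal Kotlin sketch of that idea, assuming a DataReader with a single suspending read() primitive; StateDataReader, supply and the state subclasses are hypothetical names. suspendCoroutine hands over the continuation, and storing it in the state is all read() does; whoever has the data later resumes it.

```kotlin
import kotlin.coroutines.*

interface DataReader {
    /** Suspends until the next byte is available; returns 0..255, or -1 at the end of data. */
    suspend fun read(): Int
}

sealed class DataReaderState {
    object Running : DataReaderState()
    class WaitingForByte(val continuation: Continuation<Int>) : DataReaderState()
}

// read() only parks the caller's continuation; the data producer resumes it later.
class StateDataReader : DataReader {
    var state: DataReaderState = DataReaderState.Running

    override suspend fun read(): Int = suspendCoroutine { continuation ->
        state = DataReaderState.WaitingForByte(continuation)
    }

    // Called by the data producer; runs the reader's coroutine until it suspends again.
    fun supply(value: Int) {
        val waiting = state as? DataReaderState.WaitingForByte
            ?: throw IllegalStateException("reader is not waiting for data")
        state = DataReaderState.Running // reset before resuming: the coroutine may set a new state
        waiting.continuation.resume(value)
    }
}
```

With an empty coroutine context there is no dispatcher, so resume runs the suspended code synchronously on the caller's thread until the next read().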
The OutputStream implementation in the example executes the next suspended function in a loop. The current
implementation does not check the state at the end of the data either. Otherwise, the implementation is straightforward.
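A minimal Kotlin sketch of inverseRead, assuming a DataReader with a single suspending read() primitive that returns -1 at the end of the data; the state subclass names are hypothetical. Each write(Int) call resumes the waiting coroutine with one byte, which runs the processing code synchronously until its next read(). Unlike the implementation described above, this sketch also checks the state at the end of the data: close() delivers -1 and then verifies that the coroutine finished.

```kotlin
import java.io.IOException
import java.io.OutputStream
import kotlin.coroutines.*

interface DataReader {
    /** Suspends until the next byte is available; returns 0..255, or -1 at the end of data. */
    suspend fun read(): Int
}

sealed class DataReaderState {
    object Running : DataReaderState()
    class WaitingForByte(val continuation: Continuation<Int>) : DataReaderState()
    class Finished(val failure: Throwable?) : DataReaderState()
}

// Turns pushed writes into pulled reads for a suspending consumer.
fun inverseRead(consumer: suspend DataReader.() -> Unit): OutputStream = object : OutputStream() {
    private var state: DataReaderState = DataReaderState.Running

    init {
        val reader = object : DataReader {
            override suspend fun read(): Int =
                suspendCoroutine { state = DataReaderState.WaitingForByte(it) }
        }
        // Runs the consumer synchronously until its first read() (or completion).
        consumer.startCoroutine(reader, Continuation(EmptyCoroutineContext) { result ->
            state = DataReaderState.Finished(result.exceptionOrNull())
        })
    }

    private fun push(value: Int) {
        val current = state
        if (current !is DataReaderState.WaitingForByte) {
            throw IOException("consumer is not reading", (current as? DataReaderState.Finished)?.failure)
        }
        state = DataReaderState.Running
        current.continuation.resume(value) // runs the consumer until its next read()
        (state as? DataReaderState.Finished)?.failure?.let { throw IOException("consumer failed", it) }
    }

    override fun write(b: Int) = push(b and 0xFF)

    override fun close() {
        if (state is DataReaderState.WaitingForByte) push(-1)
        if (state !is DataReaderState.Finished) throw IOException("consumer did not finish at end of data")
    }
}
```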
At this point, we are ready to implement the method from the problem statement.
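Combining the pieces, a self-contained Kotlin sketch of the patching method could look like this. The shapes of DataReader, DataReaderState and inverseRead are assumptions, and patchStream, copyWithExtraMessages and EXTRA_CHANNEL are hypothetical names. It further assumes that read() returns -1 at the end of the data, that <SIZE> is big-endian and counts only the <DATA> bytes, and that the "additional message" is an empty message on a made-up channel injected after every original one.

```kotlin
import java.io.DataOutputStream
import java.io.EOFException
import java.io.IOException
import java.io.OutputStream
import kotlin.coroutines.*

val EXTRA_CHANNEL = 0x7F // hypothetical channel id for injected messages

interface DataReader {
    /** Suspends until the next byte is available; returns 0..255, or -1 at the end of data. */
    suspend fun read(): Int
}

suspend fun DataReader.readByte(): Int =
    read().also { if (it < 0) throw EOFException("message truncated") }

sealed class DataReaderState {
    object Running : DataReaderState()
    class WaitingForByte(val continuation: Continuation<Int>) : DataReaderState()
    class Finished(val failure: Throwable?) : DataReaderState()
}

fun inverseRead(consumer: suspend DataReader.() -> Unit): OutputStream = object : OutputStream() {
    private var state: DataReaderState = DataReaderState.Running

    init {
        val reader = object : DataReader {
            override suspend fun read(): Int =
                suspendCoroutine { state = DataReaderState.WaitingForByte(it) }
        }
        consumer.startCoroutine(reader, Continuation(EmptyCoroutineContext) { result ->
            state = DataReaderState.Finished(result.exceptionOrNull())
        })
    }

    private fun push(value: Int) {
        val current = state
        if (current !is DataReaderState.WaitingForByte) {
            throw IOException("consumer is not reading", (current as? DataReaderState.Finished)?.failure)
        }
        state = DataReaderState.Running
        current.continuation.resume(value) // runs the consumer until its next read()
        (state as? DataReaderState.Finished)?.failure?.let { throw IOException("consumer failed", it) }
    }

    override fun write(b: Int) = push(b and 0xFF)

    override fun close() {
        if (state is DataReaderState.WaitingForByte) push(-1)
        if (state !is DataReaderState.Finished) throw IOException("consumer did not finish at end of data")
    }
}

// The linear processing code: copy every message, then inject an empty one.
suspend fun DataReader.copyWithExtraMessages(out: DataOutputStream) {
    while (true) {
        val first = read()
        if (first < 0) return // the data ended exactly between messages
        val size = (first shl 24) or (readByte() shl 16) or (readByte() shl 8) or readByte()
        val channel = readByte()
        out.writeInt(size); out.writeByte(channel)
        repeat(size) { out.write(readByte()) } // stream the payload through
        out.writeInt(0); out.writeByte(EXTRA_CHANNEL)
    }
}

fun patchStream(os: OutputStream): OutputStream {
    val out = DataOutputStream(os)
    return inverseRead {
        copyWithExtraMessages(out)
        out.flush()
    }
}
```

Closing the returned stream marks the end of the data; in this sketch it does not close the target stream, so the caller remains responsible for that.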
Coroutine vs State Machine
It turned out we still needed to implement a state machine (via the
DataReaderState class): it
was necessary to add a state for each
suspend function of the
DataReader interface. That is
the building-block code we had to write (and there
are amazing libraries for most
of the cases).
The good part is that the rest of the code, where we do the actual data decoding, is
now free from explicit state-machine-style programming. One can write it as simply
as possible, which makes it easier to understand and to check. For example, we may use loops, try/catch blocks
and any other constructs we like to use in ordinary programs.
In other words, with the help of Kotlin suspend functions, we may minimize the amount of complex state-handling code and concentrate on the actual development.
For more information on the power of coroutines in Kotlin, you may refer to the GeekOut talk by Roman Elizarov or to the documentation.