Kotlin coroutines for OutputStream.
In this post, we look at a low-level implementation of coroutines used to process streams in a server-side Java application.
Suppose there is existing code that writes its output to a given OutputStream object.
One may write it in Java as:
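A minimal sketch of such code could look like this. Only the names ExistingService and callExistingCode come from the text; the generateOutput method and the captureAsText helper are assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical shape of the existing service: all it can do is write
// its result into whatever OutputStream it is given.
interface ExistingService {
    void generateOutput(OutputStream output) throws IOException; // assumed method name
}

class ExistingCode {
    // The code we cannot change: it pushes bytes into the stream we pass in.
    static void callExistingCode(ExistingService service, OutputStream output) throws IOException {
        service.generateOutput(output);
    }

    // Usage: run the service against an in-memory stream and return what it wrote.
    static String captureAsText(ExistingService service) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            callExistingCode(service, sink);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

The important property is the direction of control: the service decides when to call write, and the caller only supplies the stream.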
We need to intercept the output that is being written by the #callExistingCode function.
Example
Say the ExistingService replies with an HTML page, and the goal is to extract some part of it, say a <div> with a given id.
Or an ExistingService may return a ZIP archive, and the goal may be to add a file to it.
There are many similar cases where post-processing of an OutputStream is necessary.
Problem Statement
Suppose the ExistingService writes output in the following format:
<SIZE><CHANNEL-ID><DATA>
Where <SIZE> is a 4-byte length field, <CHANNEL-ID> is a one-byte field, and <DATA> is a byte sequence of the given size.
This or a similar format is widely used to send several streams over one connection. For example, it is used in SSH-2, HTTP/2 and Git.
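To make the layout concrete, here is a sketch that encodes and decodes one message. It assumes the fields appear in the order they are described, that <SIZE> is a big-endian 32-bit integer, and that it counts only the <DATA> bytes; the text does not pin down these details. The Frames class and its method names are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class Frames {
    // Encodes one message as <SIZE><CHANNEL-ID><DATA>.
    // Assumption: SIZE is big-endian and counts only the DATA bytes.
    static byte[] encode(int channelId, byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(data.length);   // 4 bytes, big-endian
            out.writeByte(channelId);    // 1 byte
            out.write(data);             // SIZE bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes the first message and renders it as "channel:text" for inspection.
    static String describeFirst(byte[] stream) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream))) {
            int size = in.readInt();
            int channelId = in.readUnsignedByte();
            byte[] data = new byte[size];
            in.readFully(data);
            return channelId + ":" + new String(data, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With these assumptions, a two-byte payload on channel 2 takes seven bytes on the wire: four for the size, one for the channel and two for the data.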
Our goal is to inject additional messages into the stream on the fly, while it is being generated by the call to an ExistingService.
We just need to implement the method:
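One plausible shape for that method is sketched here. The name withExtraMessages, the StreamPatching class and the pass-through body are assumptions: the body is only a placeholder that forwards bytes unchanged, to show where the real processing would go.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class StreamPatching {
    // Hypothetical signature: takes the real destination and returns the stream
    // that is handed to the existing code instead of it.
    static OutputStream withExtraMessages(OutputStream target) {
        // Placeholder behaviour: forward every byte unchanged. A real
        // implementation would decode messages here and write additional ones.
        return new FilterOutputStream(target) {
            @Override
            public void write(byte[] buffer, int offset, int length) throws IOException {
                out.write(buffer, offset, length);
            }
        };
    }

    // Usage: the existing code writes into the wrapper instead of the target.
    static byte[] passThrough(byte[] generated) {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        try (OutputStream wrapped = withExtraMessages(target)) {
            wrapped.write(generated);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target.toByteArray();
    }
}
```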
A Trivial Java Implementation
In Java we have the PipedOutputStream/PipedInputStream classes, which can turn an OutputStream into an InputStream. After that, the processing can be done easily.
We have a thread that reads data from the InputStream, e.g.
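A sketch of this approach, under the same framing assumptions as the format description (big-endian <SIZE> counting only <DATA>). The Producer interface stands in for the existing code, and the policy of appending one extra message after every original one is chosen purely for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

class PipedPatching {
    // Stands in for the existing code: anything that writes into an OutputStream.
    interface Producer {
        void writeTo(OutputStream output) throws IOException;
    }

    // Runs the producer against a pipe while a second thread re-reads the messages,
    // copies each one to the target and appends an extra message after it.
    static byte[] patch(Producer existingCode, int extraChannel, byte[] extraData) {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        IOException[] readerFailure = new IOException[1];
        try {
            PipedInputStream pipeInput = new PipedInputStream();
            PipedOutputStream pipeOutput = new PipedOutputStream(pipeInput);

            Thread reader = new Thread(() -> {
                try (DataInputStream in = new DataInputStream(pipeInput);
                     DataOutputStream out = new DataOutputStream(target)) {
                    while (true) {
                        int size;
                        try {
                            size = in.readInt();             // <SIZE>
                        } catch (EOFException endOfStream) {
                            break;                           // producer closed the pipe
                        }
                        int channel = in.readUnsignedByte(); // <CHANNEL-ID>
                        byte[] data = new byte[size];
                        in.readFully(data);                  // <DATA>
                        writeMessage(out, channel, data);
                        writeMessage(out, extraChannel, extraData);
                    }
                } catch (IOException e) {
                    readerFailure[0] = e;
                }
            });
            reader.start();

            try {
                existingCode.writeTo(pipeOutput);
            } finally {
                pipeOutput.close();   // signals end of stream to the reader thread
            }
            reader.join();            // also makes the reader's writes visible here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (readerFailure[0] != null) throw new UncheckedIOException(readerFailure[0]);
        return target.toByteArray();
    }

    static void writeMessage(DataOutputStream out, int channel, byte[] data) throws IOException {
        out.writeInt(data.length);
        out.writeByte(channel);
        out.write(data);
    }
}
```

The reading loop is pleasantly linear, but it only works because a dedicated thread can block inside readInt and readFully while the producer is still writing.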
The cost here is high. We need an additional thread to implement that. Additionally, all the data is copied to a buffer from which the PipedInputStream reads.
Finally, the piped streams implementation uses synchronization to wait for the next portion of data to arrive. That can be problematic for some loads.
A Smart Java Implementation
One can do a smarter implementation: implement the OutputStream directly and do the processing and patching of the output right there.
What we do here is add state to our OutputStream implementation. Each call to the write method updates that state.
This is a state machine, and that is what the code here is about. It is up to the programmer whether to make the state explicit or not. Think about the maintainability of such code first.
The implementation here can be quite complex. As an example, I show byte-by-byte processing. In real cases, it can be far more efficient to implement the write(byte[], int, int) method.
The benefit here is that we no longer need an extra thread to pipe the output as input. Still, the code is getting more complex.
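A byte-by-byte state machine of this kind could be sketched as follows. It assumes a big-endian <SIZE> that counts only the <DATA> bytes, takes the injected message in already encoded form, and appends it after every original message; the class name and that injection policy are illustrative choices, not the only way to do it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class PatchingOutputStream extends OutputStream {
    private enum State { SIZE, CHANNEL, DATA }

    private final OutputStream target;
    private final byte[] extraMessage;   // an already encoded message to inject

    private State state = State.SIZE;
    private int size;                    // <SIZE> value accumulated so far
    private int sizeBytesSeen;           // how many of the 4 size bytes have arrived
    private int dataBytesLeft;           // remaining <DATA> bytes of the current message

    PatchingOutputStream(OutputStream target, byte[] extraMessage) {
        this.target = target;
        this.extraMessage = extraMessage;
    }

    @Override
    public void write(int b) throws IOException {
        target.write(b);                 // every original byte is forwarded unchanged
        switch (state) {
            case SIZE:
                size = (size << 8) | (b & 0xFF);
                if (++sizeBytesSeen == 4) state = State.CHANNEL;
                break;
            case CHANNEL:
                dataBytesLeft = size;
                if (dataBytesLeft == 0) messageCompleted(); else state = State.DATA;
                break;
            case DATA:
                if (--dataBytesLeft == 0) messageCompleted();
                break;
        }
    }

    // Called right after the last byte of a message: inject and reset the state.
    private void messageCompleted() throws IOException {
        target.write(extraMessage);
        size = 0;
        sizeBytesSeen = 0;
        state = State.SIZE;
    }

    @Override public void flush() throws IOException { target.flush(); }
    @Override public void close() throws IOException { target.close(); }

    // Usage: feed generated bytes through the state machine and collect the result.
    static byte[] patch(byte[] generated, byte[] extraMessage) {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        try (OutputStream out = new PatchingOutputStream(target, extraMessage)) {
            out.write(generated);        // OutputStream.write(byte[]) falls back to write(int)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target.toByteArray();
    }
}
```

Note how the position in the protocol, which was implicit in the control flow of the reading loop, now has to live in fields that survive between calls to write.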
A Coroutine State Machine
Kotlin 1.1 introduced coroutines. They offer an alternative way to implement a state machine with the help of the compiler, with far more readable, linear-looking code.
First, we need to declare an interface with suspend functions, e.g.
Also, we need an implementation of the OutputStream that allows us to use the DataReader. That is the function inverseRead. We'll come back to its implementation a bit later.
Finally, we are able to implement the data processing:
Look at the code above. It looks quite similar to what we had in A Trivial Java Implementation, yet the difference is huge. Every call to a suspend function pauses the execution of the method, so control returns to the write method of the OutputStream implementation in inverseRead. We are now able to write linear-looking code that is executed piece by piece.
Implementation of the Coroutine
The only missing part is inverseRead. We follow the coroutines documentation to implement it. Happily, it contains many examples too.
We implement the DataReader#read and DataReader#pipe functions to suspend execution until enough data has been sent to the OutputStream#write method. We have the following state object for that:
Next, we are able to implement the DataReader interface in the following way:
All we have here is a call to the suspendCoroutine function in Kotlin, which performs the suspension. For simplicity, I decided to omit the defensive asserts that one may have in both the pipe and read function implementations.
The OutputStream
implementation in the example is executing next suspended function in a loop. The current
implementation does not check the state at the end of data too. The straightforward implementation is as follows:
At this point we are ready to implement the inverseRead function.
Coroutine vs State Machine
It turned out we still needed to implement a state machine (via the DataReaderState class). It was necessary to add a state for each suspend function of the DataReader interface. That was the code we needed to write as a building block (and there are amazing libraries for most cases).
The good part is that the rest of the code, where we do the actual data decoding (the readStream function), is now free from explicit state-machine-style programming. One can write that code as simply as possible. It is easier to understand and to check. And, for example, we may use loops, try-catch blocks and any other constructs we like to use in ordinary programs.
In other words, with the help of Kotlin suspend functions we may minimize the number of complex functions and concentrate on the actual development.
For more information on the power of coroutines in Kotlin, you may refer to the GeekOut talk by Roman Elizarov or to the documentation.