Kotlin coroutines for OutputStream.
In this post, we consider a low-level implementation for specific
coroutines that are used to process streams in a server-side Java application.
There is an existing code that writes the output to a given OutputStream
object.
One may write it in Java as:
interface ExistingService {
void callExistingCode(OutputStream resultOutput)
}
We need to intercept the output that is bing written by the #callExistingCode
function.
Example
Say the ExistingService
replies with an HTML, and the goal is to extract some part of it, say a <div>
with a given id
.
Or an ExistingService
may be returning a ZIP archive, and the goal can be to add a file in it.
There are many similar cases, where an OutputStream
post processing can be necessary.
Problem Statement
Suppose the ExistingService
writes output in the following format:
(<SIZE><CHANNEL-ID><DATA>)*
Where <SIZE>
is 4 bytes length message. <CHANNEL-ID>
is a one-byte message. <DATA>
is a
byte sequence of given size.
The such or similar format is widely used to send several streams within one connection. For example, it is used in SSH-2, HTTP/2 or Git.
Out goal is to include additional messages to the stream on the fly, while it is being generated
by the call to an ExistingService
.
We just need to implement the method:
OutputStream weaveStream(OutputStream target);
A Trivial Java Implementation
In Java we have PipedOutputStream
/PipedInputStream
classes that may turn an OutputStream
into an InputStream
. Next, the processing can be done easily.
We have a thread, that reads data from an InputStream
, e.g.
OutputStream weaveStream(OutputStream target) throws IOException {
PipedOutputStream pos = new PipeOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
runInThread(() -> weaveStream(pis, target));
return pos;
}
void weaveStream(InputStream source, OutputStream target) throws IOException {
while (true) {
//omitted error processing here
int size = extractSize(source.read(), source.read(), source.read(), source.read());
if (isLastBlock(size)) return;
int ch = extractChannelId(source.read());
writeHeader(target, size, ch);
copyBlock(target, size, source);
}
}
The cost here is high. We need an additional thread to implement that. Additionally,
the whole data is copied to a buffer from which a PipedInputStream
is reading.
Finally, piped streams implementation uses a level of synchronization to wait for
next data portion to arrive. That can be problematic for some loads.
A Smart Java Implementation
One can do a smarter implementation. Directly implement the OutputStream
and do the processing
and patching the output in there.
What we do here is to add a state to our OutputStream
implementation. Each call to write
method
yields an update of the state.
A State Machine. That is what the code from here is about. It is up to programmer if to have a state explicitly or not. Think about maintainability of such code first.
The implementation here can be quite complex. As an example, I show a byte-by-byte
processing. In real cases, it can be way more efficient to implement
write(byte[], int, int)
method.
OutputStream weaveStream(OutputStream target) throws IOException {
return new OutputStream() {
@Override
public void write(int b) throws IOException {
if (isReadingSize()) {
readMoreSize(b);
} else if (isReadingChannelId()) {
readMoreChannelId(b);
} else if (isProcessingData()) {
processModeData(b);
} else {
throw new IOException("Unexpected state");
}
}
};
}
The benefit here is that we no longer need an extra thread to pipe the output as input. Sill, code is getting more complex.
A Coroutine State Machine
In Kotlin 1.1 we have coroutines. The alternative way to implement a state machine with the help of compiler, with way more readable, linear-looking code.
First, we need to declare an interface with suspend
functions, e.g.
interface DataReader {
suspend fun read() : Int
suspend fun pipe(sz : Int, to: OutputStream)
}
Also, we need an implementation of the OutputStream
that allows us to use the DataReader
. That is
the function inverseRead
. We’ll turn back to the implementation a bit later.
fun inverseRead(reader: suspend DataReader.() -> Unit) : OutputStream { ... }
Finally, we are able to implement the data processing:
fun weaveStream(target: OutputStream): OutputStream {
return inverseRead { reader ->
readStream(reader, target)
}
}
suspend fun readStream(source: DataReader, target: OutputStream) {
while(true) {
val sz = extractSize(source.read(), source.read(), source.read(), source.read())
if (isLastBlock(sz)) return
val ch = extractChannelId(source.read())
writeHeader(target, sz, ch)
source.pipe(sz, target)
}
}
Check the code above. It is quite similar to what we had in A Trivial Java Implementation. The difference
is huge. Every call to suspend
function pauses the execution of a method, so the execution turns to
the write
method implementation of OutputStream
in inverseRead
. We are able now to write
a linear looking code, that is executed in a piece-by-piece manner.
Implementation of the Coroutine
The only missing part is the inverseRead
. We follow the coroutines documentation
to implement that. Great is there are many examples too.
We implement DataReader#read
and DataReader#pipe
functions to suspend execution until there is enough data
send to OutputStream#write
method. We have the following state object for that:
class DataReaderState {
var readByteContinuation = null as Continuation<Int>?
var pipeContinuation = null as Continuation<Unit>?
var pipeTarget = null as OutputStream?
var pipeSizeToWrite = 0
}
Next, we are able to implement the DataReader
interface in the following way:
class DataReaderImpl(val state: DataReaderState) : DataReader {
suspend override fun pipe(sz: Int, to: OutputStream): Unit = suspendCoroutine<Unit> { c ->
state.pipeContinuation = c
state.pipeSizeToWrite = sz
state.pipeTarget = to
}
suspend override fun read(): Int = suspendCoroutine { c ->
state.readByteContinuation = c
}
}
All we have here is a call to the specific suspendCoroutine
function in Kotlin, which does the suspend. I decided
to omit for simplicity a defensive asserts that one may have in both pipe
and read
function implementation.
The OutputStream
implementation in the example is executing next suspended function in a loop. The current
implementation does not check the state at the end of data too. The straightforward implementation is as follows:
class DataReaderOutputStreamImpl(val state: DataReaderState) : OutputStream() {
override fun write(b: Int) = write(byteArrayOf(b.toByte()))
override tailrec fun write(b: ByteArray, off: Int, len: Int) {
if (len <= 0) return
val readOneByte = state.readByteContinuation
if (readOneByte != null) {
readOneByte.resume(b[off].toInt())
return write(b, off+1, len-1)
}
val pipe = state.pipeContinuation
if (pipe != null) {
val toPipe = Math.min(len, state.pipeSizeToWrite)
state.pipeTarget!!.write(b, off, toPipe)
return write(b, off+toPipe, len - toPipe)
}
throw Error("Illegal state")
}
}
At that moment we are ready to implement the inverseRead
function.
fun inverseRead(reader: suspend (DataReader) -> Unit) : OutputStream {
val state = DataReaderState()
val outputStream = DataReaderOutputStreamImpl(state)
val dataReader = DataReaderImpl(state)
val continuation = DataReaderContinuation(state)
reader.createCoroutine(dataReader,continuation).resume(Unit)
return outputStream
}
Coroutine vs State Machine
It turned out we still needed to implement a state machine (via DataReaderState
class). It
was necessary to add a state for each suspend
function of DataReader
interface. That was
the code we need to write as building block (and there
are amazing libraries for most
of the cases)
The good part is that the rest part of the code, where we do the actual data decoding (the readStream
function)
now free from an explicit state machine style programming. One can have code written as easy
as possible. It’s easier to understand and to check. And, for example, we may use loops, try-catches
and any other code constructs we like using for ordinary programs.
In other words, with the help of Kotlin suspend functions we may minimize the number of complex functions and concentrate on the actual development.
For more information on coroutines power in Kotlin, you may refer to GeekOut Talk by Roman Elizarov or to the documentation
comments powered by Disqus