parseJsonLines

inline fun <reified T> Flow<ByteArray>.parseJsonLines(objectMapper: ObjectMapper = defaultObjectMapper): Flow<T>

Converts a Flow of byte arrays into a Flow of parsed JSON objects of a specified type.

The function buffers incoming byte arrays, treating them as parts of a JSON Lines formatted string, and processes each JSON object line as soon as it's fully received.
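The buffering step can be pictured with a small standalone sketch. LineBuffer is a hypothetical helper written for illustration only; it is not part of this API and is not necessarily how parseJsonLines is implemented. It assumes UTF-8 input and "\n" as the line terminator.

```kotlin
// Hypothetical helper (assumption, not the library's code): accumulates raw
// chunks and returns each line once its terminating '\n' has arrived.
class LineBuffer {
    private val pending = java.io.ByteArrayOutputStream()

    // Feed one chunk; returns the lines completed by this chunk (possibly none).
    fun feed(chunk: ByteArray): List<String> {
        val lines = mutableListOf<String>()
        for (b in chunk) {
            if (b == '\n'.code.toByte()) {
                // Decode only whole lines, so a multi-byte UTF-8 character
                // split across two chunks is never decoded half-way.
                val line = String(pending.toByteArray(), Charsets.UTF_8)
                pending.reset()
                if (line.isNotBlank()) lines += line
            } else {
                pending.write(b.toInt())
            }
        }
        return lines
    }

    // Call after the last chunk: returns a trailing line that had no final newline, if any.
    fun finish(): String? {
        val rest = String(pending.toByteArray(), Charsets.UTF_8)
        pending.reset()
        return rest.takeIf { it.isNotBlank() }
    }
}

fun main() {
    val buffer = LineBuffer()
    // The first record is split across two chunks, as in the usage example.
    println(buffer.feed("{ \"name\": \"John".toByteArray()))
    println(buffer.feed(" Doe\" }\n".toByteArray()))
    println(buffer.feed("{ \"name\": \"Jane Doe\" }\n".toByteArray()))
}
```

Each complete line returned by feed would then be handed to the JSON parser, while any partial line stays in the buffer until the next chunk arrives.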

This approach is especially useful when consuming data from a slow upstream, such as a network call, as it allows data processing to begin as soon as enough has been received rather than waiting for the entire data set.

Because each line is parsed and emitted as soon as it is complete, the full data set is never loaded into memory at once.

Additionally, it leverages the built-in backpressure handling of Kotlin's Flow API to prevent the producer from overwhelming the consumer.
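The backpressure behaviour mentioned above comes from Flow itself rather than from anything specific to this function. The following sketch, which assumes only kotlinx.coroutines and uses a hypothetical traceBackpressure helper, records the order of events for a plain cold flow with a slow collector.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Illustrative only: records the order in which a cold flow emits
// and its collector finishes handling each value.
suspend fun traceBackpressure(): List<String> {
    val events = mutableListOf<String>()
    val upstream = flow {
        for (i in 1..3) {
            events += "emit $i"
            emit(i) // suspends here until the collector has processed i
        }
    }
    upstream.collect { value ->
        delay(10) // simulate a slow consumer
        events += "done $value"
    }
    return events
}

fun main() = runBlocking {
    println(traceBackpressure())
}
```

With no buffering operator in between, every "emit" entry is followed by its matching "done" entry before the next value is produced, so the producer never runs ahead of the consumer.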

Return

A Flow of parsed objects of type T.

Example usage:

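// Chunk boundaries need not match line boundaries: the first record spans two chunks.
val flow: Flow<ByteArray> = flowOf(
    "{ \"name\": \"John",
    " Doe\" }\n",
    "{ \"name\": \"Jane Doe\" }\n"
).map { it.toByteArray() } // encode each String chunk as UTF-8 bytes

val parsedFlow: Flow<Map<String, String>> = flow.parseJsonLines()

// collect is a suspending call, so invoke it from a coroutine or another suspend function.
parsedFlow.collect { println(it) }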

Parameters

T

The type into which each JSON line is parsed. T is a reified type parameter, so the target type is available at runtime and no Class or type token has to be passed explicitly.

objectMapper

The ObjectMapper to use for JSON parsing. If omitted, defaultObjectMapper is used.
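To illustrate how the two parameters can interact, here is a minimal sketch of per-line decoding with Jackson. decodeLine is a hypothetical helper introduced for this example, not the library's implementation, and the default mapper it creates is an assumption standing in for defaultObjectMapper.

```kotlin
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical helper (assumption): because T is reified, an anonymous
// TypeReference<T> captures the full generic type for Jackson at the call site.
inline fun <reified T> decodeLine(line: String, objectMapper: ObjectMapper = ObjectMapper()): T =
    objectMapper.readValue(line, object : TypeReference<T>() {})

fun main() {
    // The target type is inferred from the declared type of the variable.
    val person: Map<String, String> = decodeLine("{ \"name\": \"Jane Doe\" }")
    println(person["name"])

    // A caller-supplied mapper, here configured to reject trailing content after the JSON value.
    val strictMapper = ObjectMapper().enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS)
    val other: Map<String, String> = decodeLine("{ \"name\": \"John Doe\" }", strictMapper)
    println(other["name"])
}
```

Passing a custom ObjectMapper is the place to register modules or toggle deserialization features when the default configuration does not fit the data.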