onEachAsync
Applies the given function block to each value of the original Flow and reemits them downstream. The processing of items in the flow is concurrent and limited by concurrency level.
The result flow emits the elements in an ordered manner.
Example usage:
flowOf(1, 2, 3)
.onEachAsync(2) { println("Processing $it") }
.collect() // Prints "Processing 1", "Processing 2", "Processing 3"
Return
The same flow with the side-effecting block applied to each item.
Parameters
The maximum number of concurrent transformations.
The function to apply to each item.
Applies the given function block to each value of the original Flow and reemits them downstream. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function.
The result flow emits the elements in an ordered manner.
Example usage:
val customSemaphore: suspend CoroutineScope.() -> AsyncSemaphore = { AsyncSemaphore(this, 2) }
flowOf(1, 2, 3)
.onEachAsync(customSemaphore) { println("Processing $it") }
.collect() // Prints "Processing 1", "Processing 2", "Processing 3" in an ordered manner
Return
The same flow with the side-effecting block applied to each item.
Parameters
The suspending function that creates an AsyncSemaphore.
The function to apply to each item.