unorderedOnEachAsync

fun <T> Flow<T>.unorderedOnEachAsync(concurrency: Int, block: suspend (T) -> Unit): Flow<T>

Applies the given function block to each value of the original Flow and reemits them downstream. The processing of items in the flow is concurrent and limited by concurrency level.

The result flow may emit the elements in an unordered manner, which makes this function faster than onEachAsync.

Example usage:

flowOf(1, 2, 3)
.unorderedOnEachAsync(2) { println("Processing $it") }
.collect() // Prints "Processing 1", "Processing 2", "Processing 3" in an unordered manner

Return

A new flow with the same elements, side-effecting block applied to each item, possibly unordered.

Parameters

concurrency

The maximum number of concurrent transformations.

block

The function to apply to each item.


fun <T, P> Flow<T>.unorderedOnEachAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, block: suspend (T) -> Unit): Flow<T>

Applies the given function block to each value of the original Flow and reemits them downstream. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function.

The result flow may emit the elements in an unordered manner, which makes this function faster than onEachAsync.

Example usage:

val customSemaphore: suspend CoroutineScope.() -> AsyncSemaphore =  { AsyncSemaphore(this, 2) }

flowOf(1, 2, 3)
.unorderedOnEachAsync(customSemaphore) { println("Processing $it") }
.collect() // Prints "Processing 1", "Processing 2", "Processing 3" in an unordered manner

Return

A new flow with the same elements, side-effecting block applied to each item, possibly unordered.

Parameters

semaphore

The suspending function that creates an AsyncSemaphore.

block

The function to apply to each item.