collectAsync

suspend fun <T> Flow<T>.collectAsync(concurrency: Int, block: suspend (T) -> Unit)

Collects all the values emitted by the Flow, applying the block function to each value. The processing of items in the flow is concurrent and limited by concurrency level.

Example usage:

flowOf(1, 2, 3)
.collectAsync(2) { println("Processing $it") }

Parameters

concurrency

The maximum number of concurrent transformations.

block

The function to apply to each item.


suspend fun <T, P> Flow<T>.collectAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, block: suspend (T) -> Unit)

Collects all the values emitted by the Flow, applying the block function to each value. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function.

Example usage:

val customSemaphore: suspend CoroutineScope.() -> AsyncSemaphore =  { AsyncSemaphore(this, 2) }

flowOf(1, 2, 3)
.collectAsync(customSemaphore) { println("Processing $it") }

Parameters

semaphore

The suspending function that creates an AsyncSemaphore.

block

The transformation function to apply to each item.