indexFlow

fun <T> ElasticsearchAsyncClient.indexFlow(upstream: Flow<Document<T>>, concurrency: Int = 1, groupStrategy: GroupStrategy = GroupStrategy.TimeWindow(100, 250.milliseconds)): Flow<BulkResponseItem>

Indexes documents into Elasticsearch asynchronously using bulk requests. It takes a Flow of Document objects as input, along with optional parameters that control the number of concurrent bulk requests and how the input is grouped into chunks.

Return

A Flow of BulkResponseItem objects, each representing the result of indexing one document.

Example usage:

val documents = listOf(
    Document("index1", "id1", mapOf("field1" to "value1")),
    Document("index1", "id2", mapOf("field1" to "value2")),
    Document("index2", "id3", mapOf("field1" to "value3"))
).asFlow()

// indexFlow is an extension function, so it is called on a client instance.
// `client` is assumed to be an already configured ElasticsearchAsyncClient.
client.indexFlow(documents, concurrency = 3).collect {
    when (it) {
        is BulkResponseItem.Failure -> println("Indexing failed for item with id ${it.id}: ${it.error}")
        is BulkResponseItem.Success -> println("Indexing succeeded for item with id ${it.id}")
    }
}

Parameters

upstream

A Flow of Document objects to be indexed asynchronously into Elasticsearch.

concurrency

Optional parameter that specifies the maximum number of bulk requests that can be in flight at the same time. Default value is 1.

groupStrategy

Optional parameter that specifies the GroupStrategy to be used for splitting the input flow into chunks. Default value is GroupStrategy.TimeWindow(100, 250.milliseconds).
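
To illustrate what a time-window grouping might do, here is a minimal sketch in plain Kotlin. It assumes that the two arguments of TimeWindow(100, 250.milliseconds) mean "at most 100 documents per chunk" and "wait at most 250 ms before closing a chunk"; this page does not confirm that reading. The helper groupByWindow and its parameters are hypothetical and not part of the library. It simulates the grouping over items tagged with an arrival time instead of using a real Flow.

```kotlin
// Hypothetical helper, NOT part of the library: simulates a "size or time"
// window over items tagged with their arrival time in milliseconds.
fun <T> groupByWindow(
    arrivals: List<Pair<Long, T>>, // (arrival time in ms, item), sorted by time
    maxSize: Int,                  // assumed meaning of the first TimeWindow argument
    maxWaitMs: Long                // assumed meaning of the second TimeWindow argument
): List<List<T>> {
    val batches = mutableListOf<List<T>>()
    var current = mutableListOf<T>()
    var windowStart = 0L

    for ((time, item) in arrivals) {
        // Close the open chunk if its time window expired before this item arrived.
        if (current.isNotEmpty() && time - windowStart >= maxWaitMs) {
            batches.add(current)
            current = mutableListOf()
        }
        // The window starts when the first item of a chunk arrives.
        if (current.isEmpty()) windowStart = time
        current.add(item)
        // Close the chunk as soon as it reaches the size limit.
        if (current.size == maxSize) {
            batches.add(current)
            current = mutableListOf()
        }
    }
    // Flush whatever is left when the input ends.
    if (current.isNotEmpty()) batches.add(current)
    return batches
}

fun main() {
    // Three items arrive in quick succession, then two more after a long pause.
    val arrivals = listOf(0L to "a", 10L to "b", 20L to "c", 400L to "d", 410L to "e")
    println(groupByWindow(arrivals, maxSize = 2, maxWaitMs = 250)) // prints [[a, b], [c], [d, e]]
}
```

Under this reading, a steady stream of documents fills chunks up to the size limit, while a slow trickle is still flushed once the time limit passes, so a document is not held back indefinitely waiting for a full chunk.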