batchUpdate

fun <T> Connection.batchUpdate(sql: String, upstream: Flow<T>, returning: Returning = Returning.Default, concurrency: Int = 1, groupStrategy: GroupStrategy = GroupStrategy.TimeWindow(100, 250.milliseconds), prepare: Statement.(T) -> Unit = {}): Flow<Result>

Executes a batch update with the specified SQL statement and values from a flow of items. This function chunks the items in the flow and executes the chunks concurrently. Optionally, it can return generated values (e.g. auto-generated keys) from the inserted records.

Return

a Flow of Results

Example usage:

val connection: Connection = // Obtain a connection from your R2DBC connection factory

val sql = "INSERT INTO users (id, name) VALUES ($1, $2);"
val users = flowOf(User(1, "John"), User(2, "Jane"), User(3, "Bob"))

val results = connection.batchUpdate(sql, users) { user ->
bind("$1", user.id)
bind("$2", user.name)
}

results.collect { result ->
val count = result.updateCount
val generatedKeys = result.generatedKeys

// Handle results as needed
}

Parameters

sql

The SQL statement to execute

upstream

The flow of items to insert/update

returning

The type of generated values to return (if any)

concurrency

The number of concurrent database connections to use

groupStrategy

The chunking strategy to use when batching updates

prepare

A function to prepare the SQL statement before executing it for a given item from the upstream flow


fun <T> Connection.batchUpdate(upstream: Flow<T>, concurrency: Int = 1, groupStrategy: GroupStrategy = GroupStrategy.TimeWindow(100, 250.milliseconds), query: (T) -> String): Flow<Result>

Executes a batch update for a specified upstream flow of items using a given groupStrategy and concurrency. The query function transforms each item in the upstream flow into a respective SQL query string. Returns a flow of long values representing the number of rows updated in the database for each chunk.

Return

a Flow of Results

Example usage:

val connection: Connection = // Obtain a connection from your R2DBC connection factory

val queries = listOf(
"INSERT INTO users (name, age) VALUES ('Alice', 25)",
"INSERT INTO users (name, age) VALUES ('Bob', 30)"
)

val flow = queries.asFlow()

val result = connection.batchUpdate(flow) { it }

result.collect { println("Updated $it rows") } // prints "Updated 1 rows" for each item in the upstream flow

Parameters

upstream

The flow of items to insert/update

concurrency

The number of concurrent database connections to use

groupStrategy

The chunking strategy to use when batching updates

query

the function that transforms each item into a SQL query