batchUpdate
Executes a batch update with the specified SQL statement and values from a flow of items. This function chunks the items in the flow and executes the chunks concurrently. Optionally, it can return generated values (e.g. auto-generated keys) from the inserted records.
Return
a Flow of Results
Example usage:
data class User(val id: Int, val name: String)

val connection: Connection = TODO("Obtain a connection from your R2DBC connection factory")
val sql = "INSERT INTO users (id, name) VALUES ($1, $2);"
val users = flowOf(User(1, "John"), User(2, "Jane"), User(3, "Bob"))

// The lambda binds the statement parameters for each item in the flow
val results = connection.batchUpdate(sql, users) { user ->
    bind("$1", user.id)
    bind("$2", user.name)
}

results.collect { result ->
    val count = result.updateCount
    val generatedKeys = result.generatedKeys
    // Handle results as needed
}
Parameters
The SQL statement to execute
The flow of items to insert/update
The type of generated values to return (if any)
The number of concurrent database connections to use
The chunking strategy to use when batching updates
A function to prepare the SQL statement before executing it for a given item from the upstream flow
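The chunk-then-run-concurrently behaviour described above can be pictured with a minimal sketch built on kotlinx.coroutines. This is not the library's implementation: `chunkedBy`, `executeChunk`, and `batchCounts` are hypothetical names, a fixed chunk size stands in for the chunking strategy, and the database round trip is faked so the example runs without a connection.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: groups upstream items into lists of at most `size` elements.
fun <T> Flow<T>.chunkedBy(size: Int): Flow<List<T>> = flow {
    require(size > 0) { "size must be positive" }
    val buffer = ArrayList<T>(size)
    collect { item ->
        buffer.add(item)
        if (buffer.size == size) {
            emit(buffer.toList()) // emit a copy so the buffer can be reused
            buffer.clear()
        }
    }
    if (buffer.isNotEmpty()) emit(buffer.toList()) // flush the final partial chunk
}

// Hypothetical stand-in for a database round trip: pretends every item updates one row.
suspend fun <T> executeChunk(chunk: List<T>): Long {
    delay(10) // simulate I/O latency
    return chunk.size.toLong()
}

// Runs up to `concurrency` chunks at once and emits one row count per chunk.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> Flow<T>.batchCounts(chunkSize: Int, concurrency: Int): Flow<Long> =
    chunkedBy(chunkSize).flatMapMerge(concurrency) { chunk ->
        flow { emit(executeChunk(chunk)) }
    }

fun main() = runBlocking {
    val counts = (1..7).asFlow().batchCounts(chunkSize = 3, concurrency = 2).toList()
    // Chunks complete concurrently, so the emission order is not guaranteed.
    println(counts.sorted()) // prints [1, 3, 3]
}
```

Under these assumptions, seven items with a chunk size of 3 yield three results, one per chunk. Because chunks run concurrently, results may arrive in a different order than the chunks were formed.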
Executes a batch update for a specified upstream flow of items using a given groupStrategy and concurrency. The query function transforms each item in the upstream flow into a respective SQL query string. Returns a flow of long values representing the number of rows updated in the database for each chunk.
Return
a Flow of Long values, one per chunk, each giving the number of rows updated by that chunk
Example usage:
val connection: Connection = TODO("Obtain a connection from your R2DBC connection factory")
val queries = listOf(
    "INSERT INTO users (name, age) VALUES ('Alice', 25)",
    "INSERT INTO users (name, age) VALUES ('Bob', 30)"
)
val flow = queries.asFlow()

// Each item is already a complete SQL statement, so the query function returns it unchanged
val result = connection.batchUpdate(flow) { it }
result.collect { println("Updated $it rows") } // prints one line per chunk with the number of rows that chunk updated
Parameters
The flow of items to insert/update
The number of concurrent database connections to use
The chunking strategy to use when batching updates
The function that transforms each item into a SQL query string
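The query function does not have to be the identity. As a sketch, the items could be domain objects that the function renders into SQL text. The `User` class and the `sqlLiteral` and `insertUser` helpers below are hypothetical and not part of the library. Because this overload inlines values into the statement text rather than binding them, the sketch doubles single quotes in string values. That is only a basic precaution, and the parameterized overload is likely the safer choice for untrusted input.

```kotlin
// Hypothetical item type for illustration.
data class User(val name: String, val age: Int)

// Hypothetical helper: doubles single quotes so the value can sit inside a SQL string literal.
fun sqlLiteral(value: String): String = "'" + value.replace("'", "''") + "'"

// A candidate query function: turns one item into one complete SQL statement.
fun insertUser(user: User): String =
    "INSERT INTO users (name, age) VALUES (${sqlLiteral(user.name)}, ${user.age})"

fun main() {
    val users = listOf(User("Alice", 25), User("O'Brien", 41))
    users.map(::insertUser).forEach(::println)
    // INSERT INTO users (name, age) VALUES ('Alice', 25)
    // INSERT INTO users (name, age) VALUES ('O''Brien', 41)
}
```

Following the call shape of the documented example, such a function would be passed as `connection.batchUpdate(users.asFlow()) { insertUser(it) }`.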