singleUpdate

fun Connection.singleUpdate(sql: String): Flow<Long>

Executes a single SQL update statement and returns the number of rows affected as a Flow.

Return

A Flow that emits the number of rows affected by the SQL statement.

Example usage:

// single() is a suspending terminal operator, so run this inside a coroutine.
val connection: Connection = TODO("Obtain a connection from your R2DBC connection factory")
val updateSql = "UPDATE users SET email='newemail@example.com' WHERE id=1"
val numAffectedRows = connection.singleUpdate(updateSql).single()
println("Updated $numAffectedRows rows")

Parameters

sql

The SQL update statement to execute.


fun <T> Connection.singleUpdate(sql: String, upstream: Flow<T>, concurrency: Int = 1, prepare: Statement.(T) -> Unit = {}): Flow<Long>

Executes an SQL update statement for each item in the provided upstream Flow.

Return

A Flow of Long values representing the number of rows updated for each executed statement.

Example usage:

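// collect is a suspending function, so run this inside a coroutine.
val connection: Connection = TODO("Obtain a connection from your R2DBC connection factory")
// Bind-marker syntax is driver-specific; this example uses ? placeholders.
val sql = "UPDATE users SET name = ? WHERE id = ?"
val users = flowOf(1 to "Alice", 2 to "Bob")

connection
    .singleUpdate(sql, users) { (id, name) ->
        bind(0, name) // R2DBC bind indexes are zero-based
        bind(1, id)
    }
    .collect { rowsUpdated ->
        println("Rows updated: $rowsUpdated")
    }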
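
Because prepare has the type Statement.(T) -> Unit, the binding logic from the example above can also be defined once and reused. The following is a minimal sketch rather than part of the library: UserRename and bindRename are hypothetical names introduced for illustration, and the indexes assume the placeholder order of the UPDATE statement shown above.

```kotlin
import io.r2dbc.spi.Statement

// Hypothetical item type for the upstream Flow (not part of the library).
data class UserRename(val id: Int, val name: String)

// Hypothetical reusable binder matching the `prepare` parameter type.
// Indexes are zero-based and follow "UPDATE users SET name = ? WHERE id = ?".
val bindRename: Statement.(UserRename) -> Unit = { user ->
    bind(0, user.name) // first placeholder: name
    bind(1, user.id)   // second placeholder: id
}
```

Assuming an upstream Flow<UserRename> named renames, it could then be passed as connection.singleUpdate(sql, renames, prepare = bindRename).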

Parameters

T

The type of the items emitted by the upstream Flow.

sql

The SQL update statement to be executed.

upstream

A Flow of items to be processed, where each item will be used to prepare an SQL statement.

concurrency

An optional parameter to define the level of concurrency when processing items. Defaults to 1.

prepare

An optional lambda, invoked with the Statement as its receiver and an item from the upstream Flow as its argument, for configuring the statement before it is executed (for example, binding parameters). Defaults to an empty lambda.
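
The page does not say how the concurrency parameter is applied internally. As a rough illustration of what a concurrency limit generally means for a Flow pipeline, the sketch below uses the flatMapMerge operator from kotlinx.coroutines with simulated work in place of a database. fakeUpdate and completionOrder are hypothetical helpers, and whether singleUpdate behaves the same way is an assumption.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for one UPDATE: waits, then emits the item's id
// (instead of a row count) so that the completion order is visible.
fun fakeUpdate(id: Int, delayMs: Long): Flow<Int> = flow {
    delay(delayMs)
    emit(id)
}

// Runs three fake updates with the given concurrency limit and
// returns the ids in the order in which their results were emitted.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun completionOrder(concurrency: Int): List<Int> = runBlocking {
    flowOf(1 to 300L, 2 to 100L, 3 to 200L)
        .flatMapMerge(concurrency) { (id, delayMs) -> fakeUpdate(id, delayMs) }
        .toList()
}

fun main() {
    println(completionOrder(concurrency = 1))
    println(completionOrder(concurrency = 3))
}
```

In this sketch a limit of 1 processes items one at a time, so the ids come out in upstream order. A limit of 3 lets all three run at once, so the ids come out in completion order (2, 3, 1 for these delays). If singleUpdate follows the same pattern, row counts emitted with concurrency greater than 1 would not necessarily line up with the order of the upstream items.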