consume

@ExperimentalCoroutinesApi
fun ConnectionFactory.consume(
    queueName: String,
    credentials: Credentials? = null,
    sessionMode: SessionMode = SessionMode.CLIENT_ACKNOWLEDGE,
    pollingMaxWait: Duration = 10.seconds,
    concurrency: Int = 1
): Flow<CommittableMessage>

Consumes messages from the specified JMS queue and exposes them as a Kotlin Flow.

Return

A flow of CommittableMessage objects, which can be acknowledged after processing.

Example usage:

// Run this inside a coroutine: collect is a suspending function.
val connectionFactory: ConnectionFactory = TODO("Obtain a JMS connection factory")
val queueName = "example-queue"

connectionFactory.consume(queueName).collect { message ->
    println("Received message: ${message.message}")

    // Process the message here

    // Acknowledge the message after processing using coAcknowledge.
    // Don't call acknowledge directly, since it may perform blocking I/O.
    message.coAcknowledge()
}

Parameters

queueName

The name of the queue to consume messages from.

credentials

Optional credentials to use for establishing the connection. Defaults to null.

sessionMode

The session mode for the JMS context. Defaults to SessionMode.CLIENT_ACKNOWLEDGE.

pollingMaxWait

The maximum duration to wait for a message during polling. Defaults to 10 seconds.

concurrency

The number of concurrent consumers for message consumption. Defaults to 1.
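
To make the pollingMaxWait parameter more concrete, the sketch below shows what a bounded poll looks like in general. It is not this library's implementation. It assumes that the wait behaves like a JMS receive call with a timeout, which returns null when no message arrives in time. The pollOnce helper and the in-memory LinkedBlockingQueue are hypothetical stand-ins for a real JMS consumer.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper (not part of the library): waits at most maxWait
// for an element and returns null if none arrives within that time.
fun <T : Any> pollOnce(queue: LinkedBlockingQueue<T>, maxWait: Duration): T? =
    queue.poll(maxWait.inWholeMilliseconds, TimeUnit.MILLISECONDS)

fun main() {
    // In-memory stand-in for a JMS queue (assumption for illustration only).
    val queue = LinkedBlockingQueue<String>()
    queue.put("hello")

    // A message is already available, so the call returns it without waiting.
    println(pollOnce(queue, 50.milliseconds)) // prints "hello"

    // The queue is now empty, so the call gives up after maxWait.
    println(pollOnce(queue, 50.milliseconds)) // prints "null"
}
```

Under this assumption, a shorter pollingMaxWait means an idle consumer wakes up more often, while a longer one means each poll on an empty queue blocks for longer before it returns.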