onMessages

fun SqsClient.onMessages(queue: SqsQueue, concurrency: Int = 1, groupStrategy: GroupStrategy = GroupStrategy.TimeWindow(10, 250.milliseconds), receiveConfiguration: ReceiveConfiguration.() -> Unit = {}, commitConfiguration: CommitConfiguration.() -> Unit = {}, onError: OnError = OnError.Retry(250.milliseconds), onMessages: suspend (List<<Error class: unknown class>>) -> List<MessageAcknowledgment<Acknowledgment>>): Job

This function is a high-level abstraction that combines receiving messages, processing them, and sending acknowledgments. It is built on top of the receiveMessagesAsFlow and acknowledgmentMessageFlow functions.

Creates a flow that continuously receives messages from an Amazon Simple Queue Service (SQS) queue, processes them using a provided function, and sends acknowledgments for processed messages.

The function is executed in the specified CoroutineScope and returns a Job that represents its execution.
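The receive, process, acknowledge cycle described above can be pictured with a small, self-contained sketch. It is not the library's implementation: FakeMessage, FakeAck, FakeQueue and consume are hypothetical stand-ins backed by an in-memory queue, and only kotlinx.coroutines is assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical stand-ins for illustration only; these are NOT the library's types.
data class FakeMessage(val id: Int, val body: String)
enum class FakeAck { Delete, Keep }

// Hypothetical in-memory queue that mimics receive + acknowledge.
class FakeQueue(messages: List<FakeMessage>) {
    private val pending = ArrayDeque(messages)
    val deleted = mutableListOf<Int>()

    // Hands out at most `max` pending messages.
    fun receive(max: Int): List<FakeMessage> =
        List(minOf(max, pending.size)) { pending.removeFirst() }

    // Applies the handler's decision for each message.
    fun acknowledge(acks: List<Pair<FakeMessage, FakeAck>>) {
        for ((message, ack) in acks) {
            when (ack) {
                FakeAck.Delete -> deleted += message.id
                FakeAck.Keep -> pending.addLast(message) // make it receivable again
            }
        }
    }
}

// Launches the loop in the given scope and returns the Job that controls it.
fun CoroutineScope.consume(
    queue: FakeQueue,
    handler: suspend (List<FakeMessage>) -> List<Pair<FakeMessage, FakeAck>>,
): Job = launch {
    while (isActive) {
        val batch = queue.receive(max = 10)
        if (batch.isEmpty()) {
            delay(50.milliseconds) // idle wait; also a cancellation point
            continue
        }
        queue.acknowledge(handler(batch))
    }
}

fun main(): Unit = runBlocking {
    val queue = FakeQueue(List(25) { FakeMessage(it, "payload-$it") })
    val job = consume(queue) { messages -> messages.map { it to FakeAck.Delete } }

    delay(200.milliseconds) // let the consumer drain the queue
    job.cancelAndJoin()     // stops the loop, like cancelling the returned Job
    println(queue.deleted.size)
}
```

The point of the sketch is the shape: the loop lives in a coroutine launched in the caller's scope, and the returned Job is the only handle needed to stop it.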

Return

A Job representing the execution of the flow.

Example usage:

val sqsClient: SqsClient = ...

coroutineScope {
    val myQueueJob = sqsClient.onMessages(SqsQueue.name("myqueue")) { messages ->
        messages.map { MessageAcknowledgment(it, Delete) }
    }

    // You may cancel myQueueJob at any time.
}

Parameters

queue

A reference to the queue from which messages will be received.

concurrency

The level of concurrency for processing messages. Defaults to 1.

groupStrategy

The strategy used to chunk received messages before they are passed to onMessages. Defaults to GroupStrategy.TimeWindow(10, 250.milliseconds).

receiveConfiguration

A lambda with receiver for configuring the ReceiveConfiguration for the underlying receive operation.

commitConfiguration

A lambda with receiver for configuring the CommitConfiguration for the underlying commit operation.

onError

The strategy to use when message processing encounters an error. Defaults to OnError.Retry(250.milliseconds).

onMessages

A function to process received messages. It receives a list of messages and returns a list of MessageAcknowledgment.
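To make the chunking done by groupStrategy more concrete, here is a minimal sketch of time-window grouping. It assumes that TimeWindow(10, 250.milliseconds) means "emit a chunk once it holds 10 items or 250 milliseconds have passed, whichever comes first"; that reading is an assumption, not something this page confirms. receiveChunk is a hypothetical helper built on a plain kotlinx.coroutines channel and is not part of the library.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper (not a library function): collects up to `maxSize`
// elements, giving up on filling the chunk once `window` has elapsed.
suspend fun <T : Any> ReceiveChannel<T>.receiveChunk(maxSize: Int, window: Duration): List<T> {
    val chunk = mutableListOf<T>()

    // Wait without a deadline for the first element.
    // An empty result means the channel is closed and drained.
    val first = receiveCatching().getOrNull() ?: return chunk
    chunk.add(first)

    // Keep filling until the chunk is full, the window elapses, or the channel closes.
    withTimeoutOrNull(window) {
        while (chunk.size < maxSize) {
            val next = receiveCatching().getOrNull() ?: break
            chunk.add(next)
        }
    }
    return chunk
}

fun main(): Unit = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(25) { channel.send(it) }
    channel.close()

    while (true) {
        val chunk = channel.receiveChunk(maxSize = 10, window = 250.milliseconds)
        if (chunk.isEmpty()) break
        println(chunk.size) // prints 10, 10, 5
    }
}
```

Under this reading, a larger size or longer window trades latency for fewer, bigger calls to the handler. The sketch ignores the edge case where an element is retrieved from the channel just as the timeout fires; a production version would need to handle that.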