acknowledgmentMessageFlow
fun <Error class: unknown class>.acknowledgmentMessageFlow(queue: SqsQueue, upstream: Flow<MessageAcknowledgment<Acknowledgment>>, concurrency: Int = 1, groupStrategy: GroupStrategy = GroupStrategy.TimeWindow(10, 250.milliseconds)): Flow<MessageAcknowledgmentResult>
Creates a flow that processes acknowledgments for messages in an Amazon Simple Queue Service (SQS) queue.
This function takes an upstream flow of MessageAcknowledgment objects and processes them based on their acknowledgment type: ChangeMessageVisibility, Delete, or Ignore. The function processes acknowledgments concurrently using concurrency and the specified groupStrategy.
Return
A Flow of MessageAcknowledgmentResult objects that contain the message, acknowledgment, and response.
Example usage:
val sqsClient: SqsClient = ...
val messageAcknowledgments = flowOf(
MessageAcknowledgment(Message(...), Delete),
MessageAcknowledgment(Message(...), ChangeMessageVisibility(30)),
MessageAcknowledgment(Message(...), Ignore)
)
sqsClient
.acknowledgmentMessageFlow(SqsQueue.name("myqueue"), messageAcknowledgments)
.collect { result ->
println(
"""Processed message:
${result.message.messageId()},
acknowledgment: ${result.acknowledgment},
response: ${result.response}"""
)
}
Content copied to clipboard
Parameters
queue
The reference of the queue.
upstream
A Flow of MessageAcknowledgment objects.
concurrency
The level of concurrency for processing messages.
groupStrategy
The strategy to use when chunking messages for processing.