sendMessageFlow

fun SqsClient.sendMessageFlow(queue: SqsQueue, upstream: Flow<SendMessageRequest>, concurrency: Int = 1, groupStrategy: GroupStrategy = GroupStrategy.TimeWindow(10, 250.milliseconds)): Flow<SendMessageResult>

Sends messages to an Amazon Simple Queue Service (SQS) queue using the provided SqsClient. Messages are collected from an upstream Flow of SendMessageRequest objects and sent in batches, so that several messages share a single request to SQS.

Return

A Flow of SendMessageResult, representing the outcome of each message sending attempt:

* Success: Indicates successful message delivery.
* Failure: Indicates a failed send attempt.

Example usage:

val sqsClient = SqsClient { }
val messages = flowOf(
    SendMessageRequest { messageBody = "Message 1" },
    SendMessageRequest { messageBody = "Message 2" },
    SendMessageRequest { messageBody = "Message 3" }
)

sqsClient
    .sendMessageFlow(
        queue = SqsQueue.name("hello-queue"),
        upstream = messages,
        concurrency = 10,
        // SQS accepts at most 10 messages in a single batch request, so the batch size cannot be higher than 10
        groupStrategy = GroupStrategy.TimeWindow(10, 250.milliseconds)
    )
    .collect { result ->
        when (result) {
            is Success -> println("Message sent successfully with ID: ${result.success.messageId}")
            is Failure -> println("Failed to send message: ${result.error.messageId}")
        }
    }

Parameters

queue

The Amazon SQS queue where messages will be sent.

upstream

A Flow of SendMessageRequest objects representing messages to be sent to the SQS queue.

concurrency

The maximum number of concurrent send operations. Defaults to 1 for sequential sending.
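
To illustrate what a concurrency limit means for a flow of batches, here is a minimal sketch built on kotlinx.coroutines' flatMapMerge. It is not this library's implementation: sendConcurrently and the sendBatch callback are hypothetical names introduced for the example, and the fake sender stands in for a real SQS call.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of the documented API): runs at most
// `concurrency` batch sends at the same time and flattens the per-message
// results of every batch into one flow.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T, R> Flow<List<T>>.sendConcurrently(
    concurrency: Int,
    sendBatch: suspend (List<T>) -> List<R>,
): Flow<R> = flatMapMerge(concurrency) { batch ->
    flow { sendBatch(batch).forEach { emit(it) } }
}

fun main() = runBlocking {
    val batches = listOf(listOf("a", "b"), listOf("c"), listOf("d", "e")).asFlow()

    // Stand-in for a network call; a real sender would talk to SQS here.
    val fakeSend: suspend (List<String>) -> List<String> = { batch ->
        delay(10)
        batch.map { "sent:$it" }
    }

    // concurrency = 1 processes one batch at a time, in upstream order.
    println(batches.sendConcurrently(concurrency = 1, sendBatch = fakeSend).toList())
}
```

In this sketch a concurrency of 1 keeps results in upstream order, while higher values let batches overlap, so results may arrive in a different order than the messages were emitted.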

groupStrategy

Determines how messages are grouped into batches for sending. Defaults to GroupStrategy.TimeWindow(10, 250.milliseconds), which groups messages arriving within a 250 ms time window into batches of at most 10 messages.
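
The idea of time-window grouping can be sketched as follows. This is an illustration only, written under the assumption that a batch closes either when it reaches its size cap or when the window that started with its first element expires; it is not GroupStrategy.TimeWindow's actual code, and batchWithin is a hypothetical name.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeMark
import kotlin.time.TimeSource

// Hypothetical operator: emits a batch when it holds `maxSize` elements or
// when `window` has elapsed since the batch's first element, whichever is first.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> Flow<T>.batchWithin(maxSize: Int, window: Duration): Flow<List<T>> = channelFlow {
    require(maxSize > 0) { "maxSize must be positive" }
    val upstream = this@batchWithin.produceIn(this)
    val batch = ArrayList<T>(maxSize)
    var deadline: TimeMark? = null // set when the first element of a batch arrives
    var upstreamDone = false

    // Emits the current batch (if any) and resets the window.
    suspend fun flush() {
        if (batch.isNotEmpty()) send(batch.toList())
        batch.clear()
        deadline = null
    }

    while (!upstreamDone) {
        select<Unit> {
            // Prefer incoming elements; a closed channel ends the loop.
            upstream.onReceiveCatching { result ->
                if (result.isClosed) {
                    result.exceptionOrNull()?.let { throw it }
                    upstreamDone = true
                } else {
                    if (batch.isEmpty()) deadline = TimeSource.Monotonic.markNow() + window
                    batch += result.getOrThrow()
                    if (batch.size >= maxSize) flush()
                }
            }
            // Only arm the timeout while a batch is open; the remaining time
            // is the negated elapsed time of a mark that lies in the future.
            deadline?.let { mark -> onTimeout(-mark.elapsedNow()) { flush() } }
        }
    }
    flush() // emit whatever is left when upstream completes
}

fun main() = runBlocking {
    // The upstream emits immediately, so the size cap closes each full batch
    // and the remainder is emitted when the upstream completes.
    val batches = (1..25).asFlow().batchWithin(maxSize = 10, window = 1.seconds).toList()
    println(batches.map { it.size })
}
```

A small window lowers latency for sparse traffic at the cost of smaller batches, while the size cap keeps each batch within the 10-message limit noted in the example above.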