Verwalten von Amazon SQS-Warteschlangen in Java

1. Übersicht

In diesem Tutorial erfahren Sie, wie Sie den SQS (Simple Queue Service) von Amazon mithilfe des Java SDK verwenden .

2. Voraussetzungen

Die Maven-Abhängigkeiten, AWS-Kontoeinstellungen und Clientverbindungen, die zur Verwendung des Amazon AWS SDK für SQS erforderlich sind, sind dieselben wie in diesem Artikel hier.

Angenommen, wir haben eine Instanz von AWSCredentials erstellt, wie im vorherigen Artikel beschrieben, können wir unseren SQS-Client erstellen:

AmazonSQS sqs = AmazonSQSClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials)) .withRegion(Regions.US_EAST_1) .build(); 

3. Warteschlangen erstellen

Sobald wir unseren SQS-Client eingerichtet haben, ist das Erstellen von Warteschlangen ziemlich einfach.

3.1. Erstellen einer Standardwarteschlange

Mal sehen, wie wir eine Standardwarteschlange erstellen können. Dazu müssen wir eine Instanz von CreateQueueRequest erstellen:

CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("baeldung-queue"); String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl(); 

3.2. Erstellen einer FIFO-Warteschlange

Das Erstellen eines FIFO ähnelt dem Erstellen einer Standardwarteschlange. Wir werden weiterhin eine Instanz von CreateQueueRequest verwenden , wie wir es zuvor getan haben. Nur dieses Mal müssen wir Warteschlangenattribute übergeben und das FifoQueue- Attribut auf true setzen :

Map queueAttributes = new HashMap(); queueAttributes.put("FifoQueue", "true"); queueAttributes.put("ContentBasedDeduplication", "true"); CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest( "baeldung-queue.fifo").withAttributes(queueAttributes); String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest) .getQueueUrl(); 

4. Nachrichten in Warteschlangen stellen

Sobald wir unsere Warteschlangen eingerichtet haben, können wir mit dem Senden von Nachrichten beginnen.

4.1. Senden einer Nachricht an eine Standardwarteschlange

Um Nachrichten an eine Standardwarteschlange zu senden, müssen wir eine Instanz von SendMessageRequest erstellen.

Dann fügen wir dieser Anfrage eine Karte mit Nachrichtenattributen hinzu:

Map messageAttributes = new HashMap(); messageAttributes.put("AttributeOne", new MessageAttributeValue() .withStringValue("This is an attribute") .withDataType("String")); SendMessageRequest sendMessageStandardQueue = new SendMessageRequest() .withQueueUrl(standardQueueUrl) .withMessageBody("A simple message.") .withDelaySeconds(30) .withMessageAttributes(messageAttributes); sqs.sendMessage(sendMessageStandardQueue); 

Der withDelaySeconds () gibt an, nach wie lange die Nachricht in der Warteschlange ankommen sollte.

4.2. Senden einer Nachricht an eine FIFO-Warteschlange

Der einzige Unterschied besteht in diesem Fall darin, dass wir die Gruppe angeben müssen , zu der die Nachricht gehört:

SendMessageRequest sendMessageFifoQueue = new SendMessageRequest() .withQueueUrl(fifoQueueUrl) .withMessageBody("Another simple message.") .withMessageGroupId("baeldung-group-1") .withMessageAttributes(messageAttributes);

Wie Sie im obigen Codebeispiel sehen können, geben wir die Gruppe mithilfe von withMessageGroupId () an.

4.3. Mehrere Nachrichten in eine Warteschlange stellen

Wir können auch mehrere Nachrichten mit einer einzigen Anfrage in eine Warteschlange stellen. Wir erstellen eine Liste von SendMessageBatchRequestEntry, die wir mit einer Instanz von SendMessageBatchRequest senden :

List  messageEntries = new ArrayList(); messageEntries.add(new SendMessageBatchRequestEntry() .withId("id-1") .withMessageBody("batch-1") .withMessageGroupId("baeldung-group-1")); messageEntries.add(new SendMessageBatchRequestEntry() .withId("id-2") .withMessageBody("batch-2") .withMessageGroupId("baeldung-group-1")); SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest(fifoQueueUrl, messageEntries); sqs.sendMessageBatch(sendMessageBatchRequest);

5. Nachrichten aus Warteschlangen lesen

Wir können Nachrichten aus unseren Warteschlangen empfangen, indem wir die Methode receiveMessage () für eine Instanz von ReceiveMessageRequest aufrufen:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl) .withWaitTimeSeconds(10) .withMaxNumberOfMessages(10); List sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages(); 

Mit withMaxNumberOfMessages () geben wir an, wie viele Nachrichten aus der Warteschlange abgerufen werden sollen - obwohl zu beachten ist, dass das Maximum 10 beträgt .

Die Methode withWaitTimeSeconds () ermöglicht Langzeitabfragen. Lange Abfragen sind eine Möglichkeit, die Anzahl der an SQS gesendeten Empfangsnachrichtenanforderungen zu begrenzen.

Einfach ausgedrückt bedeutet dies, dass wir bis zur angegebenen Anzahl von Sekunden warten, um eine Nachricht abzurufen. Wenn sich für diese Dauer keine Nachrichten in der Warteschlange befinden, wird die Anforderung leer zurückgegeben. Wenn während dieser Zeit eine Nachricht in der Warteschlange eintrifft, wird sie zurückgegeben.

Wir können die Attribute und den Text einer bestimmten Nachricht erhalten:

sqsMessages.get(0).getAttributes(); sqsMessages.get(0).getBody();

6. Löschen einer Nachricht aus einer Warteschlange

Um eine Nachricht zu löschen, verwenden wir eine DeleteMessageRequest :

sqs.deleteMessage(new DeleteMessageRequest() .withQueueUrl(fifoQueueUrl) .withReceiptHandle(sqsMessages.get(0).getReceiptHandle())); 

7. Warteschlangen für tote Briefe

Eine Warteschlange für tote Buchstaben muss vom gleichen Typ sein wie ihre Basiswarteschlange - sie muss FIFO sein, wenn die Basiswarteschlange FIFO ist, und Standard, wenn die Basiswarteschlange Standard ist. In diesem Beispiel verwenden wir eine Standardwarteschlange.

Das erste, was wir tun müssen, ist, eine Warteschlange für tote Briefe zu erstellen:

String deadLetterQueueUrl = sqs.createQueue("baeldung-dead-letter-queue").getQueueUrl(); 

Als Nächstes erhalten wir den ARN (Amazon Resource Name) unserer neu erstellten Warteschlange:

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes( new GetQueueAttributesRequest(deadLetterQueueUrl) .withAttributeNames("QueueArn")); String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes() .get("QueueArn"); 

Schließlich setzen wir diese neu erstellte Warteschlange als Dead Letter-Warteschlange unserer ursprünglichen Standardwarteschlange:

SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest() .withQueueUrl(standardQueueUrl) .addAttributesEntry("RedrivePolicy", "{\"maxReceiveCount\":\"2\", " + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}"); sqs.setQueueAttributes(queueAttributesRequest); 

The JSON packet we set in the addAttributesEntry() method when building our SetQueueAttributesRequest instance contains the information we need: the maxReceiveCount is 2, which means that if a message is received this many times, it's assumed to haven't been processed correctly, and is sent to our dead letter queue.

The deadLetterTargetArn attribute points our standard queue to our newly created dead letter queue.

8. Monitoring

We can check how many messages are currently in a given queue, and how many are in flight with the SDK. First, we'll need to create a GetQueueAttributesRequest.

From there we'll check the state of the queue:

GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(standardQueueUrl) .withAttributeNames("All"); GetQueueAttributesResult getQueueAttributesResult = sqs.getQueueAttributes(getQueueAttributesRequest); System.out.println(String.format("The number of messages on the queue: %s", getQueueAttributesResult.getAttributes() .get("ApproximateNumberOfMessages"))); System.out.println(String.format("The number of messages in flight: %s", getQueueAttributesResult.getAttributes() .get("ApproximateNumberOfMessagesNotVisible")));

Mit Amazon Cloud Watch kann eine eingehendere Überwachung erreicht werden.

9. Fazit

In diesem Artikel haben wir gesehen, wie SQS-Warteschlangen mit dem AWS Java SDK verwaltet werden.

Wie üblich finden Sie alle im Artikel verwendeten Codebeispiele auf GitHub.