Amazon Simple Queue Service (SQS) Γ¨ un servizio di messaggistica completamente gestito che consente la decoupling tra componenti di unβapplicazione, Γ¨ la versione cloud dellβimplementazione di una coda producer consumer.
Il servizio Γ¨ sempre gratuito fino a 1 milione di richieste.
Cosa Γ¨ una Queue
Quando unβAPI viene chiamata, la sua prima operazione, sincrona, Γ¨ tipicamente la scrittura nel database. Successivamente, puΓ² svolgere altre attivitΓ asincrone, come inviare email o effettuare chiamate ad altre API. Per disaccoppiare queste operazioni asincrone dalle responsabilitΓ dellβAPI principale, si implementa una queue. Dopo aver scritto nel database, lβAPI aggiunge un messaggio alla coda, che Γ¨ un semplice file JSON di testo. Un servizio consumer separato si occupa di elaborare i messaggi dalla coda, eseguendo le operazioni richieste in modo indipendente e completamente asincrono. Questo consumer gestisce fallimenti, retry e altre eventuali problematiche, sollevando lβapplicazione principale da tali responsabilitΓ .
Configurazioni
- Standard vs FIFO: Le code Standard offrono throughput illimitato e un ordine approssimativo dei messaggi, mentre le FIFO garantiscono lβordine e la consegna unica a scapito di un limite nel numero di richieste per secondo e una minore scalabilitΓ . La soluzione migliore Γ¨ sempre Standard in quanto eventuali richieste multiple possono essere gestite lato applicazione consumer.
- Visibility Timeout: Periodo durante il quale un messaggio estratto dalla coda Γ¨ nascosto agli altri consumer per evitare elaborazioni simultanee. Consente di completare lβelaborazione in modo sicuro.
- Message Retention Period: Tempo massimo per cui un messaggio rimane nella coda prima di essere eliminato automaticamente se non viene elaborato.
- Delivery Delay: Ritardo configurabile tra lβinserimento di un messaggio nella coda e la sua disponibilitΓ per il consumo. Utile per posticipare lβelaborazione.
- Maximum Message Size: Dimensione massima in byte di un messaggio. Garantisce performance e stabilitΓ limitando i dati trasferibili in un singolo messaggio.
- Receive Message Wait Time: Periodo massimo in cui un consumer aspetta per ricevere un messaggio quando la coda Γ¨ vuota. Riduce chiamate inutili grazie alla logica di long polling.
Dead Letter Queue
PuΓ² capitare che lβelaborazione di un messaggio dalla coda non vada a buon fine: questa cosa non deve bloccare gli altri messaggi della coda che devono essere consumati. Inoltre non devo in loop continuamente riprovare ad elaborare il messaggio (che non sarΓ stato rimosso dalla coda principale in quanto non elaborato) in quanto probabilmente continuerΓ a fallire bloccando tutti gli altri.
La soluzione Γ¨ implementare il pattern Dead Letter Queue
.
Lβidea Γ¨ provare ad elaborare un messaggio un numero finito di volte, esempio 3. Quindi per tre volte il messaggio non viene rimosso dalla coda principale in quanto magari Γ¨ un problema temporaneo e ci sta riprovarci.
Dopo lβennesimo fallimento lo elimino dalla coda principale e lo metto in unβaltra coda ad hoc, la Dead Letter Queue
.
Questa coda verrà gestita manualmente dagli sviluppatori che riceveranno degli alert automatici quando un nuovo messaggio entra in tale coda e verificheranno il da farsi, scoprendo tipicamente bug e così via.
AWS
Su AWS basta creare una nuova queue, convenzionalmente con il nome della coda di riferimento con un append di -dlq
, indicando il massimo valore nel Message retention period
in quanto, dato che i messaggi verranno controllati a mano, vogliamo che ci stiano il piΓΉ a lungo possibile.
Una volta creata andare sulla coda originale (nel mio esempio customers
) e, nelle opzioni, abilitare βdead letter queueβ indicando la coda appena creata come coda da inviare i messaggi che vengono ricevuti ma non eliminati Maximum receives
volte.
Quindi se un messaggio raggiunge tale numero come receive count
viene inviato alla coda automaticamente.
Inoltre nella coda principale devo modificarla andando nella sezione Redrive allow policy -> By queue
indicando la dlq
.
Una volta fatto comparirΓ un pulsante nellβinterfaccia principale della coda chiamato Start DLQ redrive
che permette di rimettere tutti i messaggi nella dlq
nella coda principale; questo tipicamente avviene quando nellβapplicazione Γ¨ stato sistemato un bug e conseguentemente voglio riprocessare tutti i messaggi.
Notare che ci sono due opzioni sul numero di messaggi da mandare alla coda principale, il primo, System optimized
li manda alla massima velocitΓ possibile, il secondo Custom max velocity
permette di inviarli fino ad un massimo di x
al secondo in modo da non imballare il sistema se riceve troppi messaggi da processare contemporaneamente.
Best practice
Consumer con Lambda
Uno dei modi piΓΉ puliti per gestire un evento con SQS Γ¨ usare una Lambda: invece di deployare un server che bisogna gestire e eventualmente scalare in base alla richieste posso pensare di gestire tutto tramite lambda che Γ¨ un servizio serverless che si autoscala in base alla richieste e praticamente gratuito.
Contract publisher consumer
Dato che tra il publisher e il consumer di una queue cβΓ¨ un βcontractβ che indica la struttura dellβoggetto che viene scambiato non voglio passare direttamente customer in quanto qualora il dominio venisse cambiato dovrei modificare anche il contratto con il consumer, cosa che non voglio essere obbligato a fare.
Eβ una best practice quindi fornire non lβoggetto del dominio (esempio Customer
) ma una sua versione stile CustomerCreated
, cosa che viene fatta da una classe mapper come nellβesempio sotto.
public static CustomerCreated ToCustomerCreatedMessage(this Customer customer)
{
return new CustomerCreated
{
Id = customer.Id,
Email = customer.Email,
GitHubUsername = customer.GitHubUsername,
FullName = customer.FullName,
DateOfBirth = customer.DateOfBirth
};
}
Consumer con ASP.NET Core
Per implementare un consumer in Asp.NET Core Γ¨ buona norma utilizzare la classe BackgroundService
dato che permette di far girare servizi in background in modo compatibile con tutto lβecosistema ASP.NET.
La classe in questione poi viene aggiunta usando il metodo AddHosterService
, per esempio builder.Services.AddHostedService<QueueConsumerService>();
Esempio
Il pacchetto nuget Amazon.SQS
fornisce dei metodi comodi per interagire con la coda.
Publisher
public record CustomerCreated(Guid Id, string FullName, string Email, string GitHubUsername, DateTime DateOfBirth);
var sqsClient = new AmazonSQSClient();
var customer = new CustomerCreated
{
Id = Guid.NewGuid(),
Email = "nick@nickchapsas.com",
FullName = "Nick Chapsas",
DateOfBirth = new DateTime(1993, 1, 1),
GitHubUsername = "nickchapsas"
};
var queueUrlResponse = await sqsClient.GetQueueUrlAsync("customers");
var sendMessageRequest = new SendMessageRequest
{
QueueUrl = queueUrlResponse.QueueUrl,
MessageBody = JsonSerializer.Serialize(customer),
MessageAttributes = new Dictionary<string, MessageAttributeValue>
{
{
"MessageType", new MessageAttributeValue
{
DataType = "String",
StringValue = nameof(CustomerCreated)
}
}
},
};
var response = await sqsClient.SendMessageAsync(sendMessageRequest);
Consumer
var cts = new CancellationTokenSource();
var sqsClient = new AmazonSQSClient();
//A partire dal nome della coda fornisce l'URL corretto a cui mandare le richieste
var queueUrlResponse = await sqsClient.GetQueueUrlAsync("customers");
var receiveMessageRequest = new ReceiveMessageRequest
{
QueueUrl = queueUrlResponse.QueueUrl,
AttributeNames = new List<string>{ "All" },
MessageAttributeNames = new List<string>{ "All" }
};
while (!cts.IsCancellationRequested)
{
var response = await sqsClient.ReceiveMessageAsync(receiveMessageRequest, cts.Token);
foreach (var message in response.Messages)
{ Console.WriteLine($"Message Id: {message.MessageId}");
Console.WriteLine($"Message Body: {message.Body}");
// Consumare un messaggio non porta alla sua eliminazione, devo comunicarlo esplicitamente
await sqsClient.DeleteMessageAsync(queueUrlResponse.QueueUrl, message.ReceiptHandle);
} await Task.Delay(3000);
}