Esempi per Amazon MSK con SDK per Rust - AWS SDK per Rust

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempi per Amazon MSK con SDK per Rust

I seguenti esempi di codice mostrano come eseguire azioni e implementare scenari comuni utilizzando l' AWS SDK per Rust con Amazon MSK.

Ogni esempio include un link al codice sorgente completo, in cui vengono fornite le istruzioni su come configurare ed eseguire il codice nel contesto.

Esempi serverless

L’esempio di codice seguente mostra come implementare una funzione Lambda che riceve un evento attivato dalla ricezione di record da un cluster Amazon MSK. La funzione recupera il payload MSK e registra il contenuto del record.

SDK per Rust
Nota

C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di Esempi serverless.

Utilizzo di un evento Amazon MSK con Lambda tramite Rust.

use aws_lambda_events::event::kafka::KafkaEvent; use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent}; use base64::prelude::*; use serde_json::{Value}; use tracing::{info}; /// Pre-Requisites: /// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html /// 2. Add packages tracing, tracing-subscriber, serde_json, base64 /// /// This is the main body for the function. /// Write your code inside it. /// There are some code example in the following URLs: /// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples /// - https://github.com/aws-samples/serverless-rust-demo/ async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> { let payload = event.payload.records; for (_name, records) in payload.iter() { for record in records { let record_text = record.value.as_ref().ok_or("Value is None")?; info!("Record: {}", &record_text); // perform Base64 decoding let record_bytes = BASE64_STANDARD.decode(record_text)?; let message = std::str::from_utf8(&record_bytes)?; info!("Message: {}", message); } } Ok(().into()) } #[tokio::main] async fn main() -> Result<(), Error> { // required to enable CloudWatch error logging by the runtime tracing::init_default_subscriber(); info!("Setup CW subscriber!"); run(service_fn(function_handler)).await }