Stories of development: setting up AWS SQS to Lambda in Rust

Issues with input type + return batch failures

Edoardo Barp

--

One of the great things about serverless cloud services is how easily you can connect the various pieces together. A few simple permissions and configurations in your CloudFormation YAML and voilà. One such pattern is connecting SQS to a Lambda, so the Lambda is invoked directly by the queue and receives one or more items. Unfortunately, it also comes with one very big drawback: you can’t perfectly replicate the environment locally, meaning that sometimes you just have to deploy, re-deploy, and, once more for the audience, deploy, just to be able to debug.

Input type error

Recently, I had to deploy a piece of software with this logic. All was going great until I encountered two issues. The first was with the input: the aws_lambda_events crate includes a generic type SqsEventObj<T>, where T is your own type. This parses the entire request (which includes a lot of metadata, such as the region, message attributes, time, etc.) as well as your custom struct, which needs to implement serde::Deserialize. In theory, your function should look like this:

#[derive(Deserialize, Debug)]
struct MyStruct {
    string: String,
}

async fn function_handler(event: LambdaEvent<SqsEventObj<MyStruct>>) -> Result<(), Error> {
    // you can use your MyStruct directly through event.payload.records[0].body
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(tracing_subscriber::filter::EnvFilter::from_default_env())
        .init();
    run(service_fn(function_handler)).await
}

However… this doesn’t work. I spent an hour trying to understand why, but couldn’t. Instead, I decided to fall back to SqsEvent, which is basically the same type except the body is left as an unparsed String. I still don’t know why the first method doesn’t work, but the second is simple enough to work with, so I might as well move on.

And then came the second issue: returning the failed items.

Batch input: how to specify the errors?

You see, this lambda can receive batches of multiple records. What happens if only one of the records fails? You don’t want to re-run them all, especially since your processing may not be idempotent, in which case handling the same input twice would be a bad idea.
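The idea behind partial batch failures can be sketched in plain Rust, with str::parse standing in for real record processing that may fail (the ids and bodies here are made-up examples, not AWS types):

```rust
// Sketch of partial-batch failure collection. `str::parse::<i32>` stands in
// for real processing that may fail; ids and bodies are made-up examples.
fn failed_ids(records: &[(&str, &str)]) -> Vec<String> {
    let mut failures = Vec::new();
    for (message_id, body) in records {
        // If processing a record fails, remember its message id so that
        // only this record is retried, not the whole batch.
        if body.parse::<i32>().is_err() {
            failures.push(message_id.to_string());
        }
    }
    failures
}

fn main() {
    let batch = [("id-1", "42"), ("id-2", "not a number"), ("id-3", "7")];
    let failed = failed_ids(&batch);
    // Only the record that failed to process is reported back.
    assert_eq!(failed, vec!["id-2".to_string()]);
    println!("failed ids: {:?}", failed);
}
```

This is exactly what the batch response mechanism below lets you tell AWS: which message ids to put back on the queue.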

So, AWS allows you to return failed items through this struct:

pub struct SqsBatchResponse {
    pub batch_item_failures: Vec<BatchItemFailure>,
}

pub struct BatchItemFailure {
    pub item_identifier: String,
}

But of course, it’s not so simple. It turns out you need to return the struct serialized as a serde_json::Value, and you also have to declare in your CloudFormation template that the function will return that type! Many thanks to this blog post for rescuing me after hours of absolute time-consuming torture: https://dfrasca.hashnode.dev/rust-for-javascript-developers-sqs-batch-error-handling-with-aws-lambda
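For reference, the JSON that AWS expects on the wire uses camelCase keys (batchItemFailures, itemIdentifier), which is what serializing the crate’s SqsBatchResponse produces. The shape can be reproduced dependency-free (the message id is a made-up example):

```rust
// Builds the response shape AWS expects for ReportBatchItemFailures,
// without any dependencies. The message ids are made-up examples.
fn batch_failure_json(item_identifiers: &[&str]) -> String {
    let items: Vec<String> = item_identifiers
        .iter()
        .map(|id| format!(r#"{{"itemIdentifier":"{}"}}"#, id))
        .collect();
    format!(r#"{{"batchItemFailures":[{}]}}"#, items.join(","))
}

fn main() {
    let json = batch_failure_json(&["msg-1"]);
    // This is the exact payload Lambda reports back to SQS.
    assert_eq!(json, r#"{"batchItemFailures":[{"itemIdentifier":"msg-1"}]}"#);
    println!("{json}");
}
```

An empty batchItemFailures array means the whole batch succeeded and nothing is retried.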

The CloudFormation description:

PutVisit:
  Type: AWS::Serverless::Function
  Properties:
    Events:
      SQSPutVisit:
        Type: SQS
        Properties:
          Queue: !GetAtt SQSQueue.Arn
          BatchSize: 10
          FunctionResponseTypes:
            - ReportBatchItemFailures
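If you are not using SAM, the same setting exists on the raw event source mapping resource. A sketch, with the queue and function names assumed from the SAM snippet above:

```yaml
# Equivalent without SAM; resource names are assumed from the snippet above.
PutVisitEventSourceMapping:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: !GetAtt SQSQueue.Arn
    FunctionName: !Ref PutVisit
    BatchSize: 10
    FunctionResponseTypes:
      - ReportBatchItemFailures
```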

And the required return:

async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
    let mut failures = SqsBatchResponse::default();
    for record in event.payload.records {
        // parse the body; if it fails, report the record instead of panicking
        match serde_json::from_str::<Data>(record.body.as_deref().unwrap_or_default()) {
            Ok(_data) => {
                // process the parsed data here
            }
            Err(_) => {
                // tell AWS that this record failed, so only it is retried
                failures.batch_item_failures.push(BatchItemFailure {
                    item_identifier: record.message_id.unwrap_or_default(),
                });
            }
        }
    }
    // return the failures as a serde_json::Value
    Ok(serde_json::to_value(failures)?)
}

All works as expected now!

--

Edoardo Barp

Physicist with an engineering mind — love to write about my experiences — Creator of https://localassistant.ai: openrouter.ai chat interface