SQS To DynamoDB Tuning - Writing Data to DDB in Rust and Typescript

Last time we created our Rust and Typescript Lambdas with basic hello world implementations and did a quick performance comparison. We'll now expand our Rust and Typescript Lambdas from last time into ones that take data from SQS messages and push the data into DynamoDB. While we're at it, we'll compare the performance of Rust and Typescript versions and see which is more up to the task.

From last time, here is our template.yml with our two Lambdas:

AWSTemplateFormatVersion: "2010-09-09"

Transform:
- "AWS::Serverless-2016-10-31"

Resources:

  BlasterLambdaTS:
    Type: AWS::Serverless::Function
    Properties:
      Architectures:
        - arm64
      Handler: index.handler
      Runtime: nodejs14.x
      CodeUri: .
      Timeout: 30
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
    Metadata:
      BuildMethod: makefile

  BlasterLambdaRust:
    Type: AWS::Serverless::Function
    Properties:
      Architectures:
        - arm64
      Handler: none
      Runtime: provided.al2
      CodeUri: .
      Timeout: 30
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
    Metadata:
      BuildMethod: makefile

Let's add a DynamoDB table to template.yml:

...

  DynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions: 
        - AttributeName: id
          AttributeType: S
      KeySchema: 
        - AttributeName: id
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

This creates our table with 'id', a string, as the hash key of the primary index. This tells DynamoDB that every item will have an 'id' field that is used to reference it. Each item will contain several other fields, but we don't need to specify them in AttributeDefinitions because they are not used as key fields. If we were to add secondary indexes, we would need to add their key fields to AttributeDefinitions as well.
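To make the role of the hash key concrete, here is a minimal in-memory sketch of a table keyed on 'id'. The HashKeyTable class is hypothetical and is not the DynamoDB API; it only models the fact that a put with an existing key replaces the earlier item, and that non-key attributes need no schema.

```typescript
// In-memory stand-in for a table whose primary key is the string attribute "id".
// Illustration only: HashKeyTable is a hypothetical class, not the DynamoDB API.
type Item = { id: string } & Record<string, string | number>;

class HashKeyTable {
    private readonly items = new Map<string, Item>();

    // Like a put keyed on "id": a second write with the same id replaces the first.
    put(item: Item): void {
        this.items.set(item.id, item);
    }

    get(id: string): Item | undefined {
        return this.items.get(id);
    }

    get count(): number {
        return this.items.size;
    }
}

const table = new HashKeyTable();
table.put({ id: "id-1", a: 1.2 });
// Same key, different non-key attributes: no schema change is needed.
table.put({ id: "id-1", a: 9.9, note: "replaces the first write" });
table.put({ id: "id-2", a: 1.2 });
```

After these three puts the sketch holds two items, and looking up "id-1" returns the second write.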

We're also setting the BillingMode to PAY_PER_REQUEST because our workloads will be spiky and inconsistent, and we don't want to pay for throughput we don't use. If we had steady, predictable workloads, we would want to use PROVISIONED.

Now we'll add SQS queues for each Lambda to receive messages from:

...
  TSQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 180

  RustQueue:
    Type: AWS::SQS::Queue
    Properties:  
      VisibilityTimeout: 180

We are setting the VisibilityTimeout to 180 seconds because our Lambdas have a 30-second timeout, and AWS's guidance is that the queue's visibility timeout should be at least six times the Lambda timeout. We'll start there and likely tune it later.
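The arithmetic behind the 180 is simple enough to encode as a check. This is only a sketch; the function names are made up for illustration, and the multiplier of six is the guidance quoted above.

```typescript
// Guidance quoted above: queue visibility timeout >= 6 x function timeout.
const VISIBILITY_TIMEOUT_MULTIPLIER = 6;

// Smallest visibility timeout (seconds) that satisfies the guidance.
function minVisibilityTimeoutSeconds(lambdaTimeoutSeconds: number): number {
    return lambdaTimeoutSeconds * VISIBILITY_TIMEOUT_MULTIPLIER;
}

// True when a queue/function pair satisfies the guidance.
function isVisibilityTimeoutSufficient(
    visibilityTimeoutSeconds: number,
    lambdaTimeoutSeconds: number,
): boolean {
    return visibilityTimeoutSeconds >= minVisibilityTimeoutSeconds(lambdaTimeoutSeconds);
}

// Our template: Timeout: 30 on the Lambdas, VisibilityTimeout: 180 on the queues.
const ourMinimum = minVisibilityTimeoutSeconds(30);
const ourSettingIsOk = isVisibilityTimeoutSufficient(180, 30);
```

If we later raise the Lambda timeout, the same check tells us how far the queue setting has to move with it.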

We now need to tie everything together. We'll add SQS event source configurations to the Lambdas:

...
  BlasterLambdaTS:
    Type: AWS::Serverless::Function
    Properties:
      Architectures:
        - arm64
      Handler: index.handler
      Runtime: nodejs14.x
      CodeUri: .
      Timeout: 30
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt TSQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile

  BlasterLambdaRust:
    Type: AWS::Serverless::Function
    Properties:
      Architectures:
        - arm64
      Handler: none
      Runtime: provided.al2
      CodeUri: .
      Timeout: 30
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt RustQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile
...

This configures our Lambdas to trigger automatically when messages arrive on their respective queues. Since we are configuring the BatchSize as 1, each invocation will receive an event containing a single message. Now we need to give our Lambdas permission to access our DynamoDB table. Following the security best practice of the principle of least privilege (POLP), we give our Lambdas write access to our specific table only. AWS SAM provides some nice policy templates, which are much less verbose than full policy documents:

...
  BlasterLambdaTS:
    Type: AWS::Serverless::Function
    Properties:
      Architectures:
        - arm64
      Handler: index.handler
      Runtime: nodejs14.x
      CodeUri: .
      Timeout: 30
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
        - DynamoDBWritePolicy:
            TableName: !Ref DynamoDBTable
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt TSQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile

  BlasterLambdaRust:
    Type: AWS::Serverless::Function
    Properties:
      Architectures:
        - arm64
      Handler: none
      Runtime: provided.al2
      CodeUri: .
      Timeout: 30
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
        - DynamoDBWritePolicy:
            TableName: !Ref DynamoDBTable
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt RustQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile
...

Note that we already have AWSLambdaBasicExecutionRole configured. This gives our Lambdas permission to upload logs to CloudWatch so we can view them there.

We'll also need to tell our Lambdas which DDB table to write to. A convenient way to do this is with environment variables:

...
  BlasterLambdaTS:
    Type: AWS::Serverless::Function
    Properties:
      Architectures:
        - arm64
      Handler: index.handler
      Runtime: nodejs14.x
      CodeUri: .
      Timeout: 30
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
        - DynamoDBWritePolicy:
            TableName: !Ref DynamoDBTable
      Environment:
        Variables: 
          TABLE_NAME: !Ref DynamoDBTable
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt TSQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile

  BlasterLambdaRust:
    Type: AWS::Serverless::Function
    Properties:
      Architectures:
        - arm64
      Handler: none
      Runtime: provided.al2
      CodeUri: .
      Timeout: 30
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
        - DynamoDBWritePolicy:
            TableName: !Ref DynamoDBTable
      Environment:
        Variables: 
          TABLE_NAME: !Ref DynamoDBTable
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt RustQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile
...
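On the handler side, it can be useful to fail fast when the variable is missing instead of sending an undefined table name to DynamoDB. Here is a minimal sketch; requireEnv is a hypothetical helper, and it takes the environment as a parameter so it can be exercised without a real Lambda environment.

```typescript
// Shape of an environment map such as Node's process.env.
type Env = Record<string, string | undefined>;

// Returns the named variable, or throws if it is unset or empty.
// requireEnv is a hypothetical helper, not part of any SDK.
function requireEnv(env: Env, name: string): string {
    const value = env[name];
    if (value === undefined || value === "") {
        throw new Error(`Missing required environment variable: ${name}`);
    }
    return value;
}

// In a Lambda handler this would be called as requireEnv(process.env, "TABLE_NAME").
// "example-table" is a placeholder value for illustration.
const exampleTableName = requireEnv({ TABLE_NAME: "example-table" }, "TABLE_NAME");
```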

Now our Lambda configurations are starting to get large, and there's a lot of duplication. Fortunately, we can use the Globals section to move the common configuration into a single place:

...
Globals:

  Function:
    Architectures:
      - arm64
    CodeUri: .
    Timeout: 30
    MemorySize: 512
    Environment:
      Variables: 
        TABLE_NAME: !Ref DynamoDBTable

Resources: 

  BlasterLambdaTS:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      Runtime: nodejs14.x
      Policies:
        - AWSLambdaBasicExecutionRole
        - DynamoDBWritePolicy:
            TableName: !Ref DynamoDBTable
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt TSQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile

  BlasterLambdaRust:
    Type: AWS::Serverless::Function
    Properties:
      Handler: none
      Runtime: provided.al2
      Policies:
        - AWSLambdaBasicExecutionRole
        - DynamoDBWritePolicy:
            TableName: !Ref DynamoDBTable
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt RustQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile
...

To make it easy to test with the AWS CLI without having to dig around in the AWS console, I'll add Outputs for the queue URLs and the table name:

...
Outputs:

  TSQueueUrl:
    Value: !Ref TSQueue

  RustQueueUrl:
    Value: !Ref RustQueue

  TableName:
    Value: !Ref DynamoDBTable

Our full template now looks like this:

AWSTemplateFormatVersion: "2010-09-09"

Transform:
- "AWS::Serverless-2016-10-31"

Globals:

  Function:
    Architectures:
      - arm64
    CodeUri: .
    Timeout: 30
    MemorySize: 512
    Environment:
      Variables: 
        TABLE_NAME: !Ref DynamoDBTable

Resources: 

  BlasterLambdaTS:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.blaster.handler
      Runtime: nodejs14.x
      Policies:
        - AWSLambdaBasicExecutionRole
        - DynamoDBWritePolicy:
            TableName: !Ref DynamoDBTable
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt TSQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile

  BlasterLambdaRust:
    Type: AWS::Serverless::Function
    Properties:
      Handler: none
      Runtime: provided.al2
      Policies:
        - AWSLambdaBasicExecutionRole
        - DynamoDBWritePolicy:
            TableName: !Ref DynamoDBTable
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt RustQueue.Arn
            BatchSize: 1 
    Metadata:
      BuildMethod: makefile

  DynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions: 
        - AttributeName: id
          AttributeType: S
      KeySchema: 
        - AttributeName: id
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

  TSQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 180

  RustQueue:
    Type: AWS::SQS::Queue
    Properties:  
      VisibilityTimeout: 180

Outputs:

  TSQueueUrl:
    Value: !Ref TSQueue

  RustQueueUrl:
    Value: !Ref RustQueue

  TableName:
    Value: !Ref DynamoDBTable

Now we can test deployment:

>> sam build
>> sam deploy --stack-name sqs-to-ddb --s3-bucket larrys-cool-bucket --capabilities CAPABILITY_IAM

That succeeded so now we can try sending messages to each queue:

>> aws sqs send-message --queue-url <<rust-queue-url>> --message-body "Hello Q"
>> aws sqs send-message --queue-url <<ts-queue-url>> --message-body "Hello Q"

Logging into the AWS Lambda Console, finding the TS Lambda, clicking "View Logs in CloudWatch" in the "Monitor" tab, and viewing the latest log stream we see the event logged:

2022-01-15T17:11:52.493Z    79da49e5-701f-5348-900b-13b872940d7a    INFO    Hello Event: 
{
    "Records": [
        {
            "messageId": "c0110c84-0130-44c2-8199-78f8912896a1",
            "receiptHandle": ...,
            "body": "Hello Q",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1642266711948",
                "SenderId": ...,
                "ApproximateFirstReceiveTimestamp": "1642266711949"
            },
            "messageAttributes": {},
            "md5OfBody": "50eba39d724e8bd654ade06019dbd7fc",
            "eventSource": "aws:sqs",
            "eventSourceARN": "...",
            "awsRegion": "us-east-1"
        }
    ]
}

We'll see a similar log for the Rust Lambda, so our SQS event source configurations appear to be working. Now we need to add code to push data into our DynamoDB table. We'll start out with just parsing some JSON from the message body of the SQS event and pushing it into our table.

We need to add the DynamoDB SDKs to our Lambda handlers and add code to push the data into our table.

Rust

Adding the SDK to our Cargo.toml:

[package]
name = "sqs_to_ddb"
version = "0.1.0"
edition = "2021"

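[dependencies]
lambda_runtime = "0.4.1"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "^1"
aws-config = "0.4.0"
aws-sdk-dynamodb = "0.4.0"
# The handler below also imports the aws_lambda_events crate; add it here as well.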

Now we add a representation of our data and add code to read items from the SQS records and push them into our table (src/bin/blaster_handler.rs):

use std::env;
use lambda_runtime::{handler_fn, Context, Error as LambdaError};
use serde::{Deserialize};
use aws_lambda_events::event::sqs::SqsEvent;
use aws_sdk_dynamodb::{Client};
use aws_sdk_dynamodb::model::{AttributeValue};

#[derive(Deserialize)]
struct Data {
    id: String,
    a: f64,
    b: f64,
    c: f64,
    d: f64
}

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
    let func = handler_fn(func);
    lambda_runtime::run(func).await?;
    Ok(())
}

async fn func(event: SqsEvent, _: Context) -> Result<(), LambdaError> {
    // Parse each SQS record body as JSON. A missing or malformed body panics here,
    // which fails the invocation.
    let items: Vec<Data> = event.records.iter()
        .map(|record| serde_json::from_str::<Data>(&record.body.as_ref().unwrap()).unwrap())
        .collect();

    // Load region and credentials from the environment and build the DynamoDB client.
    let shared_config = aws_config::load_from_env().await;
    let client = Client::new(&shared_config);
    // TABLE_NAME is set in template.yml.
    let table_name = &env::var("TABLE_NAME")?;
    for item in items {
        // DynamoDB number attributes (N) are sent as strings.
        client.put_item()
            .table_name(table_name)
            .item("id", AttributeValue::S(item.id))
            .item("a", AttributeValue::N(item.a.to_string()))
            .item("b", AttributeValue::N(item.b.to_string()))
            .item("c", AttributeValue::N(item.c.to_string()))
            .item("d", AttributeValue::N(item.d.to_string()))
            .send().await.unwrap();
    }
    Ok(())
}

Typescript

Adding the DynamoDB SDK V3:

>> npm install @aws-sdk/client-dynamodb

And adding code corresponding to our Rust Lambda:

import { SQSEvent, SQSHandler } from "aws-lambda";
import { DynamoDBClient, PutItemCommand } from "@aws-sdk/client-dynamodb";

type Data = {
    id: string,
    a: number,
    b: number,
    c: number,
    d: number
}

function parseData(json: string): Data {
    return JSON.parse(json);
}

export const handler: SQSHandler = async (event: SQSEvent) => {
    const client = new DynamoDBClient({});

    const items = event.Records.map(record => record.body).map(parseData);

    const tableName = process.env.TABLE_NAME;
    for (const item of items) {
        await client.send(new PutItemCommand({
            TableName: tableName,
            Item: {
                id: {S: item.id},
                a: {N: item.a.toString()},
                b: {N: item.b.toString()},
                c: {N: item.c.toString()},
                d: {N: item.d.toString()}
            }
        }));
    }
}
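One caveat with parseData above is that JSON.parse returns an untyped value, so a body like the "Hello Q" we sent earlier, or one with missing fields, is not caught by the Data type annotation. Here is a minimal sketch of a checked variant; parseDataChecked and requireNumber are hypothetical names, and the rules (string id, finite numbers) are my assumptions about what a valid message looks like.

```typescript
type Data = { id: string; a: number; b: number; c: number; d: number };

// Reads a field and insists it is a finite number.
function requireNumber(obj: Record<string, unknown>, field: string): number {
    const value = obj[field];
    if (typeof value !== "number" || !Number.isFinite(value)) {
        throw new Error(`field '${field}' must be a finite number`);
    }
    return value;
}

// Parses a message body and validates its shape at runtime.
function parseDataChecked(json: string): Data {
    const value: unknown = JSON.parse(json); // throws SyntaxError on non-JSON input
    if (typeof value !== "object" || value === null || Array.isArray(value)) {
        throw new Error("message body is not a JSON object");
    }
    const obj = value as Record<string, unknown>;
    const id = obj["id"];
    if (typeof id !== "string") {
        throw new Error("field 'id' must be a string");
    }
    return {
        id,
        a: requireNumber(obj, "a"),
        b: requireNumber(obj, "b"),
        c: requireNumber(obj, "c"),
        d: requireNumber(obj, "d"),
    };
}

const parsed = parseDataChecked('{"id":"ts-1","a":1.2,"b":2.3,"c":3.4,"d":4.5}');
```

Throwing from the handler fails the invocation, so whether to reject or skip bad messages is a design choice this sketch leaves open.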

The Rust version seemed a lot more verbose than the Typescript version, so it might be interesting to compare the file sizes:

>> wc -c src/bin/blaster_handler.rs 
    1305 src/bin/blaster_handler.rs
>> wc -c src/ts/index.ts 
     762 src/ts/index.ts

So the Rust version is about 70% larger. Granted, the difference in verbosity could be a symptom of the particular DDB SDKs used, rather than the expressiveness of the languages themselves, but my experience with each suggests the difference is fairly typical.

Let's build and compare the build times.

>> sam build --debug
...
2022-01-15 14:56:07,449 | executing Make: ['make', '--makefile', '/Users/larry/Documents/code/sqs_to_ddb/Makefile', 'build-BlasterLambdaTS']
2022-01-15 14:56:12,201 | CustomMakeBuilder:MakeBuild succeeded
...
2022-01-15 14:56:18,032 | executing Make: ['make', '--makefile', '/Users/larry/Documents/code/sqs_to_ddb/Makefile', 'build-BlasterLambdaRust']
2022-01-15 14:56:49,067 | CustomMakeBuilder:MakeBuild succeeded
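The elapsed time for each make target can be read off the timestamps in the log above. As a sketch, assuming both timestamps fall on the same day and use the "HH:MM:SS,mmm" format shown (the helper names are made up for illustration):

```typescript
// Converts the "HH:MM:SS,mmm" portion of a SAM debug log timestamp to milliseconds since midnight.
function timeOfDayMs(timestamp: string): number {
    const match = /(\d{2}):(\d{2}):(\d{2}),(\d{3})/.exec(timestamp);
    if (match === null) {
        throw new Error(`unrecognized timestamp: ${timestamp}`);
    }
    const hours = Number(match[1]);
    const minutes = Number(match[2]);
    const seconds = Number(match[3]);
    const millis = Number(match[4]);
    return ((hours * 60 + minutes) * 60 + seconds) * 1000 + millis;
}

// Elapsed milliseconds between two same-day timestamps.
function elapsedMs(start: string, end: string): number {
    return timeOfDayMs(end) - timeOfDayMs(start);
}

// Timestamps copied from the build log above.
const tsBuildMs = elapsedMs("2022-01-15 14:56:07,449", "2022-01-15 14:56:12,201");
const rustBuildMs = elapsedMs("2022-01-15 14:56:18,032", "2022-01-15 14:56:49,067");
const buildTimeRatio = rustBuildMs / tsBuildMs;
```

These intervals cover everything each make target does, not just compilation, so they are a rough comparison rather than a compiler benchmark.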

So the Typescript build, including TS and Webpack compilation, took only about 5 seconds, while the Rust build took about 31 seconds, roughly 6.5X as long. Let's deploy and see if Rust's additional verbosity and much longer build time are worth it:

>> sam deploy --stack-name sqs-to-ddb --s3-bucket larrys-cool-bucket --capabilities CAPABILITY_IAM

Now we click "Test" in the Lambda console and test the Typescript Lambda with the following event:

{
    "Records": [
        {
            "body": "{\"id\":\"ts-1\",\"a\":1.2,\"b\":2.3,\"c\":3.4,\"d\":4.5}"
        }
    ]
}

And the Rust Lambda with this event:

{
    "Records": [
        {
            "body": "{\"id\":\"rust-1\",\"a\":1.2,\"b\":2.3,\"c\":3.4,\"d\":4.5}"
        }
    ]
}

The durations look like this:

Rust:

REPORT RequestId: 409a4379-c316-4543-ba82-f01c79e81d8b    Duration: 122.97 ms    Billed Duration: 158 ms    Memory Size: 512 MB    Max Memory Used: 23 MB    Init Duration: 34.91 ms

Typescript:

REPORT RequestId: 57ee4cd3-129c-4ff1-b4ba-a8ee5967317b    Duration: 162.53 ms    Billed Duration: 163 ms    Memory Size: 512 MB    Max Memory Used: 62 MB    Init Duration: 240.25 ms
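To compare the two REPORT lines without eyeballing them, here is a small sketch that pulls the numeric fields out of each line. It assumes the fields are separated by tabs or runs of spaces and end in "ms" or "MB", as in the lines above; parseReport and requireField are hypothetical helpers.

```typescript
// Extracts "Label: <number> ms|MB" fields from a Lambda REPORT line into a map.
function parseReport(line: string): Map<string, number> {
    const fields = new Map<string, number>();
    for (const part of line.trim().split(/\s{2,}|\t/)) {
        const match = /^(.+): ([\d.]+) (?:ms|MB)$/.exec(part);
        if (match !== null) {
            fields.set(match[1], Number(match[2]));
        }
    }
    return fields;
}

// Looks up a field and fails loudly if the line did not contain it.
function requireField(report: Map<string, number>, label: string): number {
    const value = report.get(label);
    if (value === undefined) {
        throw new Error(`REPORT line has no field '${label}'`);
    }
    return value;
}

// Values copied from the two REPORT lines above.
const rustReport = parseReport([
    "REPORT RequestId: 409a4379-c316-4543-ba82-f01c79e81d8b",
    "Duration: 122.97 ms", "Billed Duration: 158 ms", "Memory Size: 512 MB",
    "Max Memory Used: 23 MB", "Init Duration: 34.91 ms",
].join("\t"));
const tsReport = parseReport([
    "REPORT RequestId: 57ee4cd3-129c-4ff1-b4ba-a8ee5967317b",
    "Duration: 162.53 ms", "Billed Duration: 163 ms", "Memory Size: 512 MB",
    "Max Memory Used: 62 MB", "Init Duration: 240.25 ms",
].join("\t"));

// Typescript-to-Rust ratios for the fields discussed in the text.
const initRatio = requireField(tsReport, "Init Duration") / requireField(rustReport, "Init Duration");
const durationRatio = requireField(tsReport, "Duration") / requireField(rustReport, "Duration");
const memoryRatio = requireField(tsReport, "Max Memory Used") / requireField(rustReport, "Max Memory Used");
```

A single cold invocation of each Lambda is a very small sample, so these ratios are only a first impression.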

So the Typescript version took about 7X as long to initialize, then took about 32% longer to run, and used about 2.7X as much memory. Let's check our table and make sure the data made it in:

>> aws dynamodb scan --table-name <<table name>>
{
    "Items": [
        {
            "a": {
                "N": "1.2"
            },
            "b": {
                "N": "2.3"
            },
            "c": {
                "N": "3.4"
            },
            "d": {
                "N": "4.5"
            },
            "id": {
                "S": "rust-1"
            }
        },
        {
            "a": {
                "N": "1.2"
            },
            "b": {
                "N": "2.3"
            },
            "c": {
                "N": "3.4"
            },
            "d": {
                "N": "4.5"
            },
            "id": {
                "S": "ts-1"
            }
        }
    ],
    "Count": 2,
    "ScannedCount": 2,
    "ConsumedCapacity": null
}
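The scan output uses DynamoDB's typed attribute format, where strings are wrapped as S and numbers as N with a string payload. Here is a minimal sketch of turning one such item back into a plain object. It only handles the S and N types that appear in our table, and the unmarshalItem name is made up for illustration.

```typescript
// Only the two attribute types used in our table; DynamoDB has more.
type AttributeValueJson = { S: string } | { N: string };
type ItemJson = Record<string, AttributeValueJson>;

// Converts {"a": {"N": "1.2"}, "id": {"S": "rust-1"}} into {a: 1.2, id: "rust-1"}.
function unmarshalItem(item: ItemJson): Record<string, string | number> {
    const plain: Record<string, string | number> = {};
    for (const [name, attribute] of Object.entries(item)) {
        plain[name] = "S" in attribute ? attribute.S : Number(attribute.N);
    }
    return plain;
}

// First item from the scan output above.
const rustItem = unmarshalItem({
    a: { N: "1.2" },
    b: { N: "2.3" },
    c: { N: "3.4" },
    d: { N: "4.5" },
    id: { S: "rust-1" },
});
```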

Looks good. Now let's blast a bunch of messages into the queue and see what the average durations are. First, we'll move the model to src/bin/model/mod.rs:

use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct Data {
    pub id: String,
    pub a: f64,
    pub b: f64,
    pub c: f64,
    pub d: f64
}

Note that the reason we need src/bin/model/mod.rs instead of src/bin/model.rs is that Cargo will try to compile any .rs file at the top level of src/bin into a binary, which we don't want. Now create src/bin/sqs_blaster.rs (it uses the aws-sdk-sqs crate, which also needs to be added to Cargo.toml):

use aws_sdk_sqs::{Client};

mod model;

#[tokio::main]
async fn main() {
    let shared_config = aws_config::load_from_env().await;
    let client = Client::new(&shared_config);

    let queue_urls = ["<<ts-queue-url>>", "<<rust-queue-url>>"];
    // 1..1000 is a half-open range, so this sends 999 messages to each queue.
    for i in 1..1000 {
        let data = model::Data { id: format!("id-{}", i), a: 1.2, b: 2.3, c: 3.4, d: 4.5 };

        for queue_url in queue_urls {
            let resp = client.send_message()
                .queue_url(queue_url)
                .message_body(serde_json::to_string(&data).unwrap())
                .send().await;

            match resp {
                Err(e) => println!("ERROR: {}", e.to_string()),
                Ok(v) => println!("RESULT: {}", v.message_id.unwrap())
            }
        }
    }
}
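For readers following along on the Typescript side, here is a sketch of the payloads the blaster generates. The buildPayloads name is hypothetical. It mirrors the half-open Rust range 1..1000, which yields ids id-1 through id-999, and it produces the same bodies for both queues, so both Lambdas end up writing the same ids into the shared table.

```typescript
type Data = { id: string; a: number; b: number; c: number; d: number };

// Builds JSON message bodies for ids start..end, excluding end (like Rust's start..end).
function buildPayloads(start: number, end: number): string[] {
    const bodies: string[] = [];
    for (let i = start; i < end; i++) {
        const data: Data = { id: `id-${i}`, a: 1.2, b: 2.3, c: 3.4, d: 4.5 };
        bodies.push(JSON.stringify(data));
    }
    return bodies;
}

// Same bounds as the Rust loop above.
const messageBodies = buildPayloads(1, 1000);
```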

Running that, letting the queues drain, and running the following query in CloudWatch Logs Insights against each Lambda's log group:

stats avg(@initDuration), avg(@duration), count(@initDuration), count(@duration)

Gives:

Version     avg(@initDuration)  avg(@duration)  count(@initDuration)  count(@duration)
Typescript  245.4025            45.1185         12                    1010
Rust        34.8244             26.5928         9                     1010

So the Typescript init durations are fairly consistently about 7X the Rust ones (although we were only able to force around 10 cold starts for each here), and the Typescript durations were about 70% longer.

Conclusion

As we can see, the init durations and overall durations for our Rust Lambda were much lower than those of our Typescript Lambda. On the other hand, the Typescript version built much faster and required significantly less code. If those results generalize to other workloads, I'd probably prefer Rust for spiky workloads where minimizing processing time is important, e.g. ones that are expected to be near-real-time. In addition, I would choose Rust for high-volume flows where 70% faster can make a huge difference over millions or billions of invocations, in terms of monetary cost and Lambda reserved concurrency contention. I would choose Typescript for low-volume workloads where consistently low latencies are not important, such as a simple API Gateway endpoint that doesn't receive much traffic and where latency spikes (due to cold starts) are acceptable.
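To put a rough number on the cost argument, here is a back-of-the-envelope sketch. Lambda compute is billed by GB-seconds (memory multiplied by duration). The price constant below is an assumption for illustration only, and the estimate ignores the per-request charge, billed-duration rounding, and init time, so treat the output as an order-of-magnitude guide.

```typescript
// ASSUMPTION: illustrative price per GB-second; check current Lambda pricing
// for your region and architecture before relying on it.
const ASSUMED_PRICE_PER_GB_SECOND = 0.0000133334;

// Compute-only cost estimate: invocations x duration (s) x memory (GB) x price.
function computeCostUsd(
    invocations: number,
    avgDurationMs: number,
    memoryMb: number,
    pricePerGbSecond: number,
): number {
    const gbSeconds = invocations * (avgDurationMs / 1000) * (memoryMb / 1024);
    return gbSeconds * pricePerGbSecond;
}

// Average durations from the Logs Insights results above, at our 512 MB setting.
const invocations = 1_000_000_000;
const tsCostUsd = computeCostUsd(invocations, 45.1185, 512, ASSUMED_PRICE_PER_GB_SECOND);
const rustCostUsd = computeCostUsd(invocations, 26.5928, 512, ASSUMED_PRICE_PER_GB_SECOND);
const costRatio = tsCostUsd / rustCostUsd;
```

Because memory and price are the same for both Lambdas, the cost ratio reduces to the duration ratio of about 1.7, whatever price is plugged in.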

Next time we'll convert the SQS messages to contain a reference to a large S3 Object containing JSON and convert the Lambdas to read the data line-by-line and shuttle it into DynamoDB.