Reducing Clojure Lambda Cold Starts Part 11 - Rust Part 2
Where Lambda cold starts often get worse in other runtimes is when you start adding dependencies, particularly an AWS SDK dependency. Let's see how Rust fares with an S3 client dependency. Updating Cargo.toml:

[package]
name = "tax_engine_experiments_rust"
version = "0.1.0"
edition = "2021"
autobins = false

[dependencies]
lambda_runtime = "0.4.1"
tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] }
serde = "^1"
serde_json = "^1"
serde_derive = "^1"
bytes = "1"

aws-smithy-http = "0.33.1"
aws-config = "0.3.0"
aws-sdk-s3 = "0.3.0"

[[bin]]
name = "bootstrap"
path = "src/main.rs"

And updating src/main.rs with the same workload as our other Lambdas:

use std::env;
use bytes::Buf;
use std::str;
use lambda_runtime::{handler_fn, Context, Error as LambdaError};
use serde_json::{json, Value};
use serde::{Deserialize, Serialize};
use bytes::Bytes;
use aws_smithy_http::byte_stream::ByteStream;

#[derive(Deserialize)]
struct S3Args {
    bucket: String,
    key: String
}
#[derive(Deserialize)]
struct Data {
    a: f64,
    b: f64,
    c: f64,
    d: f64
}

#[derive(Serialize)]
struct Calc {
    x: f64,
    y: f64,
    z: f64
}

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
    let func = handler_fn(func);
    lambda_runtime::run(func).await?;
    Ok(())
}

fn to_item(line: &str) -> Data {
    serde_json::from_str(&line).unwrap()
}

fn calculate_item(item: &Data) -> Calc {
    let x = item.a + item.b + item.c + item.d;
    let y = x / item.c;
    let z = y * item.a * item.b * item.c * item.d;
    Calc { x, y, z }
}

fn calc_to_json(item: &Calc) -> String {
    json!(item).to_string()
}

async fn func(event: Value, _: Context) -> Result<(), LambdaError> {
    println!("EVENT: {}", event);
    let record = &event["Records"][0];
    let body = record["body"].as_str().unwrap_or("");
    let s3_args: S3Args = serde_json::from_str(body).unwrap();

    let shared_config = aws_config::load_from_env().await;
    let client = aws_sdk_s3::Client::new(&shared_config);
    let key = s3_args.key;
    let response = client.get_object().bucket(s3_args.bucket).key(key.as_str()).send().await?;
    let data = response.body.collect().await.map(|data| data.into_bytes()).expect("failed getting data");
    let data_as_str = str::from_utf8(data.chunk()).expect("failed converting to string");
    let lines = data_as_str.split("\n");
    let items = lines.map(|line| to_item(line));
    let calcs = items.map(|item| calculate_item(&item));
    let result_lines = calcs.map(|calc| calc_to_json(&calc));
    let result_lines_vec: Vec<String> = result_lines.collect();
    let result = result_lines_vec.join("\n");
    let result_bytes = ByteStream::from(Bytes::copy_from_slice(result.as_bytes()));
    let output_bucket = env::var("CALCULATIONS_BUCKET").expect("Failed getting output bucket name");

    client.put_object().bucket(output_bucket).key(key).body(result_bytes).send().await?;

    Ok(())
}

And running the SQS blaster we get:

avg(@duration)    avg(@initDuration)    count(@duration)    count(@initDuration)
262.2977          33.3736               1002                25

Quite impressive, especially the init durations! The new dependencies pulled in a bunch of transitive dependencies, so I was a little worried the init durations would become just as bad as other runtimes once an AWS SDK dependency was added. That didn't turn out to be the case: 33 ms is great! The durations are also significantly better than the other Lambdas we've tested, though the difference isn't as dramatic as with the init durations. I suspect the S3 get and put are still the bottlenecks here. Let's add some embedded metrics:

Add src/metrics.rs (the json_fn! macro used below comes from the json_str crate, which also needs to be added to Cargo.toml):

use std::time::{SystemTime, UNIX_EPOCH};
use std::env;

pub fn now() -> u128 {
    let start = SystemTime::now();
    return start.duration_since(UNIX_EPOCH).expect("failed getting duration").as_millis();
}

fn as_json_str(s: &str) -> String {
    format!("\"{}\"", s)
}

pub fn emit_metric(metric_name: &str, start: u128) -> () {
    let metric = json_fn!(|name, value, memory, timestamp| {
        "_aws": {
            Timestamp: $timestamp,
            CloudWatchMetrics: [{
                Namespace: "tax-engine-experiments",
                Dimensions: [["lang", "memory", "version"]],
                Metrics: [{Name: $name, Unit: "Milliseconds"}]
            }]
        },
        $name: $value,
        lang: "rust",
        memory: $memory,
        version: "initial"
    });
    let end = now();
    let elapsed = end - start;
    let memory = env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE").expect("Failed getting memory size");
    let json = metric(&as_json_str(&metric_name), &elapsed.to_string(), &as_json_str(&memory), &end.to_string());
    println!("{}", json);
}

Update src/main.rs:

use std::env;
use bytes::Buf;
use std::str;
use lambda_runtime::{handler_fn, Context, Error as LambdaError};
use serde_json::{json, Value};
use serde::{Deserialize, Serialize};
use bytes::Bytes;
use aws_smithy_http::byte_stream::ByteStream;

#[macro_use]
extern crate json_str;

mod metrics;

#[derive(Deserialize)]
struct S3Args {
    bucket: String,
    key: String
}
#[derive(Deserialize)]
struct Data {
    a: f64,
    b: f64,
    c: f64,
    d: f64
}

#[derive(Serialize)]
struct Calc {
    x: f64,
    y: f64,
    z: f64
}

#[tokio::main]
async fn main() -> Result<(), LambdaError> {
    let func = handler_fn(func);
    lambda_runtime::run(func).await?;
    Ok(())
}

fn to_item(line: &str) -> Data {
    serde_json::from_str(&line).unwrap()
}

fn calculate_item(item: &Data) -> Calc {
    let x = item.a + item.b + item.c + item.d;
    let y = x / item.c;
    let z = y * item.a * item.b * item.c * item.d;
    Calc { x, y, z }
}

fn calc_to_json(item: &Calc) -> String {
    json!(item).to_string()
}

async fn func(event: Value, _: Context) -> Result<(), LambdaError> {
    println!("EVENT: {}", event);
    let record = &event["Records"][0];
    let body = record["body"].as_str().unwrap_or("");
    let s3_args: S3Args = serde_json::from_str(body).unwrap();

    let shared_config = aws_config::load_from_env().await;
    let client = aws_sdk_s3::Client::new(&shared_config);
    let key = s3_args.key;

    let mut start = metrics::now();
    let response = client.get_object().bucket(s3_args.bucket).key(key.as_str()).send().await?;
    let data = response.body.collect().await.map(|data| data.into_bytes()).expect("failed getting data");
    let data_as_str = str::from_utf8(data.chunk()).expect("failed converting to string");
    metrics::emit_metric("get-object", start);

    start = metrics::now();
    let lines = data_as_str.split("\n");
    let mut items: Vec<Data> = lines.map(|line| to_item(line)).collect();
    metrics::emit_metric("parse-input", start);

    start = metrics::now();
    items.sort_by(|a, b| a.a.partial_cmp(&b.a).unwrap());
    metrics::emit_metric("sorting", start);

    start = metrics::now();
    let calcs: Vec<Calc> = items.iter().map(|item| calculate_item(&item)).collect();
    metrics::emit_metric("calculate", start);

    start = metrics::now();
    let result_lines: Vec<String> = calcs.iter().map(|calc| calc_to_json(&calc)).collect();
    let result = result_lines.join("\n");
    let result_bytes = ByteStream::from(Bytes::copy_from_slice(result.as_bytes()));
    metrics::emit_metric("convert-to-output", start);

    let output_bucket = env::var("CALCULATIONS_BUCKET").expect("Failed getting output bucket name");

    start = metrics::now();
    client.put_object().bucket(output_bucket).key(key).body(result_bytes).send().await?;
    metrics::emit_metric("put-to-output", start);

    Ok(())
}
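One caveat in the sorting step: sort_by with partial_cmp(...).unwrap() panics if any a field is NaN, since NaN is unordered. Since Rust 1.62, f64::total_cmp provides a total order and avoids the unwrap entirely; a small sketch:

```rust
fn main() {
    let mut values = vec![3.5_f64, 1.25, 2.0];

    // partial_cmp returns Option<Ordering> because NaN is unordered;
    // total_cmp implements IEEE 754 total ordering, so no unwrap is needed
    values.sort_by(|a, b| a.total_cmp(b));

    println!("{:?}", values); // [1.25, 2.0, 3.5]
}
```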

Well, it seems my "heavy computation" presented absolutely no challenge to Rust: 0 ms to do the calculations vs. around 6 ms for the CLJS version. Let's try with 1 million lines:

[Screenshot: CloudWatch query results for the Rust version with 1 million lines (2021-12-29)]

Geez, still stellar performance across the board. Running the CLJS version and comparing:

[Screenshot: CloudWatch query results for the CLJS version (2021-12-29)]

The CLJS version ended up running out of memory, so I didn't gather metrics for the put, but I was able to for the others. The Rust version breaks the getting and parsing into two steps, which combined take a little more than a second. The equivalent in the CLJS code is 379 seconds. The sorting and calculating steps are also orders of magnitude slower in the CLJS code. Granted, the getting and parsing are very naively implemented in the CLJS code, but the Rust version is doing essentially the same things.

Conclusion

This is my first real attempt at writing Rust, so I imagine I'm doing plenty of other naive, unidiomatic things here, but I was really surprised by how well my naive code performed. Another surprise was that there wasn't much more ceremony in writing the code than in Clojure; Rust is remarkably expressive for a statically typed, extremely performant language. As far as Lambda goes, I'm definitely sold: the raw speed dramatically improves responsiveness and reduces costs and reserved concurrency contention, and the negligible cold start durations make provisioned concurrency unnecessary. Win, win, win, win, win.