Reducing Clojure Lambda Cold Starts Part 8 - Investigate Bottlenecks with Embedded Metrics

My last post has me wondering why there is so much difference between the warmed durations of ClojureScript and Clojure or JavaScript. I would have expected my implementation to be close to one or the other. I'll gather some metrics to see what is going on.

Embedded Metrics

The CloudWatch Embedded Metric Format is great. You just log your metrics as JSON in a specific format and CloudWatch automatically gathers the metrics from the logs. There are no expensive individual API calls and no API call limits to overrun, plus you can view your metrics in your logs as well as in CloudWatch metrics. Pretty cool.
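
As a rough sketch of what "a specific format" means in practice, the snippet below builds one Embedded Metric Format record as a plain object and serializes it to a single JSON log line. The helper name buildEmfRecord and every value passed to it are placeholders chosen for illustration; they are not part of any AWS library.

```javascript
// Hypothetical helper: builds one Embedded Metric Format (EMF) record.
// All argument values used below are illustrative placeholders.
function buildEmfRecord(namespace, dimensions, name, unit, value) {
  return {
    _aws: {
      Timestamp: Date.now(), // milliseconds since the epoch
      CloudWatchMetrics: [{
        Namespace: namespace,
        // A single dimension set containing every dimension key.
        Dimensions: [Object.keys(dimensions)],
        Metrics: [{ Name: name, Unit: unit }],
      }],
    },
    // Dimension values and the metric value sit at the top level,
    // keyed by the names referenced in the _aws metadata above.
    ...dimensions,
    [name]: value,
  };
}

const record = buildEmfRecord(
  "example-namespace", { lang: "js" }, "parse-input", "Milliseconds", 42);

// One record per log line is what gets picked up as a metric.
console.log(JSON.stringify(record));
```

The metadata under _aws only names the dimensions and metrics; the actual values are ordinary top-level keys, which is why the same line stays readable as a normal log entry.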

CLJC

Add a json namespace src/cljc/tax/json.cljc:

(ns tax.json
  #?(:clj (:require [jsonista.core :as json])))

#?(:clj (def mapper
          (json/object-mapper
           {:encode-key-fn name
            :decode-key-fn keyword})))

(defn ->json [v]
  #?(:clj (json/write-value-as-string v mapper)
     :cljs (js/JSON.stringify (clj->js v))))

Add a metrics namespace src/cljc/tax/metrics.cljc:

(ns tax.metrics
  (:require [tax.json :as json]))

(defn now []
  #?(:clj (System/currentTimeMillis)
     :cljs (js/Date.now)))

(defn log [v]
  #?(:clj (println v)
     :cljs (js/console.log v)))

(defn emit-metric-aux [namespace dimensions {:strs [name unit value]}]
  (let [metric (merge
                {"_aws"
                 {"Timestamp" (now)
                  "CloudWatchMetrics" [{"Namespace" namespace
                                        "Dimensions" [(keys dimensions)]
                                        "Metrics" [{"Name" name
                                                    "Unit" unit}]}]}
                 name value}
                dimensions)]
    (log (json/->json metric))))

(def lang #?(:clj "clj" :cljs "cljs"))

(defn emit-metric [metric-name run-time]
  (emit-metric-aux "tax-engine-experiments"
                   {"lang" lang}
                   {"name" metric-name
                    "unit" "Milliseconds"
                    "value" run-time}))

Update src/cljs/tax/core.cljs:

(ns tax.core
  (:require [cljs.core.async :as async :refer [<!]]
            [cljs.core.async.interop :refer-macros [<p!]]
            [clojure.string :as s]
            ["aws-sdk" :as aws]

            [tax.calcs :refer [calculate]]
            [tax.metrics :as metrics :refer [emit-metric]])
  (:require-macros [cljs.core.async.macros :refer [go]]))

(def client (aws/S3.))

(def output-bucket js/process.env.CALCULATIONS_BUCKET)

(defn put-object [bucket-name object-key body]
  (.promise (.putObject client #js{"Bucket" bucket-name
                                   "Key" object-key
                                   "Body" body})))

(defn get-object [bucket-name object-key]
  (.promise (.getObject client #js{"Bucket" bucket-name,
                                   "Key" object-key})))

(defn get-object-as-string [bucket-name object-key]
  (go (let [resp (<p! (get-object bucket-name object-key))
            body (.-Body resp)]
        (.toString body "utf-8"))))

(defn ->items [input]
  ;; realizing the items with mapv to print parse time
  (mapv
   (fn [line]
     (js->clj (js/JSON.parse line) :keywordize-keys true))
   (s/split input #"\n")))

(defn ->json-output [items]
  (s/join "\n" (map (comp js/JSON.stringify clj->js) items)))

(defn handler [event context callback]
  ;; only grabbing a single message at a time, so we can just get the first.
  (go (let [message-body (get-in (js->clj event) ["Records" 0 "body"])
            props (js/JSON.parse message-body)
            bucket (.-bucket props)
            key (.-key props)

            start (metrics/now)
            input (<! (get-object-as-string bucket key))
            _ (emit-metric "get-object" (- (metrics/now) start))

            start (metrics/now)
            input-lines (->items input)
            _ (emit-metric "parse-input" (- (metrics/now) start))

            calculated-items (calculate input-lines)

            start (metrics/now)
            output-string (->json-output calculated-items)
            _ (emit-metric "convert-to-output" (- (metrics/now) start))

            start (metrics/now)
            put-result (<p! (put-object output-bucket key output-string))
            _ (emit-metric "put-to-output" (- (metrics/now) start))]
        (callback nil put-result))))

And src/clj/tax/core.clj:

(ns tax.core
  (:require [jsonista.core :as json]
            [clojure.string :as s]
            [clojure.java.io :as io]
            [tax.calcs :refer [calculate]]
            [tax.metrics :as metrics :refer [emit-metric]])
  (:import (software.amazon.awssdk.services.s3 S3Client)
           (software.amazon.awssdk.services.s3.model GetObjectRequest PutObjectRequest)
           (software.amazon.awssdk.core.sync RequestBody))
  (:gen-class
   :methods [^:static [calculationsHandler [Object] Object]]))

(def client (-> (S3Client/builder) (.build)))

(def output-bucket (System/getenv "CALCULATIONS_BUCKET"))

(defn put-object [bucket-name object-key body]
  (.putObject client
              (-> (PutObjectRequest/builder)
                  (.bucket bucket-name)
                  (.key object-key)
                  (.build))
              (RequestBody/fromString body)))

(defn get-object-as-string [bucket-name object-key]
  (-> (.getObjectAsBytes client (-> (GetObjectRequest/builder)
                                    (.bucket bucket-name)
                                    (.key object-key)
                                    (.build)))
      (.asInputStream)
      io/reader
      slurp))

(def mapper
  (json/object-mapper
   {:encode-key-fn name
    :decode-key-fn keyword}))

(defn ->items [input]
  ;; realizing the items with mapv to print parse time
  (mapv
   (fn [line]
     (json/read-value line mapper))
   (s/split input #"\n")))

(defn ->json-output [items]
  (s/join "\n" (map #(json/write-value-as-string % mapper) items)))

(defn -calculationsHandler [event]
  (let [[{message-body "body"}] (get event "Records")
        props (json/read-value message-body)
        bucket (get props "bucket")
        key (get props "key")

        start (metrics/now)
        input (get-object-as-string bucket key)
        _ (emit-metric "get-object" (- (metrics/now) start))

        start (metrics/now)
        input-lines (->items input)
        _ (emit-metric "parse-input" (- (metrics/now) start))

        calculated-items (calculate input-lines)

        start (metrics/now)
        output-string (->json-output calculated-items)
        _ (emit-metric "convert-to-output" (- (metrics/now) start))

        start (metrics/now)
        put-result (time (put-object output-bucket key output-string))
        _ (emit-metric "put-to-output" (- (metrics/now) start))]
    put-result))

Running the SQS blaster to invoke the CLJ and CLJS versions 1000 times and looking in CloudWatch metrics, I get this:

Screen Shot 2021-12-27 at 12.56.18 PM.png

So it seems that the biggest difference, by far, is in parsing the input. I wonder if the small memory/processor size is affecting the ClojureScript version. Let's add the memory size as a dimension by updating src/cljc/tax/metrics.cljc:

(ns tax.metrics
  (:require [tax.json :as json]))

(defn now []
  #?(:clj (System/currentTimeMillis)
     :cljs (js/Date.now)))

#?(:cljs (def env-vars (into {} (js->clj (js/Object.entries js/process.env)))))

(defn env [var-name]
  #?(:clj (System/getenv var-name)
     :cljs (get env-vars var-name)))

(defn log [v]
  #?(:clj (println v)
     :cljs (js/console.log v)))

(defn emit-metric-aux [namespace dimensions {:strs [name unit value]}]
  (let [metric (merge
                {"_aws"
                 {"Timestamp" (now)
                  "CloudWatchMetrics" [{"Namespace" namespace
                                        "Dimensions" [(keys dimensions)]
                                        "Metrics" [{"Name" name
                                                    "Unit" unit}]}]}
                 name value}
                dimensions)]
    (log (json/->json metric))))

(def lang #?(:clj "clj" :cljs "cljs"))

(defn emit-metric [metric-name run-time]
  (emit-metric-aux "tax-engine-experiments"
                   {"lang" lang
                    "memory" (env "AWS_LAMBDA_FUNCTION_MEMORY_SIZE")}
                   {"name" metric-name
                    "unit" "Milliseconds"
                    "value" run-time}))

That does, actually, make a significant difference:

Screen Shot 2021-12-27 at 1.43.46 PM.png

I wonder how the durations compare. I'll query CloudWatch Logs Insights again. It looks like the larger memory size does reduce the average warmed duration, from 7665.5545 to 1690.0404. This is much closer to the 1198.9986 for the same-sized Clojure Lambda. I had hoped to be able to get away with a much smaller memory size, but I guess I should have been comparing similar memory sizes for the two languages.
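
For readers who want to reproduce an "average warmed duration" outside of the query console, here is a minimal sketch that computes it from Lambda REPORT log lines. It assumes a warmed invocation is one whose REPORT line has no "Init Duration" field, which may not match the exact query used for the numbers above; the function name and the sample lines are made up.

```javascript
// Assumption: a "warmed" invocation is one whose REPORT line carries no
// "Init Duration" field. averageWarmDuration is a hypothetical helper.
function averageWarmDuration(reportLines) {
  const warmDurations = [];
  for (const line of reportLines) {
    // REPORT lines are tab-separated "Name: value" fields.
    const fields = line.trim().split("\t");
    if (!fields[0].startsWith("REPORT")) continue;
    // Skip cold starts, which include an extra "Init Duration: ..." field.
    if (fields.some((f) => f.startsWith("Init Duration: "))) continue;
    // Match the field that starts with "Duration: " exactly, so that
    // "Billed Duration: ..." is not picked up by mistake.
    const duration = fields.find((f) => f.startsWith("Duration: "));
    if (duration) {
      warmDurations.push(parseFloat(duration.slice("Duration: ".length)));
    }
  }
  if (warmDurations.length === 0) return NaN;
  return warmDurations.reduce((sum, d) => sum + d, 0) / warmDurations.length;
}

// Made-up sample lines: one cold start followed by two warm invocations.
const sampleLines = [
  "REPORT RequestId: 1\tDuration: 900.00 ms\tBilled Duration: 900 ms\tMemory Size: 512 MB\tMax Memory Used: 120 MB\tInit Duration: 300.00 ms",
  "REPORT RequestId: 2\tDuration: 100.00 ms\tBilled Duration: 100 ms\tMemory Size: 512 MB\tMax Memory Used: 120 MB",
  "REPORT RequestId: 3\tDuration: 200.00 ms\tBilled Duration: 200 ms\tMemory Size: 512 MB\tMax Memory Used: 120 MB",
];

console.log(averageWarmDuration(sampleLines));
```

Only the two warm sample lines contribute to the average; the cold start is dropped entirely rather than having its init time subtracted.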

Comparing all the metrics for the 512 CLJ and CLJS Lambdas:

Screen Shot 2021-12-27 at 1.53.39 PM.png

A few of the numbers are significantly better with CLJS, most are about the same, but a couple are significantly worse, particularly the JSON unmarshalling and marshalling. I'm wondering if the JS Lambda will benefit as significantly as the CLJS one, and if the CLJS is still bad in relation. I'll add metrics logging to the JavaScript version and bump it to 512 MB of memory:

  RunCalculationsJS:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub "${AWS::StackName}-run-calcs-js"
      Handler: index.handler
      Runtime: nodejs14.x
      Timeout: 900
      MemorySize: 512
      Policies:
        - AWSLambdaBasicExecutionRole
        - S3ReadPolicy:
            BucketName: !Ref TransactionsBucket
        - S3WritePolicy:
            BucketName: !Ref CalculationsBucket
        - Version: '2012-10-17' 
          Statement:
            - Effect: Allow
              Action:
                - s3:ListAllMyBuckets
              Resource: 'arn:aws:s3:::*'
      Environment:
        Variables:
          TRANSACTIONS_BUCKET: !Ref TransactionsBucket
          CALCULATIONS_BUCKET: !Ref CalculationsBucket
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt RunJavaScriptCalculationsQueue.Arn
            BatchSize: 1
      InlineCode: |
        const AWS = require('aws-sdk');
        const client = new AWS.S3();

        const outputBucket = process.env.CALCULATIONS_BUCKET;
        const memorySize = process.env.AWS_LAMBDA_FUNCTION_MEMORY_SIZE;

        const getObjectAsString = async (Bucket, Key) => {
          const {Body} = await client.getObject({Bucket, Key}).promise();
          return Body.toString("utf-8");
        };

        const putObject = async (Bucket, Key, Body) => {
          return await client.putObject({Bucket, Key, Body}).promise();
        };

        const toItems = (input) => {
          return input.split("\n").map(JSON.parse);
        };

        const compareItems = (a, b) => a.a - b.a;

        const calculateItem = ({a, b, c, d}) => {
          const x = a + b + c + d;
          const y = x / c;
          const z = y * a * b * c * d;
          return {x, y, z};
        };

        const emitMetricAux = (namespace, dimensions, {name, unit, value}) => {
          const baseMetric = {_aws: {Timestamp: Date.now(),
                                     CloudWatchMetrics: [{Namespace: namespace,
                                                          Dimensions: [Object.keys(dimensions)],
                                                          Metrics: [{Name: name, Unit: unit}]}]}};
          const metric = {...baseMetric, ...dimensions};
          metric[name] = value;

          console.log(JSON.stringify(metric));
        };

        const emitMetric = (metricName, runTime) => {
          emitMetricAux("tax-engine-experiments",
                        {lang: "js", memory: memorySize},
                        {name: metricName, unit: "Milliseconds", value: runTime});
        };

        const calculate = (inputLines) => {
          let start = Date.now();
          const sortedLines = inputLines.sort(compareItems);
          emitMetric("sorting", Date.now() - start);

          start = Date.now();
          const results = sortedLines.map(calculateItem);
          emitMetric("calculate", Date.now() - start);

          return results;
        };

        const toJsonOutput = (items) => {
          return items.map(JSON.stringify).join("\n");
        };

        exports.handler = async function(event) {
          const {body} = event.Records[0];
          const {bucket, key} = JSON.parse(body);

          let start = Date.now();
          const input = await getObjectAsString(bucket, key);
          emitMetric("get-object", Date.now() - start);

          start = Date.now();
          const inputLines = toItems(input);
          emitMetric("parse-input", Date.now() - start);

          const calculatedItems = calculate(inputLines);

          start = Date.now();
          const outputString = toJsonOutput(calculatedItems);
          emitMetric("convert-to-output", Date.now() - start);

          start = Date.now();
          const result = await putObject(outputBucket, key, outputString);
          emitMetric("put-to-output", Date.now() - start);

          return result;
        }

Running the SQS blaster against it I get these comparisons for all the major operations:

Screen Shot 2021-12-27 at 2.46.06 PM.png

The JS version crushed the other two with 512 MB. Checking the average warm duration in CloudWatch Logs Insights I get:

JS: 392.3617, CLJS: 1655.6854, CLJ: 933.6807, JAVA: 753.2914

Umm, wow, I had heard the performance of JavaScript on Node.js had gotten faster, but I didn't realize how much faster! Nearly 2X as fast as Java, more than 4X as fast as CLJS, and more than 2X as fast as CLJ with the implementations here. I wonder if I can bring them closer to parity with JS.
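
The ratios quoted above can be checked with a few lines of arithmetic. This sketch simply divides each average warm duration by the JavaScript one; the variable names are mine, and the numbers are copied from the list above.

```javascript
// Average warm durations copied from the figures above.
const avgWarm = { JS: 392.3617, CLJS: 1655.6854, CLJ: 933.6807, JAVA: 753.2914 };

// How many times longer each implementation runs relative to plain JS.
const relativeToJs = Object.fromEntries(
  Object.entries(avgWarm).map(([lang, duration]) => [lang, duration / avgWarm.JS])
);

for (const [lang, ratio] of Object.entries(relativeToJs)) {
  console.log(`${lang}: ${ratio.toFixed(2)}x the JS duration`);
}
```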

Optimizing With Deftype

It looks like calculate is a bottleneck. I suspect it's due to all the Clojure maps it's operating on. I'll try deftype, as it just creates native objects whose properties should be fast to access in either runtime.
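
To make the intuition concrete on the Node.js side, here is a hand-written JavaScript sketch of the idea: a type with fixed fields is just an object with plain properties, so reading a field is a direct property access. The Data and Calc classes below are stand-ins that I wrote for illustration, not actual ClojureScript compiler output, and the input values are arbitrary.

```javascript
// Hand-written stand-ins for types with fixed fields (a b c d) and (x y z).
// This is an illustration only, not real compiler output.
class Data {
  constructor(a, b, c, d) {
    this.a = a;
    this.b = b;
    this.c = c;
    this.d = d;
  }
}

class Calc {
  constructor(x, y, z) {
    this.x = x;
    this.y = y;
    this.z = z;
  }
}

// Same arithmetic as the calculate step, using direct field access.
function calculateItem(item) {
  const x = item.a + item.b + item.c + item.d;
  const y = x / item.c;
  const z = y * item.a * item.b * item.c * item.d;
  return new Calc(x, y, z);
}

const result = calculateItem(new Data(1, 2, 5, 2));

// Plain objects serialize directly, with no conversion step first.
console.log(JSON.stringify(result));
```

A side effect worth noting is that objects like these can be handed straight to JSON.stringify, so the conversion between Clojure data and JavaScript objects drops out of the parse and output steps as well.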

I'll add a src/cljc/tax/records.cljc namespace:

(ns tax.records)

(deftype Data [a b c d])

(deftype Calc [x y z])

And update src/clj/tax/core.clj:

(ns tax.core
  (:require [jsonista.core :as json]
            [clojure.string :as s]
            [clojure.java.io :as io]
            [tax.calcs :refer [calculate]]
            [tax.metrics :as metrics :refer [emit-metric]]
            [tax.records :refer [->Data]])
  (:import (tax.records Calc)
           (software.amazon.awssdk.services.s3 S3Client)
           (software.amazon.awssdk.services.s3.model GetObjectRequest PutObjectRequest)
           (software.amazon.awssdk.core.sync RequestBody)
           (com.fasterxml.jackson.databind ObjectMapper)
           (com.fasterxml.jackson.core JsonProcessingException))
  (:gen-class
   :methods [^:static [calculationsHandler [Object] Object]]))

(def client (-> (S3Client/builder) (.build)))

(def output-bucket (System/getenv "CALCULATIONS_BUCKET"))

(defn put-object [bucket-name object-key body]
  (.putObject client
              (-> (PutObjectRequest/builder)
                  (.bucket bucket-name)
                  (.key object-key)
                  (.build))
              (RequestBody/fromString body)))

(defn get-object-as-string [bucket-name object-key]
  (-> (.getObjectAsBytes client (-> (GetObjectRequest/builder)
                                    (.bucket bucket-name)
                                    (.key object-key)
                                    (.build)))
      (.asInputStream)
      io/reader
      slurp))

(def mapper
  (json/object-mapper
   {:encode-key-fn name
    :decode-key-fn keyword}))

(defn ->items [input]
  ;; realizing the items with mapv to print parse time
  (mapv
   (fn [line]
     (let [{:keys [a b c d]} (json/read-value line mapper)]
       (->Data a b c d)))
   (s/split input #"\n")))

(defn ->json-output [items]
  (s/join "\n" (map (fn [^Calc calc] (.writeValueAsString mapper calc)) items)))

(defn -calculationsHandler [event]
  (let [[{message-body "body"}] (get event "Records")
        props (json/read-value message-body)
        bucket (get props "bucket")
        key (get props "key")

        start (metrics/now)
        input (get-object-as-string bucket key)
        _ (emit-metric "get-object" (- (metrics/now) start))

        start (metrics/now)
        input-lines (->items input)
        _ (emit-metric "parse-input" (- (metrics/now) start))

        calculated-items (calculate input-lines)

        start (metrics/now)
        output-string (->json-output calculated-items)
        _ (emit-metric "convert-to-output" (- (metrics/now) start))

        start (metrics/now)
        put-result (time (put-object output-bucket key output-string))
        _ (emit-metric "put-to-output" (- (metrics/now) start))]
    put-result))

And src/cljs/tax/core.cljs:

(ns tax.core
  (:require [cljs.core.async :as async :refer [<!]]
            [cljs.core.async.interop :refer-macros [<p!]]
            [clojure.string :as s]
            ["aws-sdk" :as aws]

            [tax.calcs :refer [calculate]]
            [tax.metrics :as metrics :refer [emit-metric]])
  (:require-macros [cljs.core.async.macros :refer [go]]))

(def client (aws/S3.))

(def output-bucket js/process.env.CALCULATIONS_BUCKET)

(defn put-object [bucket-name object-key body]
  (.promise (.putObject client #js{"Bucket" bucket-name
                                   "Key" object-key
                                   "Body" body})))

(defn get-object [bucket-name object-key]
  (.promise (.getObject client #js{"Bucket" bucket-name,
                                   "Key" object-key})))

(defn get-object-as-string [bucket-name object-key]
  (go (let [resp (<p! (get-object bucket-name object-key))
            body (.-Body resp)]
        (.toString body "utf-8"))))

(defn ->items [input]
  ;; realizing the items with mapv to print parse time
  (mapv
   (fn [line]
     (js/JSON.parse line))
   (s/split input #"\n")))

(defn ->json-output [items]
  (s/join "\n" (map js/JSON.stringify items)))

(defn handler [event context callback]
  ;; only grabbing a single message at a time, so we can just get the first.
  (go (let [message-body (get-in (js->clj event) ["Records" 0 "body"])
            props (js/JSON.parse message-body)
            bucket (.-bucket props)
            key (.-key props)

            start (metrics/now)
            input (<! (get-object-as-string bucket key))
            _ (emit-metric "get-object" (- (metrics/now) start))

            start (metrics/now)
            input-lines (->items input)
            _ (emit-metric "parse-input" (- (metrics/now) start))

            calculated-items (calculate input-lines)

            start (metrics/now)
            output-string (->json-output calculated-items)
            _ (emit-metric "convert-to-output" (- (metrics/now) start))

            start (metrics/now)
            put-result (<p! (put-object output-bucket key output-string))
            _ (emit-metric "put-to-output" (- (metrics/now) start))]
        (callback nil put-result))))

And src/cljc/tax/calcs.cljc:

(ns tax.calcs
  (:require [tax.metrics :as metrics :refer [emit-metric]]
            [tax.records :refer [->Calc]])
  #?(:clj (:import (tax.records Data))))

(defn get-a [item]
  #?(:clj (.a item) :cljs (.-a item)))

(defn calculate-aux [items]
  ;; realizing the items with mapv to print calc time
  (mapv
   (fn [^Data item]
     (let [a (get-a item)
           b #?(:clj (.b item) :cljs (.-b item))
           c #?(:clj (.c item) :cljs (.-c item))
           d #?(:clj (.d item) :cljs (.-d item))
           x (+ a b c d)
           y (/ x c)
           z (* y a b c d)]
       (->Calc x y z)))
   items))

(defn calculate [items]
  (let [start (metrics/now)
        sorted (sort-by get-a items)
        _ (emit-metric "sorting" (- (metrics/now) start))

        start (metrics/now)
        calculations (calculate-aux sorted)
        _ (emit-metric "calculate" (- (metrics/now) start))]
    calculations))

SQS blasting again gives the average warm durations:

CLJS: 442.651, CLJ: 2259.1084

Encouraging results for ClojureScript: nearly as fast as the JS version! Something seems wrong with the Clojure version, though. It shouldn't have gotten so much slower. Let's look at the detailed metrics:

Screen Shot 2021-12-27 at 5.26.44 PM.png

Let's try adding type hints to our Clojure code:

src/cljc/tax/calcs.cljc:

(ns tax.calcs
  (:require [tax.metrics :as metrics :refer [emit-metric]]
            [tax.records :refer [->Calc]])
  #?(:clj (:import (tax.records Data Calc))))

(defn get-a [^Data item]
  #?(:clj (.a item) :cljs (.-a item)))

(defn calculate-aux [items]
  ;; realizing the items with mapv to print calc time
  (mapv
   (fn [^Data item]
     (let [a (get-a item)
           b #?(:clj (.b item) :cljs (.-b item))
           c #?(:clj (.c item) :cljs (.-c item))
           d #?(:clj (.d item) :cljs (.-d item))
           x (+ a b c d)
           y (/ x c)
           z (* y a b c d)]
       #?(:clj (Calc. x y z) :cljs (->Calc x y z))))
   items))

(defn calculate [items]
  (let [start (metrics/now)
        sorted (sort-by get-a items)
        _ (emit-metric "sorting" (- (metrics/now) start))

        start (metrics/now)
        calculations (calculate-aux sorted)
        _ (emit-metric "calculate" (- (metrics/now) start))]
    calculations))

src/cljc/tax/records.cljc:

(ns tax.records)

(deftype Data [^double a ^double b ^double c ^double d])

(deftype Calc [^double x ^double y ^double z])

src/clj/tax/core.clj:

(ns tax.core
  (:require [jsonista.core :as json]
            [clojure.string :as s]
            [clojure.java.io :as io]
            [tax.calcs :refer [calculate]]
            [tax.metrics :as metrics :refer [emit-metric]])
  (:import (tax.records Calc Data)
           (software.amazon.awssdk.services.s3 S3Client)
           (software.amazon.awssdk.services.s3.model GetObjectRequest PutObjectRequest)
           (software.amazon.awssdk.core.sync RequestBody)
           (com.fasterxml.jackson.databind ObjectMapper)
           (com.fasterxml.jackson.core JsonProcessingException))
  (:gen-class
   :methods [^:static [calculationsHandler [Object] Object]]))

(set! *warn-on-reflection* true)

(defn ^S3Client client-aux [] (-> (S3Client/builder) (.build)))

(def ^S3Client client (memoize client-aux))

(def output-bucket (System/getenv "CALCULATIONS_BUCKET"))

(defn put-object [bucket-name object-key body]
  (let [^PutObjectRequest req (-> (PutObjectRequest/builder)
                                 (.bucket bucket-name)
                                 (.key object-key)
                                 (.build))
        ^RequestBody body (RequestBody/fromString body)]
    (.putObject (client) req body)))

(defn get-object-as-string [bucket-name object-key]
  (let [^GetObjectRequest req (-> (GetObjectRequest/builder)
                                  (.bucket bucket-name)
                                  (.key object-key)
                                  (.build))]
    (-> (.getObjectAsBytes (client) req)
        (.asInputStream)
        io/reader
        slurp)))

(def mapper
  (json/object-mapper
   {:encode-key-fn name
    :decode-key-fn keyword}))

(defn ->items [input]
  ;; realizing the items with mapv to print parse time
  (mapv
   (fn [line]
     (let [{:keys [^int a ^int b ^int c ^int d]} (json/read-value line mapper)]
       (Data. (double a) (double b) (double c) (double d))))
   (s/split input #"\n")))

(defn ->json-output [items]
  (s/join "\n" (map (fn [calc] (json/write-value-as-string calc mapper)) items)))

(defn -calculationsHandler [event]
  (let [[{message-body "body"}] (get event "Records")
        props (json/read-value message-body)
        bucket (get props "bucket")
        key (get props "key")

        start (metrics/now)
        input (get-object-as-string bucket key)
        _ (emit-metric "get-object" (- (metrics/now) start))

        start (metrics/now)
        input-lines (->items input)
        _ (emit-metric "parse-input" (- (metrics/now) start))

        calculated-items (calculate input-lines)

        start (metrics/now)
        output-string (->json-output calculated-items)
        _ (emit-metric "convert-to-output" (- (metrics/now) start))

        start (metrics/now)
        put-result (time (put-object output-bucket key output-string))
        _ (emit-metric "put-to-output" (- (metrics/now) start))]
    put-result))

Now the average warm duration of the CLJ version is 637.0643, much better! It's really striking how much of a difference type hints make. Let's look at our detailed metrics again:

Screen Shot 2021-12-27 at 7.20.02 PM.png

Type hinting seems to have dramatically sped up all the operations, but especially 'calculate'. 'get-object' and 'put-to-output' still seem extremely slow, however. I wouldn't imagine the new Java S3 client would be so much slower than the old JavaScript S3 client. Maybe I'm still missing some type hints?

Conclusion

CloudWatch Embedded Metrics helped to drill down on what was slowing down our Clojure/Script Lambda. Honestly, I was a little dismayed at the performance difference between my JS and CLJS versions, but it seems one of the biggest culprits was using maps for the data in 'calculate'. Converting to use native objects with 'deftype' made a dramatic difference. For ClojureScript, the changes were minimal and the performance increase was striking, from 4X slower to nearly the same as plain old JavaScript! The story with Clojure was not quite as exciting. I had to add a lot of type hinting and only gained about a 50% performance increase (933.6807 down to 637.0643), which still leaves it roughly 60% slower than the JavaScript version. I'll investigate further performance optimization in my next post.