Wrzasq.pl

Provisioning AWS Athena queries with Lambda and StepFunctions

Saturday, 30 May 2020, 23:20

Amazon Athena is a brilliant tool for data processing and analytics in the AWS cloud. Under the hood it uses the Presto engine to query and process data in your S3 storage using standard SQL notation. The concept behind it is truly simple - run SQL queries against your data in S3 and pay only for the data your queries scan. No cluster to manage - everything is fully serverless and managed by Amazon; no need to learn new technology - you query data using SQL, which your team most likely already knows; no need for additional storage or fees - you store data directly in S3. Did I mention the keyword serverless? Yes, it is driven entirely through the API/SDK, there is no need to manage any resources on your own (and Athena's cold start is very low). It also integrates tightly with Glue. When we say serverless in AWS, we mainly think of Lambda. And sooner or later you will surely want to integrate your query into some more complex workflow.

However, Athena is not exactly a database that you can just query - it is an analytics tool, or sometimes an ETL engine. It is designed to run on large amounts of data and does not necessarily give you results instantly. In fact, after you run a query, Athena starts processing your data and saves the results into another location in S3. This means you need to wait until the query finishes before you can read the results. You can even use these results in multiple places once they are ready. Having the results in S3 (accessible by the query execution ID - a unique identifier assigned to each query execution) means you don't need to read them in the same place where you initiated the query - you just start the data processing query and then, once it has finished, use this identifier to read the results anywhere in your workflow.
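As a rough illustration of reading results by execution ID in a later step, here is a minimal C# sketch of the paging loop. It assumes the Athena `GetQueryResults` API, which returns rows page by page together with a `NextToken`. To keep the sketch dependency-free and testable, the actual SDK call sits behind a hypothetical `fetchPage` delegate, and a comment shows how it might be wired to `IAmazonAthena.GetQueryResultsAsync`. The names `ResultReader`, `Page` and `ReadAllRowsAsync` are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ResultReader {
    // Hypothetical type for this sketch: one page of rows (cell values as strings)
    // plus the token for the next page (null or empty when there are no more pages).
    public class Page {
        public List<List<string>> Rows { get; set; } = new List<List<string>>();
        public string NextToken { get; set; }
    }

    public static class Reader {
        // Collects all rows of a finished query by following NextToken until it runs out.
        // fetchPage(queryId, token) is a hypothetical delegate; with the AWS SDK it could wrap:
        //   var response = await athena.GetQueryResultsAsync(new GetQueryResultsRequest {
        //       QueryExecutionId = queryId, NextToken = token });
        //   // then map response.ResultSet.Rows (row.Data[i].VarCharValue) and response.NextToken into a Page
        public static async Task<List<List<string>>> ReadAllRowsAsync(
            string queryId,
            Func<string, string, Task<Page>> fetchPage) {
            var rows = new List<List<string>>();
            string token = null;
            do {
                var page = await fetchPage(queryId, token);
                rows.AddRange(page.Rows);
                token = page.NextToken;
            } while (!string.IsNullOrEmpty(token));
            return rows;
        }
    }
}
```

Keep in mind that for SELECT queries Athena returns the column names as the first row of the result set, so a caller will usually treat that row as a header.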

Somewhere in this process you need to wait for data processing to finish. Although you can of course do that in the Lambda function that runs the query, I don't think it is a good idea:

  • I have not hit such long running times (yet?), but it is entirely possible that query execution time will exceed the Lambda timeout (15 minutes at most),
  • you will pay for the Lambda execution time while it waits for the query to finish,
  • in my opinion it is simply bad design to put time-based wait conditions directly in the code - this should be the role of the orchestrator.

And when you need to orchestrate multiple steps of a (serverless, but not only) process in AWS, your first bet will most likely be Step Functions. Let's build a state machine that executes a query and checks its status to determine the next action. In this state machine we have two functions - the first one executes the query and the second one (periodically) checks the state of the execution. Our state will consist of two properties:

  • QueryId - the query execution identifier, populated by the first function;
  • State - the last known execution state, populated and updated by the second function.

We need to build these two (simple) Lambda functions, define a state machine and create IAM roles for all three resources. Let's start with the first function - the one that starts the query (today I will code in C# - AWS Lambda has supported .NET Core 3.1 since April 2020; however, I don't have much experience with it, and the real code has been super-simplified here to include just the required logic without too much abstraction - so don't focus on the language aspects, just on the AWS SDK):

using System;
using System.Threading.Tasks;
using Amazon.Athena;
using Amazon.Athena.Model;
using Amazon.Lambda.Core;
using Amazon.Lambda.Serialization.Json;
using Newtonsoft.Json.Linq;
using NLog;

[assembly: LambdaSerializer(typeof(JsonSerializer))]

namespace QueryExecutor {
    public class MachineState {
        public string QueryId { get; set; }
    }

    public class Handler {
        private static readonly Logger logger = LogManager.GetCurrentClassLogger();

        private readonly IAmazonAthena athena;

        public Handler() {
            athena = new AmazonAthenaClient();
        }

        public async Task<MachineState> handle(JToken input, ILambdaContext context) {
            try {
                return new MachineState {
                    QueryId = await runQuery()
                };
            }
            catch (Exception error) {
                logger.Error(error, "Critical error!");
                throw;
            }
        }

        private async Task<string> runQuery() {
            return (await athena.StartQueryExecutionAsync(new StartQueryExecutionRequest {
                QueryString = "SELECT …",
                ResultConfiguration = new ResultConfiguration {
                    OutputLocation = "s3://your-bucket/results/",
                },
                QueryExecutionContext = new QueryExecutionContext {
                    Database = "yourgluedbname"
                }
            })).QueryExecutionId;
        }
    }
}

So, we have the function code - now let's deploy it. While doing so, we need to create an IAM role for it. Here comes the tricky part - this function needs more than permission to execute an Athena query: it also needs access to S3 (for reading the existing data and writing results) and to the Glue metastore (plus permission to create tables if you run a CREATE TABLE AS SELECT query):

    QueryExecutor:
        Type: "AWS::Serverless::Function"
        Properties:
            Runtime: "dotnetcore3.1"
            … # set your code location
            Policies:
                -
                    Version: "2012-10-17"
                    Statement:
                        -
                            Action:
                                - "athena:StartQueryExecution"
                                - "glue:CreateTable"
                                - "glue:DeleteTable"
                                - "glue:GetTable"
                            Effect: "Allow"
                            Resource:
                                - "*"
                -
                    Version: "2012-10-17"
                    Statement:
                        -
                            Action:
                                - "s3:DeleteObject"
                                - "s3:GetObject"
                                - "s3:ListBucket"
                                - "s3:PutObject"
                            Effect: "Allow"
                            Resource:
                                - "*" # put your S3 resource prefix here to restrict access

The second function picks the query identifier from the state and checks the execution status:

using System;
using System.Threading.Tasks;
using Amazon.Athena;
using Amazon.Athena.Model;
using Amazon.Lambda.Core;
using Amazon.Lambda.Serialization.Json;
using Newtonsoft.Json.Linq;
using NLog;

[assembly: LambdaSerializer(typeof(JsonSerializer))]

namespace QueryChecker {
    public class MachineState {
        public string QueryId { get; set; }
        public string State { get; set; }
    }

    public class Handler {
        private static readonly Logger logger = LogManager.GetCurrentClassLogger();

        private readonly IAmazonAthena athena;

        public Handler() {
            athena = new AmazonAthenaClient();
        }

        public async Task<MachineState> handle(MachineState input, ILambdaContext context) {
            try {
                return new MachineState {
                    // pass QueryId to allow looping
                    QueryId = input.QueryId,
                    State = await checkQuery(input.QueryId)
                };
            }
            catch (Exception error) {
                logger.Error(error, "Critical error!");
                throw;
            }
        }

        private async Task<string> checkQuery(string queryId) {
            var status = (await athena.GetQueryExecutionAsync(new GetQueryExecutionRequest {
                QueryExecutionId = queryId
            })).QueryExecution.Status;

            // NLog message templates need a named (or numbered) placeholder
            logger.Info("State info: {reason}", status.StateChangeReason);

            return status.State;
        }
    }
}

Now the IAM policy for the second function - this time it is super simple, as this step doesn't involve any data handling, only state checking (regardless of the query state, this operation never receives any results):

    QueryChecker:
        Type: "AWS::Serverless::Function"
        Properties:
            Runtime: "dotnetcore3.1"
            … # set your code location
            Policies:
                -
                    Version: "2012-10-17"
                    Statement:
                        -
                            Action:
                                - "athena:GetQueryExecution"
                            Effect: "Allow"
                            Resource:
                                - "*"

Possible execution states are:

  • SUCCEEDED - our desired end state: the query finished and the results are available;
  • QUEUED - the query execution has not started yet and is waiting for computing resources to pick it up, so we need to wait;
  • RUNNING - the query execution is in progress, so we also need to wait;
  • FAILED - our query failed; the state machine should either handle the error or fail the process;
  • CANCELLED - the query execution was cancelled; whoever did it, for us it's a failure, as we won't get the results.

Having both functions in place and knowing all possible query states, we can now assemble a state machine (its role only needs permission to invoke the two Lambda functions we created; for simplicity I removed the retry and error-handling clauses):

    MachineRole:
        Type: "AWS::IAM::Role"
        Properties:
            AssumeRolePolicyDocument:
                Statement:
                    -
                        Action:
                            - "sts:AssumeRole"
                        Effect: "Allow"
                        Principal:
                            Service:
                                - "states.amazonaws.com"
            Policies:
                -
                    PolicyName: "AllowCallingLambda"
                    PolicyDocument:
                        Version: "2012-10-17"
                        Statement:
                            -
                                Action:
                                    - "lambda:InvokeFunction"
                                Effect: "Allow"
                                Resource:
                                    - !GetAtt "QueryExecutor.Arn"
                                    - !GetAtt "QueryChecker.Arn"

    QueryProcess:
        Type: "AWS::StepFunctions::StateMachine"
        Properties:
            RoleArn: !GetAtt "MachineRole.Arn"
            DefinitionString: !Sub |
                {
                    "StartAt": "Start query",
                    "States": {
                        "Start query": {
                            "Type": "Task",
                            "Resource": "${QueryExecutor.Arn}",
                            "Next": "Get query status"
                        },
                        "Get query status": {
                            "Type": "Task",
                            "Resource": "${QueryChecker.Arn}",
                            "Next": "Check query status"
                        },
                        "Check query status": {
                            "Type": "Choice",
                            "Choices": [
                                {
                                    "Variable": "$.State",
                                    "StringEquals": "SUCCEEDED",
                                    "Next": "Done"
                                },
                                {
                                    "Variable": "$.State",
                                    "StringEquals": "QUEUED",
                                    "Next": "Wait"
                                },
                                {
                                    "Variable": "$.State",
                                    "StringEquals": "RUNNING",
                                    "Next": "Wait"
                                }
                            ],
                            "Default": "Failed"
                        },
                        "Wait": {
                            "Type": "Wait",
                            "Seconds": 30,
                            "Next": "Get query status"
                        },
                        "Failed": {
                            "Type": "Fail"
                        },
                        "Done": {
                            "Type": "Succeed"
                        }
                    }
                }

For a few days now, AWS SAM has also supported YAML notation for state machines, but I haven't played with it yet, so I used nested JSON notation.
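To reason about the control flow without deploying anything, the loop defined above (Start query, Get query status, Check query status, Wait, then back to Get query status) can be simulated locally. This is only a sketch under assumptions. The two Lambda handlers are replaced by plain delegates, and the 30-second Wait state is replaced by a hypothetical `wait` callback. The `maxPolls` limit is an addition for this sketch only: the state machine definition above has no such guard and keeps polling for as long as the query stays QUEUED or RUNNING.

```csharp
using System;

namespace QueryFlow {
    public static class Simulator {
        // startQuery plays "Start query", checkQuery plays "Get query status",
        // the switch plays "Check query status" and wait stands in for the "Wait" state.
        // Returns "Done" or "Failed", matching the names of the terminal states.
        public static string Run(
            Func<string> startQuery,
            Func<string, string> checkQuery,
            Action wait,
            int maxPolls) {
            var queryId = startQuery();
            for (var poll = 0; poll < maxPolls; poll++) {
                switch (checkQuery(queryId)) {
                    case "SUCCEEDED":
                        return "Done";
                    case "QUEUED":
                    case "RUNNING":
                        wait();
                        break; // leaves the switch, then polls again
                    default:
                        // FAILED, CANCELLED or anything unexpected: the Default branch
                        return "Failed";
                }
            }
            // Polling limit reached (sketch-only guard, not present in the real definition).
            return "Failed";
        }
    }
}
```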
