Snowflake Lambda Data Loader – Example with AWS S3 Trigger
This is an example of how to build an AWS Lambda data loader for the Snowflake database. Snowflake is a cloud data platform suited to data warehousing and analysis of large amounts of data. AWS Lambda provides serverless compute, that is, on-demand compute without servers you have to manage yourself.
To issue commands to Snowflake, we must leverage the Snowflake driver. We'll be using Python for our Lambda function. We need to import the driver, which means we'll have to create our own Lambda deployment package so we can include libraries not found in the standard environment provided by Amazon. The easiest way to do this is by setting up a virtual environment on an EC2 instance running Amazon Linux, no less, as it replicates the deployment environment of a Lambda.
Setting up IAM/security and general AWS housekeeping are outside the scope of this article. The first step is spinning up an EC2 environment; a tiny t2.micro or t3.micro is fine. Note it is best to make sure all services and environments are set up in the same region, e.g. us-east-1.
```shell
# make directory
mkdir snow_lambda; cd snow_lambda

# make virtual environment
virtualenv v-env; source v-env/bin/activate

# explicitly install the amazon and snowflake packages and close the virtual environment
cd v-env/lib64/python2.7/site-packages/
pip install snowflake-connector-python --target .
pip install boto3 --target .
chmod -R 755 .; deactivate

# zip up the package, open up the lambda handler for editing
zip -r9 ~/snow_lambda.zip .
cd ~/snow_lambda
vi lambda_function.py
```
The Python Lambda to connect to Snowflake is pretty simple. This Lambda loads some data into a Titanic survival table. There are some design choices I made here; for simplicity I've hardcoded the Snowflake account data. Some form of encrypted Lambda variables would be the preferred way to actually store this data.
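As a sketch of what those encrypted variables might look like, here is one approach using a Lambda environment variable encrypted with KMS. The variable names `SNOW_PASS_ENCRYPTED` and `SNOW_PASS` and the helper function are my own illustration, not part of the loader below:

```python
import base64
import os


def get_snowflake_password():
    """Return the Snowflake password from the Lambda environment.

    If SNOW_PASS_ENCRYPTED is set, it is assumed to hold a base64-encoded
    KMS ciphertext and is decrypted using the function's IAM role.
    Otherwise fall back to a plain SNOW_PASS variable (acceptable only
    for local testing).
    """
    ciphertext = os.environ.get('SNOW_PASS_ENCRYPTED')
    if ciphertext:
        # imported lazily so the fallback path works without boto3 installed
        import boto3
        kms = boto3.client('kms')
        resp = kms.decrypt(CiphertextBlob=base64.b64decode(ciphertext))
        return resp['Plaintext'].decode('utf-8')
    return os.environ.get('SNOW_PASS')
```

With this in place, the hardcoded `SNOW_PASS` constant below would be replaced by a call to `get_snowflake_password()`.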
I fully parse the bucket object address and use part of the watched directory to craft the table I am writing to as well as the directory that is being read from. Thus this lambda code supports many tables. Could you use Snowpipe instead? Yes – although it simply loads data. That’s all I’m doing here too – but I could add a bunch of sql steps if I wanted to do additional ETL or ELT work in Snowflake with Lambda generated SQL. Note however a single lambda is limited to 15 minutes of run time. Take note that we also must specify the cache for the Snowflake driver as well; the only place we can write in Lambda is on /tmp, thus we point the cache there.
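To make the key parsing concrete, here is the same derivation run on a hypothetical object key that follows the /titanic/fe_load/ layout used later in the article:

```python
import os

# a hypothetical S3 object key as delivered in the Lambda event
key = "titanic/fe_load/passengers_scored_api/test5_scored.csv"

filenm = os.path.basename(key)          # the file that fired the trigger
fulldir = os.path.dirname(key)          # the watched directory
snow_table = os.path.basename(fulldir)  # last directory component names the table

print(filenm)      # test5_scored.csv
print(fulldir)     # titanic/fe_load/passengers_scored_api
print(snow_table)  # passengers_scored_api
```

Because the table name comes from the directory, dropping files under a new directory targets a new table with no code change.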
The last design choice I made was to load the entire directory rather than only the file that triggered the Lambda. My speculation is this will make the load process a little quicker in the event many files are being written at once. I also set the purge option on so I would not have to manipulate the files any further. Error handling and ETL checkpointing are left as exercises for the developer.
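As one sketch of what that error handling might look like, a small retry wrapper around the COPY statement could be used in place of a bare execute call. The attempt count, backoff, and the decision to retry any exception are my own choices here, not something the loader below prescribes:

```python
import time


def execute_with_retry(cursor, sql, attempts=3, backoff_seconds=2):
    """Run a statement, retrying failures with linear backoff.

    Deciding which exceptions are truly transient is left to the caller's
    judgment; this sketch retries any Exception short of the last attempt,
    then re-raises.
    """
    for attempt in range(1, attempts + 1):
        try:
            return cursor.execute(sql)
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(backoff_seconds * attempt)
```

In the handler below, `execute_with_retry(cs, sql)` would replace the direct `cs.execute(sql)` call.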
```python
import urllib
import boto3
import os
import snowflake.connector

s3 = boto3.client('s3')

SNOW_ACCOUNT = 'dp516776953.us-east-1'
SNOW_USER = 'mikebot'
SNOW_PASS = 'ap__^_^__taco'
SNOW_FILE_FORMAT = 'DEFAULT_CSV'
SNOW_DB = 'TITANIC'
SNOW_SCHEMA = 'PUBLIC'


def lambda_handler(event, context):
    # get the object that triggered lambda
    # note: the S3 event delivers a list of records, hence [0]
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    filenm = os.path.basename(key)
    fulldir = os.path.dirname(key)
    SNOW_TABLE = os.path.basename(fulldir)
    print("bucket: " + bucket + "\n key: " + key + "\n filenm: " + filenm +
          "\n fulldir: " + fulldir + "\n SNOW_TABLE: " + SNOW_TABLE)

    # connect to snowflake - set ocsp response cache to /tmp,
    # the only place we can write on lambda
    ctx = snowflake.connector.connect(
        user=SNOW_USER,
        password=SNOW_PASS,
        account=SNOW_ACCOUNT,
        database=SNOW_DB,
        schema=SNOW_SCHEMA,
        ocsp_response_cache_filename="/tmp/ocsp_response_cache"
    )
    cs = ctx.cursor()

    # load the entire watched directory for this table
    # add your own error handling
    try:
        sql = "copy into " + SNOW_TABLE + " from @S3_SUPPORT/titanic/fe_load/" + SNOW_TABLE + "/" + \
              " FILE_FORMAT = '" + SNOW_FILE_FORMAT + "' ON_ERROR = 'ABORT_STATEMENT' PURGE = TRUE;"
        print(sql)
        cs.execute(sql)
    finally:
        cs.close()
        ctx.close()
```
Now to set up the Lambda in AWS. Create one in the same region as your Snowflake instance. I named mine fe_load and made it a Python 2.7 Lambda. The trigger I set up watches the directory /titanic/fe_load/. I set a 90-second timeout, although I expect the Lambdas to complete much more quickly than that.
I also set up the AWS CLI toolkit on my EC2 server, with my config and credentials in ~/.aws; this makes it easy to push the deployment package in. Once the Lambda has been created, we can add our handler to the package and update it in AWS.
```shell
zip -g ../snow_lambda.zip lambda_function.py

ls -l ~/snow_lambda.zip
-rw-rw-r-- 1 ec2-user ec2-user 25857013 Jun 10 23:52 /home/ec2-user/snow_lambda.zip
```
Note it’s rather large at 25 MB; once a package exceeds 50 MB, the CLI will no longer allow direct upload, and will instead require the file to be hosted on S3 and loaded into Lambda from there. Note also that the larger the deployment package, the longer it takes to deploy the code onto each AWS Lambda execution instance.
```shell
# example for all that is required
aws lambda update-function-code --function-name fe_load --region us-east-1 --zip-file fileb://~/snow_lambda.zip

# approach needed if your lambda exceeds 50 MB
#aws s3 cp ~/snow_lambda.zip s3://thisis-mybucket/snow_lambda.zip
#aws lambda update-function-code --function-name fe_load --region us-east-1 --s3-bucket thisis-mybucket --s3-key snow_lambda.zip

# stick in a sample file and make sure it loads into your table!
# this should load the contents of my file test5_scored.csv into the table passengers_scored_api
aws s3 cp ~/test5_scored.csv s3://thisis-mybucket/titanic/fe_load/passengers_scored_api/test5_scored.csv
```