Yet another key use case for Lambda is its ability to transform data on the fly. Say you have a stream of data coming into an S3 bucket in the form of one or more CSV files, each of a different size. Before your application can process this data, it has to be converted into JSON and then written to a DynamoDB table. So, how do you get this done effectively? One way is to use an EC2 instance that periodically polls S3, pulls the CSV files, transforms them, and uploads the results to DynamoDB. However, this approach has its problems. What happens when no files arrive in S3 for a long time? Do you keep the EC2 instance up and running the whole time? The polling logic is also something you will have to write and maintain yourself, which is an additional burden. So, what's the easy way out? You guessed it: Lambda functions!
For this particular use case, we will be creating a simple S3 bucket that will host our CSV files. A corresponding Lambda function will be used to convert the CSVs into JSON files and push the transformed data to a DynamoDB table.
For this to work, we will additionally need to configure a trigger on our S3 bucket so that it invokes the Lambda function whenever a new CSV file is uploaded. We will use the S3 object created event as the trigger for our Lambda function, as explained in the upcoming steps.
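To make the trigger concrete, the following sketch shows an abridged S3 object created event and how a handler might extract the bucket name and object key from it. The sample event is trimmed to the fields used here, and the bucket name, file name, and the helper extractS3Object are illustrative assumptions rather than part of the function shown later.

```javascript
'use strict';

// Abridged S3 "object created" event, trimmed to the fields used below.
// The bucket and file names are made up for illustration.
const sampleEvent = {
  Records: [
    {
      eventSource: 'aws:s3',
      eventName: 'ObjectCreated:Put',
      s3: {
        bucket: { name: 'my-example-bucket' },
        // S3 URL-encodes the key in the event; spaces arrive as '+'.
        object: { key: 'csv/sample+data.csv', size: 42 }
      }
    }
  ]
};

// Hypothetical helper: returns the bucket name and the decoded object key.
function extractS3Object(event) {
  const record = event.Records[0];
  const bucket = record.s3.bucket.name;
  const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
  return { bucket, key };
}

console.log(extractS3Object(sampleEvent));
```

Decoding the key matters because a file uploaded as "sample data.csv" appears in the event as "sample+data.csv", and requesting the undecoded key from S3 would fail.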
Go ahead and create a new bucket in S3. For this use case, you can name the bucket anything you want; however, the CSV files are all going to be placed within a folder in the bucket with the name csv. With the bucket created, configure a simple DynamoDB table in the same region with the following structure:
ID | Name  | Age
1  | John  | 23
2  | Sarah | 45
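A CSV file matching this structure starts with a header row followed by one row per item. As a rough illustration of the conversion the function performs, the sketch below parses such a file into an array of objects. It uses a deliberately naive parser (parseSimpleCsv, a hypothetical helper that does not handle quoted fields or embedded commas) as a stand-in for the csvtojson module used in the actual function.

```javascript
'use strict';

// Sample file contents: a header row plus the two rows from the table.
const sampleCsv = 'ID,Name,Age\n1,John,23\n2,Sarah,45\n';

// Hypothetical, naive CSV parser for illustration only.
// It splits on commas and does not support quoted fields.
function parseSimpleCsv(text) {
  const lines = text.split(/\r?\n/).filter((line) => line.trim() !== '');
  const headers = lines[0].split(',').map((h) => h.trim());
  return lines.slice(1).map((line) => {
    const values = line.split(',');
    const row = {};
    headers.forEach((header, i) => {
      row[header] = (values[i] || '').trim();
    });
    return row;
  });
}

console.log(JSON.stringify(parseSimpleCsv(sampleCsv), null, 2));
```

Every value in this sketch comes out as a string, including ID and Age, which is worth keeping in mind when you choose the attribute types for your table's key.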
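Next, create a Lambda function in the same region as the bucket. The code depends on the async, csvtojson, and jsonfile npm modules, so these have to be bundled together with index.js in the function's deployment package. Copy and paste the following code in an index.js file: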
'use strict';

console.log('Loading function');

const aws = require('aws-sdk');
const async = require('async');
const csv = require('csvtojson');
const jsonfile = require('jsonfile');
const fs = require('fs');

const s3 = new aws.S3({ apiVersion: '2006-03-01' });
const docClient = new aws.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {
    async.auto({
        download: function (next) {
            console.log('Received event:', JSON.stringify(event, null, 2));
            const bucket = event.Records[0].s3.bucket.name;
            // keys arrive URL-encoded, with spaces encoded as '+'
            let key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
            const downloadParams = { Bucket: bucket, Key: key };
            // remove the csv/ prefix from the key to get the bare file name
            key = key.replace('csv/', '');
            // /tmp is the only directory a Lambda function can write to
            const csvFile = '/tmp/' + key;
            const file = fs.createWriteStream(csvFile);
            s3.getObject(downloadParams).createReadStream()
                .on('error', function (err) {
                    console.log('Error while downloading the file from S3: ', err);
                    next(err);
                })
                .pipe(file);
            file.on('finish', function () {
                // all data has been flushed; release the file handle
                file.close();
                console.log('Download complete! ' + csvFile);
                next(null, { csvFile: csvFile, bucketName: bucket, key: key });
            });
            file.on('error', function (err) {
                console.log('Error while writing the CSV file to /tmp: ', err);
                next(err);
            });
        },
        csvtojson: ['download', function (results, next) {
            console.log('Inside csvtojson function');
            const csvFile = results.download.csvFile;
            csv()
                .fromFile(csvFile)
                // when parsing finishes, the full result is emitted here
                .on('end_parsed', function (jsonArrayObj) {
                    console.log(jsonArrayObj);
                    // the final file has a .json extension
                    const keyJson = results.download.key.replace(/\.csv$/i, '.json');
                    console.log('Final file: ' + keyJson);
                    // the JSON file is written to /tmp as well
                    const jsonFile = '/tmp/' + keyJson;
                    jsonfile.writeFile(jsonFile, jsonArrayObj, function (err) {
                        if (err) {
                            console.error(err);
                            return next(err);
                        }
                        // continue only after the file has been written
                        next(null, { keyJson: keyJson, jsonFile: jsonFile });
                    });
                });
        }],
        sendToDynamo: ['download', 'csvtojson', function (results, next) {
            console.log('Inside sendToDynamo function');
            console.log('Importing data into DynamoDB. Please wait.');
            fs.readFile(results.csvtojson.jsonFile, function (err, data) {
                if (err) {
                    console.log(err);
                    return next(err);
                }
                const items = JSON.parse(data);
                async.forEachOf(items, function (item, index, cb) {
                    const params = {
                        TableName: process.env.TABLE_NAME,
                        Item: { ID: item.ID, Name: item.Name, Age: item.Age }
                    };
                    docClient.put(params, function (err) {
                        if (err) {
                            console.error('Unable to add ', item.Name,
                                '. Error JSON:', JSON.stringify(err, null, 2));
                            cb(err);
                        } else {
                            console.log('PutItem succeeded');
                            cb(null);
                        }
                    });
                }, function (err) {
                    if (err) {
                        console.log(err);
                        next(err);
                    } else {
                        next(null, 'Done!!');
                    }
                });
            });
        }]
    }, function (err, results) {
        // report the outcome back to Lambda so the invocation ends cleanly
        if (err) {
            console.log('Finished with error!');
            return callback(err);
        }
        console.log(results);
        callback(null, 'Done!!');
    });
};
The code works in three stages. First, it downloads the .csv file from the csv folder in your S3 bucket into the function's /tmp directory, which is the only local filesystem location a function can write to. Next, it converts the file into JSON and writes the result to /tmp as well. Finally, the sendToDynamo step reads the JSON file and writes each row to the DynamoDB table we created earlier.
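The last stage can be sketched without calling AWS at all: each parsed row becomes the parameter object that is passed to the DocumentClient's put call. The helper name toPutParams, the example table name, and the choice to convert ID and Age to numbers are assumptions made for illustration; the function itself stores the values exactly as they were parsed.

```javascript
'use strict';

// Hypothetical helper: maps one parsed CSV row to DocumentClient put() params.
function toPutParams(row, tableName) {
  return {
    TableName: tableName,
    Item: {
      // Assumption: ID and Age are numeric attributes in the table.
      // Keep them as strings if your table's key is defined as a String.
      ID: Number(row.ID),
      Name: row.Name,
      Age: Number(row.Age)
    }
  };
}

// Rows as they would look after the CSV to JSON conversion.
const rows = [
  { ID: '1', Name: 'John', Age: '23' },
  { ID: '2', Name: 'Sarah', Age: '45' }
];

// 'people' is an example name; the function reads it from process.env.TABLE_NAME.
const requests = rows.map((row) => toPutParams(row, 'people'));
console.log(JSON.stringify(requests, null, 2));
```

Building the parameters in a separate step like this makes the mapping easy to check locally before any data is written to the table.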
Make sure the function has the necessary permissions before you save it. In this case, the function needs permissions to retrieve the CSV file from the S3 bucket, write logs to CloudWatch, and write the data into your DynamoDB table. Also set a TABLE_NAME environment variable on the function, since the code reads the table name from process.env.TABLE_NAME. You can use the following snippet to configure your IAM role; just remember to substitute the <YOUR_BUCKET_NAME> and <YOUR_TABLE_NAME> fields with the values from your setup, and to replace the region and account ID in the DynamoDB ARN with your own:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "myS3Permissions",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::<YOUR_BUCKET_NAME>/*"
            ]
        },
        {
            "Sid": "myLogsPermissions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "myDynamodbPermissions",
            "Effect": "Allow",
            "Action": [
                "dynamodb:PutItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:01234567890:table/<YOUR_TABLE_NAME>"
            ]
        }
    ]
}
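If you would rather generate the policy than edit it by hand, a small script along the following lines can fill in the values. This is only a sketch: buildPolicy is a hypothetical helper, and the bucket name, table name, region, and account ID passed to it are placeholders.

```javascript
'use strict';

// Hypothetical helper: fills the policy template with values from your setup.
function buildPolicy({ bucket, table, region, accountId }) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Sid: 'myS3Permissions',
        Effect: 'Allow',
        Action: ['s3:GetObject'],
        Resource: [`arn:aws:s3:::${bucket}/*`]
      },
      {
        Sid: 'myLogsPermissions',
        Effect: 'Allow',
        Action: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'],
        Resource: ['*']
      },
      {
        Sid: 'myDynamodbPermissions',
        Effect: 'Allow',
        Action: ['dynamodb:PutItem'],
        Resource: [`arn:aws:dynamodb:${region}:${accountId}:table/${table}`]
      }
    ]
  };
}

// All four values below are placeholders; substitute your own.
const policy = buildPolicy({
  bucket: 'my-example-bucket',
  table: 'people',
  region: 'us-east-1',
  accountId: '123456789012'
});

console.log(JSON.stringify(policy, null, 2));
```

The printed JSON can be pasted into the role's policy editor as is.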
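With the function created and all set, the final step is to configure the trigger for the function. Select S3 as the trigger, choose your bucket, and pick the object created event type. Make sure you also provide csv/ as the Prefix in the trigger's configuration, so that only files uploaded to that folder invoke the function.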
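For reference, the Prefix setting corresponds to a key filter rule in the bucket's notification configuration. The sketch below shows such rules as a plain object, together with a small local matcher that mimics how prefix and suffix rules select object keys. The matcher matchesFilter and the .csv suffix rule are assumptions added for illustration; the real filtering is performed by S3, not by your code.

```javascript
'use strict';

// Filter rules in the shape S3 uses for notification key filters
// (they sit under Filter.Key.FilterRules in the notification configuration).
// The suffix rule is an optional extra that is assumed here.
const filterRules = [
  { Name: 'prefix', Value: 'csv/' },
  { Name: 'suffix', Value: '.csv' }
];

// Hypothetical local matcher: true when the key satisfies every rule.
function matchesFilter(key, rules) {
  return rules.every((rule) => {
    if (rule.Name === 'prefix') return key.startsWith(rule.Value);
    if (rule.Name === 'suffix') return key.endsWith(rule.Value);
    return false; // unknown rule type
  });
}

for (const key of ['csv/people.csv', 'people.csv', 'csv/notes.txt']) {
  console.log(key, matchesFilter(key, filterRules));
}
```

With these rules, only the first key would invoke the function: the second lies outside the csv folder and the third is not a CSV file.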

With this step completed, test the entire setup by uploading a sample CSV file to the csv folder in your bucket. Remember that the CSV file has to follow the same tabular format we discussed at the beginning of this use case, that is, the columns ID, Name, and Age. Once the file is uploaded, check whether its contents have been written to the DynamoDB table. You can also verify this by looking at the function's logs in CloudWatch, which should contain a "PutItem succeeded" message for each row.

With this, we come to the end of our use cases.