Ingesting enriched IoT data into Amazon S3 using Amazon Kinesis Data Firehose


Introduction

When sending data from Internet of Things (IoT) devices to a data lake, you may need to enrich the device data payload with additional metadata in the cloud for further data processing and visualization. There are multiple reasons this data might not exist in the device payload, such as minimizing the device payload in limited bandwidth environments or modifying it with business inputs in the cloud. For example, a machine on the factory floor might be assigned to different operators during the day. This variable business data would be stored in a database. In your data lake, you might need this information to be stored along with the payload.

In this blog post, you will learn how to ingest enriched IoT data to a data lake in near real-time.

Prerequisites

  • An AWS account
  • AWS Command Line Interface (AWS CLI). See AWS CLI quick setup for configuration.

Use case definition

Let's assume that in your logistics company, you have containers equipped with sensor-enabled IoT devices. When a container is loaded onto a ship, the container ID is associated with the ship ID. You need to store the IoT device payload with the ship ID in your data lake.

In such a use case, the sensor payload comes from the IoT device attached to the container. However, the associated ship ID is only stored in the metadata store. Therefore, the payload must be enriched with the ship ID before putting it into the data lake.

Solution architecture

Architecture diagram for ingesting enriched IoT data into Amazon S3 by using Amazon Kinesis Data Firehose

In the architecture diagram,

  1. The IoT devices stream payloads to the AWS IoT Core message broker on a specific MQTT topic, device/data/DEVICE_ID. The AWS IoT Core message broker allows devices to publish and subscribe to messages by using supported protocols.
  2. The AWS IoT rule is triggered when there is a payload in its topic. In this use case, it is configured with an Amazon Kinesis Data Firehose action. You can use AWS IoT rules to interact with AWS services by calling them when there is a message in a specific MQTT topic, or directly by using the Basic Ingest feature.
  3. Amazon Kinesis Data Firehose buffers the device payloads before delivering them to the data store, based on the size or the time, whichever happens first. Kinesis Data Firehose delivers real-time streaming data to destinations for storing or processing.
  4. Once the buffer hits the size or the time threshold, Kinesis Data Firehose calls an AWS Lambda function to enrich the device payloads in batches with the metadata retrieved from an Amazon DynamoDB table. AWS Lambda is a serverless compute service that runs your code for any type of application. Amazon DynamoDB is a fully managed NoSQL database that provides fast performance.
  5. The enriched payloads are returned to Kinesis Data Firehose to deliver to the destination.
  6. The enriched payloads are put into an Amazon Simple Storage Service (Amazon S3) bucket as the destination. Amazon S3 is an object storage service that stores any amount of data for a range of use cases.
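The "size or time, whichever happens first" behaviour in step 3 can be pictured with a small in-memory model. This is only a sketch for intuition and not how Kinesis Data Firehose is implemented: the class name BufferSketch and its parameters are hypothetical, and the time threshold is only checked when a record arrives, whereas the real service also flushes on its own timer.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class BufferSketch:
    """Toy model of size-or-time buffering (hypothetical, for illustration only)."""
    max_bytes: int
    max_seconds: float
    _records: List[bytes] = field(default_factory=list, init=False)
    _size: int = field(default=0, init=False)
    _opened_at: Optional[float] = field(default=None, init=False)

    def add(self, record: bytes, now: float) -> Optional[List[bytes]]:
        """Add a record at time `now`; return the batch if a threshold is reached."""
        if self._opened_at is None:
            # The time window starts with the first record of a batch.
            self._opened_at = now
        self._records.append(record)
        self._size += len(record)
        size_hit = self._size >= self.max_bytes
        time_hit = now - self._opened_at >= self.max_seconds
        if size_hit or time_hit:
            return self.flush()
        return None

    def flush(self) -> List[bytes]:
        """Hand over the current batch and start an empty one."""
        batch, self._records = self._records, []
        self._size = 0
        self._opened_at = None
        return batch
```

With max_bytes=10 and max_seconds=60, two records totalling 11 bytes are flushed together as soon as the second one arrives, while two small records arriving 61 seconds apart are flushed because of the time threshold.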

AWS CloudFormation template

Download the AWS CloudFormation template from the code repository.

The AWS CloudFormation template deploys all the necessary resources to run this example use case. Let's have a closer look at the AWS IoT rule, Kinesis Data Firehose, and AWS Lambda function resources.

AWS IoT rules resource

IoTToFirehoseRule:
  Type: AWS::IoT::TopicRule
  Properties:
    TopicRulePayload:
      Actions:
        -
          Firehose:
            RoleArn: !GetAtt IoTFirehosePutRecordRole.Arn
            DeliveryStreamName: !Ref FirehoseDeliveryStream
            Separator: "\n"
      AwsIotSqlVersion: '2016-03-23'
      Description: This rule logs IoT payloads to S3 Bucket by aggregating in Kinesis Firehose.
      RuleDisabled: false
      Sql: !Ref IotKinesisRuleSQL

The AWS IoT rule takes a SQL parameter which defines the IoT topic that triggers the rule and the data to extract from the payload.

  • In the example, the SQL parameter is set to SELECT *, topic(3) as containerId FROM 'device/data/+' by default. SELECT * means the whole payload is taken as it is, and containerId is generated from the third segment of the MQTT topic and added to the payload.
  • FROM 'device/data/+' describes the IoT topic that will trigger the AWS IoT rule. + is a single-level wildcard character for MQTT topics, and the IoT devices will publish data payloads to the device/data/DEVICE_ID topic to trigger this rule.
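To make the effect of the default rule query concrete, the following Python sketch imitates what it does to a message published on device/data/cont1. It is an approximation for illustration, not the AWS IoT rules engine: the helper names topic_segment, matches and apply_rule are hypothetical, and only the single-level + wildcard is handled.

```python
import json
from typing import Optional


def topic_segment(topic: str, index: int) -> Optional[str]:
    """Return the 1-based segment of an MQTT topic, in the spirit of topic(n)."""
    parts = topic.split("/")
    return parts[index - 1] if 1 <= index <= len(parts) else None


def matches(topic_filter: str, topic: str) -> bool:
    """Match a topic against a filter where '+' stands for exactly one level."""
    f_parts, t_parts = topic_filter.split("/"), topic.split("/")
    if len(f_parts) != len(t_parts):
        return False
    return all(f == "+" or f == t for f, t in zip(f_parts, t_parts))


def apply_rule(topic: str, payload: str) -> Optional[dict]:
    """Imitate: SELECT *, topic(3) as containerId FROM 'device/data/+'."""
    if not matches("device/data/+", topic):
        return None  # the rule is not triggered for this topic
    message = json.loads(payload)                     # SELECT *
    message["containerId"] = topic_segment(topic, 3)  # topic(3) as containerId
    return message


print(apply_rule("device/data/cont1", '{"temperature": 20}'))
# {'temperature': 20, 'containerId': 'cont1'}
```

A message on device/data/cont1/extra would not trigger the rule in this sketch, because + matches exactly one topic level.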

The AWS IoT rule also defines actions. In the example, you can see a Kinesis Data Firehose action which defines the target Kinesis Data Firehose delivery stream and the AWS Identity and Access Management (IAM) role needed to put records into this delivery stream. A separator can be chosen to separate each record; in the given example it is a new line character.

Kinesis Data Firehose delivery stream resource

FirehoseDeliveryStream:
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt IoTLogBucket.Arn
      BufferingHints:
        IntervalInSeconds: 60
        SizeInMBs: 1
      Prefix: device-data/
      RoleARN: !GetAtt FirehosePutS3Role.Arn
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Type: Lambda
            Parameters:
              - ParameterName: LambdaArn
                ParameterValue: !Sub '${FirehoseTransformLambda.Arn}:$LATEST'
              - ParameterName: RoleArn
                ParameterValue: !GetAtt FirehoseLambdaInvokeRole.Arn

A Kinesis Data Firehose delivery stream must define a destination to put the stream into. It supports different types of destinations. You can find the available destination types and their usage in this documentation. In this example, you are going to use Amazon S3 as the destination.

The example Delivery Stream resource defines the following properties:

  • BucketARN: the destination bucket which will store the aggregated data. The destination bucket is created by the CloudFormation stack.
  • BufferingHints: the size and time thresholds for data buffering. In this example, they are set to 1 MB and 60 seconds respectively to see the results faster. They can be adjusted according to the business needs. Keeping these thresholds low will cause the Lambda function to be invoked more frequently. If the thresholds are high, the data will be ingested into the data store less frequently, so it will take longer to see the latest data in the data store.
  • Prefix: the created objects will be put under this prefix. Kinesis Data Firehose partitions the data based on the timestamp by default. In this example, the objects will be put under the device-data/YYYY/MM/dd/HH folder. Kinesis Data Firehose has advanced features for data partitioning such as dynamic partitioning. The partitioning of the data is important when querying the data lake. For example, if you need to query the data on a per-device basis by using Amazon Athena, scanning only the partition of the relevant device ID will significantly reduce the scan time and the cost. You can find details on partitioning in this documentation.
  • RoleARN: the IAM role that gives PutObject permission to Kinesis Data Firehose so that it can put aggregated data into the Amazon S3 bucket.
  • ProcessingConfiguration: as described in the use case, a transform Lambda function will enrich the IoT data with the metadata. ProcessingConfiguration defines the processor, which is a Lambda function in the example. For each batch of data, Kinesis Data Firehose will call this Lambda function to transform the data. You can read more about data processing in this documentation.
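As a rough illustration of the default time-based layout described for the Prefix property, the sketch below builds the folder path for a given timestamp. The function name default_partition_prefix is hypothetical, and the sketch assumes the folder is derived from the UTC hour; the object name that Kinesis Data Firehose appends after the folder is not modelled.

```python
from datetime import datetime, timedelta, timezone


def default_partition_prefix(prefix: str, arrival: datetime) -> str:
    """Build the time-based folder path, e.g. device-data/2024/03/07/09/."""
    # Assumption: the folder reflects the UTC hour, so convert first.
    utc = arrival.astimezone(timezone.utc)
    return prefix + utc.strftime("%Y/%m/%d/%H/")


# A record arriving at 01:00 in a UTC+2 time zone lands in the previous UTC day.
local = datetime(2024, 3, 7, 1, 0, tzinfo=timezone(timedelta(hours=2)))
print(default_partition_prefix("device-data/", local))
# device-data/2024/03/06/23/
```

Knowing the folder for a given hour helps when looking for a freshly delivered object, or when limiting a query to a time range.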

Transformation Lambda function

As you can see in the following Python code, Kinesis Data Firehose passes a batch of records to the function, where each record is a payload from the IoT devices. First, the base64 encoded payload data is decoded. Then, the corresponding ship ID is retrieved from the DynamoDB table based on the container ID. The payload is enriched with the ship ID and encoded back to base64. Finally, the record list is returned to Kinesis Data Firehose.

Once Kinesis Data Firehose receives the records, it puts them as an aggregated file into the Amazon S3 bucket.

import os
import boto3
import json
import base64

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['METADATA_TABLE'])

def function_handler(event, context):
  # Build the output list per invocation so that records from earlier
  # invocations of a warm Lambda container are not returned again.
  records = []
  for record in event["records"]:
    # Get data field of the record in json format. It is a base64 encoded string.
    json_data = json.loads(base64.b64decode(record["data"]))
    container_id = json_data["containerId"]

    # Get corresponding shipId from the DynamoDB table
    res = table.get_item(Key={'containerId': container_id})
    ddb_item = res["Item"]
    ship_id = ddb_item["shipId"]

    # Append shipId to the actual record data
    enriched_data = json_data
    enriched_data["shipId"] = ship_id

    # Encode the enriched record to base64
    json_string = json.dumps(enriched_data).encode("ascii")
    b64_encoded_data = base64.b64encode(json_string).decode("ascii")

    # Create a record with enriched data and return back to Firehose
    rec = {'recordId': record["recordId"], 'result': 'Ok', 'data': b64_encoded_data}
    records.append(rec)
  return {'records': records}
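The record contract used by the function (recordId, result, and base64 encoded data) can be exercised locally without AWS access. The following is a minimal sketch under stated assumptions: enrich_records and make_record are hypothetical helpers that are not part of the CloudFormation template, a plain dictionary stands in for the DynamoDB table, and a record whose container has no metadata is returned with the result ProcessingFailed, which is a design choice of this sketch rather than the behaviour of the function above.

```python
import base64
import json
from typing import Callable, List, Optional


def make_record(record_id: str, payload: dict) -> dict:
    """Build an input record shaped like the ones in event["records"]."""
    data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return {"recordId": record_id, "data": data}


def enrich_records(records: List[dict],
                   lookup: Callable[[str], Optional[str]]) -> List[dict]:
    """Add shipId to each payload; `lookup` maps containerId to shipId or None."""
    output = []
    for record in records:
        payload = json.loads(base64.b64decode(record["data"]))
        ship_id = lookup(payload["containerId"])
        if ship_id is None:
            # No metadata found: return the untouched data marked as failed.
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
            continue
        payload["shipId"] = ship_id
        data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
        output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
    return output


# In-memory stand-in for the metadata table (assumption for local testing).
metadata = {"cont1": "ship1"}
batch = [make_record("1", {"temperature": 20, "containerId": "cont1"}),
         make_record("2", {"temperature": 5, "containerId": "unknown"})]
for rec in enrich_records(batch, metadata.get):
    print(rec["recordId"], rec["result"])
```

Passing the lookup as a function keeps the enrichment logic independent of DynamoDB, so the same code path could be driven by a table lookup in the deployed function and by a dictionary in a unit test.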

Deployment

Run the following command in a terminal to deploy the stack.

aws cloudformation deploy --stack-name IoTKinesisDataPath --template-file IoTKinesisDataPath.yml --parameter-overrides IotKinesisRuleSQL="SELECT *, topic(3) as containerId FROM 'device/data/+'" --capabilities CAPABILITY_NAMED_IAM

After the deployment is complete, run the following command in a terminal to see the output of the deployment.

aws cloudformation describe-stacks --stack-name IoTKinesisDataPath

Note the IoTLogS3BucketName and MetadataTableName output parameters.

Testing

After the deployment is complete, the first thing you need to do is to create a metadata item for data enrichment. Run the following command to create an item in the DynamoDB table. It will create an item with cont1 as containerId and ship1 as shipId. Replace the IoTKinesisDataPath-MetadataTable-SAMPLE parameter with the DynamoDB table output parameter from the CloudFormation stack deployment.

aws dynamodb put-item --table-name IoTKinesisDataPath-MetadataTable-SAMPLE --item '{"containerId":{"S":"cont1"},"shipId":{"S":"ship1"}}'

In a real-life scenario, the devices publish the payloads to a specific MQTT topic. In this example, instead of creating IoT devices, you will use the AWS CLI to publish payloads to MQTT topics. Run the following command in a terminal to publish a sample data payload to AWS IoT Core. Pay attention to the payload field of the command: the only data provided by the device is the dynamic data.

aws iot-data publish --topic "device/data/cont1" --payload '{"temperature":20,"humidity":80,"latitude":0,"longitude":0}' --cli-binary-format raw-in-base64-out
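If you prefer to publish test payloads from Python instead of the AWS CLI, a sketch along the following lines could be used. It assumes the topic pattern device/data/<containerId>, that boto3 is installed, and that AWS credentials and a default region are configured; the helper names build_publish_args and publish_reading are hypothetical.

```python
import json


def build_publish_args(container_id: str, reading: dict) -> dict:
    """Build the keyword arguments for the AWS IoT data plane publish call."""
    return {
        "topic": f"device/data/{container_id}",
        "qos": 0,
        "payload": json.dumps(reading).encode("utf-8"),
    }


def publish_reading(container_id: str, reading: dict) -> None:
    """Publish one reading; requires boto3 and configured AWS credentials."""
    import boto3  # imported here so the builder above works without boto3
    client = boto3.client("iot-data")
    client.publish(**build_publish_args(container_id, reading))


# Inspect the arguments without calling AWS.
args = build_publish_args("cont1", {"temperature": 20, "humidity": 80})
print(args["topic"])  # device/data/cont1
```

Separating the argument builder from the network call makes it possible to check the topic and payload before anything is sent to AWS IoT Core.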

Now, navigate to Amazon S3 from the AWS Management Console and select the bucket that has been created with the CloudFormation stack. You should see the device-data folder in this bucket. It may take up to 1 minute for the data to appear due to the buffering configuration that is set for the Firehose delivery stream. If you navigate into the device-data/YYYY/MM/dd/HH folder, you will see that an object has been created. Go ahead and open this file. You will see that the content of the file is the data payload with the enriched shipId field.

{"temperature": 20, "humidity": 80, "latitude": 0, "longitude": 0, "containerId": "cont1", "shipId": "ship1"}
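If several payloads arrive within the same buffering window, the delivered object may contain several JSON documents written back to back, because the transform function in this post re-encodes each record without a trailing delimiter. The sketch below shows one way to read such content once it has been downloaded; iter_json_documents is a hypothetical helper, and it accepts documents with or without whitespace between them.

```python
import json
from typing import Iterator


def iter_json_documents(text: str) -> Iterator[dict]:
    """Yield each JSON document from a string of concatenated documents."""
    decoder = json.JSONDecoder()
    pos, length = 0, len(text)
    while pos < length:
        # Skip whitespace or newlines between documents.
        while pos < length and text[pos].isspace():
            pos += 1
        if pos >= length:
            break
        # raw_decode returns the parsed object and the index where it ended.
        document, pos = decoder.raw_decode(text, pos)
        yield document


content = ('{"temperature": 20, "containerId": "cont1", "shipId": "ship1"}'
           '{"temperature": 21, "containerId": "cont1", "shipId": "ship1"}')
for doc in iter_json_documents(content):
    print(doc["temperature"], doc["shipId"])
```

An alternative design is to append a newline to each enriched record in the transform function, which would produce one JSON document per line in the delivered object.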

Troubleshooting

In case of failure in the system, the following resources can be useful for analyzing the source of the problem.

To monitor the AWS IoT Core Rules Engine, you need to enable AWS IoT Core logging. This will give detailed information about the events happening in AWS IoT Core.

AWS Lambda can be monitored by using Amazon CloudWatch. The example CloudFormation template has the necessary permissions to create a log group for the Lambda function logging.

If a failure occurs while records are being processed, for example in the transform Lambda function, Kinesis Data Firehose will create a processing-failed folder under the device-data prefix in the Amazon S3 bucket. The details of the failure can be read as JSON objects. You can find more information in this documentation.

Clean up

To clean up the resources that have been created, first empty the Amazon S3 bucket. Run the following command, replacing the bucket-name parameter with the name of the bucket deployed by the CloudFormation stack. Important: this command will delete all the data inside the bucket irreversibly.

aws s3 rm s3://bucket-name --recursive

Then, you can delete the CloudFormation stack by running the following command in a terminal.

aws cloudformation delete-stack --stack-name IoTKinesisDataPath

Conclusion

In this blog, you have learned a common pattern for enriching IoT payloads with metadata and storing them cost-effectively in a data lake in near real-time by using the AWS IoT Rules Engine and an Amazon Kinesis Data Firehose delivery stream. The proposed solution and the CloudFormation template can be used as a baseline for a scalable IoT data ingestion architecture.

You can read further about the AWS IoT Core Rules Engine and Amazon Kinesis Data Firehose. Best practices for using MQTT topics in the AWS IoT Rules Engine will guide you in defining your topic structures.

Ozan Cihangir

Ozan is a Prototyping Engineer at AWS. He helps customers to build innovative solutions for their emerging technology projects in the cloud. LinkedIn
