Send recognition results to Elasticsearch

In this step, you will use AWS Lambda to send the recognition results to Amazon Elasticsearch Service to visualize the results of the facial recognition.

Creating the AWS Lambda function

Create an AWS Lambda function that sends the facial recognition results from Amazon Kinesis Data Streams to Amazon Elasticsearch Service.

  • Open the AWS Lambda console
  • Select Functions from the left menu and click on Create function on the right
  • Enter the following information and click Create function
    • Select Author from scratch
    • Function name: kvs-workshop-lambda
    • Runtime: Python 3.8
    • Execution role: Create a new role from AWS policy templates
      • Role name: kvs-workshop-lambda-role
      • Policy Templates: Select the following policy
        • Elasticsearch permissions

The Lambda function is now created. Leave this browser tab open and proceed to the next step in another tab.

Setting up Amazon Elasticsearch Service

  • Open the Amazon Elasticsearch Service console
  • Select Dashboard from the left menu and click Create a new domain
  • Set up the following settings
    • Step 1: Choose deployment type
      • Deployment type: Development and testing
      • Elasticsearch Version: 7.4
      • Click on Next
    • Step 2: Configure domain
      • Elasticsearch domain name: kvs-workshop-domain
      • Data nodes
        • Instance type: t2.small.elasticsearch
      • Leave all other settings at their defaults and click Next
    • Step 3: Configure access and security
      • Network Configuration: Public access
      • Access policy: Select Custom access policy and add the following two policies
        • Policy 1
          • Select Type: IPv4 address
          • Enter Principal: The IPv4 address of your PC (you can look it up with an IP address checker)
          • Select Action: Allow
        • Policy 2
          • Select Type: IAM ARN
          • Enter Principal: arn:aws:iam::<account ID>:role/service-role/kvs-workshop-lambda-role (replace <account ID> with your AWS account ID)
          • Select Action: Allow
      • Leave all other settings at their defaults and click Next
    • Step 4: Review
      • Confirm that all the settings are correct and click on Confirm
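
The two entries added under Custom access policy in Step 3 correspond roughly to a resource-based policy with two Allow statements. The following is a minimal sketch of such a document, built in Python for illustration. The account ID, region, and IP address are placeholders, the helper name build_access_policy is hypothetical, and the JSON that the console actually generates may differ in detail.

```python
import json


def build_access_policy(account_id, region, domain_name, client_ip, role_name):
    """Return a policy document with one IP-based and one role-based statement."""
    resource = f"arn:aws:es:{region}:{account_id}:domain/{domain_name}/*"
    role_arn = f"arn:aws:iam::{account_id}:role/service-role/{role_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Policy 1: allow requests that originate from your PC's IPv4 address
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": "es:*",
                "Resource": resource,
                "Condition": {"IpAddress": {"aws:SourceIp": [client_ip]}},
            },
            {
                # Policy 2: allow requests signed with the Lambda execution role
                "Effect": "Allow",
                "Principal": {"AWS": role_arn},
                "Action": "es:*",
                "Resource": resource,
            },
        ],
    }


# Placeholder values for illustration only
policy = build_access_policy(
    account_id="123456789012",
    region="ap-northeast-1",
    domain_name="kvs-workshop-domain",
    client_ip="203.0.113.10",
    role_name="kvs-workshop-lambda-role",
)
print(json.dumps(policy, indent=2))
```

The first statement is what lets you open Kibana from your browser; the second is what lets the Lambda function post documents.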

It takes about 10 minutes for the Amazon Elasticsearch Service domain to be created. Wait until the domain status becomes Active.

Once the domain status is Active, copy the Endpoint URL shown in the Overview tab (e.g. https://search-kvs-workshop-domain-xxxx.your-region.es.amazonaws.com).
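
If you would rather wait for the domain from a script than watch the console, the polling could look roughly like the sketch below. It assumes a boto3 Elasticsearch Service client is passed in and relies on the Processing and Endpoint fields of the describe_elasticsearch_domain response; the function name wait_for_endpoint and the polling intervals are illustrative choices, not part of the workshop.

```python
import time


def wait_for_endpoint(es_client, domain_name, poll_seconds=30, max_polls=40, sleep=time.sleep):
    """Poll the domain until it has finished processing and exposes an endpoint."""
    for _ in range(max_polls):
        status = es_client.describe_elasticsearch_domain(DomainName=domain_name)["DomainStatus"]
        endpoint = status.get("Endpoint")
        if endpoint and not status.get("Processing", False):
            # The API returns the host name only, so add the scheme
            return f"https://{endpoint}"
        sleep(poll_seconds)
    raise TimeoutError(f"{domain_name} was not ready after {max_polls} polls")


# Usage with real credentials (requires boto3):
#   import boto3
#   es_url = wait_for_endpoint(boto3.client("es", region_name="ap-northeast-1"), "kvs-workshop-domain")
```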

Uploading code to AWS Lambda

Download lambda.zip from the following URL.

Now go back to the Lambda tab in your browser and follow the steps below in the Function code section of the Lambda function.

  • Click Actions > Upload a .zip file in the upper right
  • Select the lambda.zip downloaded above
  • Click on Save

The source code of the Lambda function you uploaded here is as follows.

import base64
import datetime
import json
import logging
import os
import uuid

import boto3
import requests
from requests_aws4auth import AWS4Auth


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Get Elasticsearch service settings from environment variables
es_url = os.getenv("ES_URL")
region = os.getenv("REGION")

# Get credentials
credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(credentials.access_key, credentials.secret_key, region, 'es', session_token=credentials.token)


def process_record(record):
    """ Submit face recognition result to Elasticsearch service """
    payload = json.loads(base64.b64decode(record['kinesis']['data']))
    logger.info(f"record: {payload}")

    unix_time = payload["InputInformation"]["KinesisVideo"]["ServerTimestamp"]
    timestamp = datetime.datetime.utcfromtimestamp(unix_time).strftime("%Y-%m-%dT%H:%M:%S+0000")

    for face in payload["FaceSearchResponse"]:
        if not face["MatchedFaces"]:
            continue
        confidence = face["MatchedFaces"][0]["Similarity"]
        name = face["MatchedFaces"][0]["Face"]["ExternalImageId"]

        data = {"timestamp": timestamp, "name": name, "confidence": confidence}
        response = requests.post(f"{es_url}/record/face/{uuid.uuid4()}",
                                 auth=aws_auth,
                                 headers={"Content-Type": "application/json"},
                                 data=json.dumps(data))
        logger.info(f"result: code={response.status_code}, response={response.text}")


def lambda_handler(event, context):
    for record in event['Records']:
        process_record(record)
    return {"result": "ok"}
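
To see what process_record does with a record without calling Elasticsearch, the parsing can be tried locally. The sketch below mirrors that parsing in a hypothetical helper, extract_documents, and feeds it a made-up event that contains only the fields the function reads; real Kinesis records and Rekognition results carry many more fields.

```python
import base64
import json
from datetime import datetime, timezone


def extract_documents(record):
    """Mirror the parsing in process_record, returning documents instead of posting them."""
    payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
    unix_time = payload["InputInformation"]["KinesisVideo"]["ServerTimestamp"]
    timestamp = datetime.fromtimestamp(unix_time, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+0000")

    documents = []
    for face in payload["FaceSearchResponse"]:
        if not face["MatchedFaces"]:
            continue  # no match for this detected face, so nothing to record
        best = face["MatchedFaces"][0]
        documents.append({
            "timestamp": timestamp,
            "name": best["Face"]["ExternalImageId"],
            "confidence": best["Similarity"],
        })
    return documents


# Hypothetical payload with one unmatched face and one matched face
sample_payload = {
    "InputInformation": {"KinesisVideo": {"ServerTimestamp": 1600000000.0}},
    "FaceSearchResponse": [
        {"MatchedFaces": []},
        {"MatchedFaces": [{"Similarity": 98.5, "Face": {"ExternalImageId": "alice"}}]},
    ],
}

# Lambda receives the Kinesis record data base64-encoded
encoded = base64.b64encode(json.dumps(sample_payload).encode("utf-8")).decode("ascii")
sample_event = {"Records": [{"kinesis": {"data": encoded}}]}

documents = extract_documents(sample_event["Records"][0])
print(documents)
```

Each resulting document is what the Lambda function posts to the record index, under a randomly generated ID.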

Configuring Environment Variables for AWS Lambda

Next, go back to the Lambda tab to set the environment variables for the Lambda function.

  • Click on Environment variables > Edit at the bottom of the screen
  • Click on Add environment variable
  • Enter the following two environment variables and click Save
    • Environment variable 1
      • Key: ES_URL
      • Value: The URL you just copied
    • Environment variable 2
      • Key: REGION
      • Value: The region in use (e.g. ap-northeast-1)
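
Because the function builds its request URL by appending a path to ES_URL, a trailing slash or a missing https:// prefix in the value is an easy mistake to make. The following is a small sketch of how the two variables could be checked before use; load_settings is a hypothetical helper and is not part of lambda.zip.

```python
def load_settings(env):
    """Read ES_URL and REGION from a mapping such as os.environ and validate them."""
    es_url = (env.get("ES_URL") or "").strip().rstrip("/")
    region = (env.get("REGION") or "").strip()
    if not es_url.startswith("https://"):
        raise ValueError("ES_URL must be the full endpoint URL, starting with https://")
    if not region:
        raise ValueError("REGION must be set, for example ap-northeast-1")
    return es_url, region


# Placeholder values for illustration only
settings = load_settings({
    "ES_URL": "https://search-kvs-workshop-domain-xxxx.ap-northeast-1.es.amazonaws.com/",
    "REGION": "ap-northeast-1",
})
print(settings)
```

Inside the Lambda function, the same helper would be called as load_settings(os.environ).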

Configuring AWS Lambda Permissions and Trigger

Set up the IAM role.

  • Open the Permissions tab at the top of the screen in the console of the Lambda function
  • Click Execution role > Role name > kvs-workshop-lambda-role
  • On the IAM role page that opens in a new tab, open the Permissions tab and click Attach policies
  • Search for kinesis in Filter policies and select the check box for AmazonKinesisFullAccess
  • Click Attach policy at the bottom of the screen
  • Close the tab and return to the original Lambda function tab
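
The same policy attachment can also be done from a script. The sketch below assumes a boto3 IAM client is passed in and uses its attach_role_policy call; the helper name attach_kinesis_policy is made up for this example.

```python
# ARN of the AWS managed policy selected in the console steps above
KINESIS_POLICY_ARN = "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"


def attach_kinesis_policy(iam_client, role_name="kvs-workshop-lambda-role"):
    """Attach AmazonKinesisFullAccess to the Lambda execution role."""
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=KINESIS_POLICY_ARN)
    return KINESIS_POLICY_ARN


# Usage with real credentials (requires boto3):
#   import boto3
#   attach_kinesis_policy(boto3.client("iam"))
```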

Next, configure the Lambda function to be triggered when records are put to Amazon Kinesis Data Streams.

  • Open the Configuration tab at the top of the screen
  • Click + Add trigger in Designer at the top of the screen
  • Trigger configuration: Select Kinesis
  • Enter the following information and click Add
    • Kinesis stream: kvs-workshop-data-stream
    • Consumer: No consumer
    • Batch size: 10
    • Starting position: Latest
    • Additional Settings > Retry attempts: 1
    • Confirm Enable trigger is checked
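
For reference, the trigger settings above map onto the parameters of Lambda's create_event_source_mapping API roughly as sketched below. The stream ARN uses a placeholder account ID and region, and build_trigger_settings is a hypothetical helper that only assembles the arguments.

```python
def build_trigger_settings(stream_arn, function_name="kvs-workshop-lambda"):
    """Return keyword arguments matching the trigger configured in the console."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "BatchSize": 10,                # Batch size: 10
        "StartingPosition": "LATEST",   # Starting position: Latest
        "MaximumRetryAttempts": 1,      # Retry attempts: 1
        "Enabled": True,                # Enable trigger is checked
    }


# Placeholder ARN for illustration only
stream_arn = "arn:aws:kinesis:ap-northeast-1:123456789012:stream/kvs-workshop-data-stream"
trigger_settings = build_trigger_settings(stream_arn)
print(trigger_settings)

# Usage with real credentials (requires boto3):
#   import boto3
#   boto3.client("lambda").create_event_source_mapping(**trigger_settings)
```

With a batch size of 10, lambda_handler receives up to 10 records per invocation and processes them one by one.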

That’s it. The face recognition results will now be sent to Amazon Elasticsearch Service. In the next step, you will use Kibana to analyze the results.