認識結果を Elasticsearch へ送信

このステップでは、顔認識した結果を可視化する下準備として、AWS Lambda を利用して Amazon Elasticsearch Service へ認識結果を送信します。

AWS Lambda の作成

顔認識した結果を Amazon Kinesis Data Streams から Amazon Elasticsearch Service に送るための AWS Lambda を作成します。

  • AWS Lambda のコンソール を開きます
  • 左側のメニューの 関数 を選択し、右側の 関数の作成 をクリックします
  • 以下の情報を入力して、 関数の作成 をクリックします
    • 一から作成
    • 関数名: kvs-workshop-lambda
    • ランタイム: Python 3.8
    • 実行ロール: AWSポリシーテンプレートから新しいロールを作成
      • ロール名: kvs-workshop-lambda-role
      • ポリシーテンプレート: 以下のポリシーを選択
        • Elasticsearch のアクセス権限

以上で Lambda 関数が作成されました。このタブは開いたままにして、別のタブで次の手順を進めます。

Amazon Elasticsearch Service のセットアップ

  • Amazon Elasticsearch Service のコンソール を開きます
  • 左側のメニューの ダッシュボード を選択し、新しいドメインの作成 をクリックします
  • 以下の設定を行なっていきます
    • Step 1: デプロイタイプの選択
      • デプロイタイプの選択: 開発およびテスト
      • Elasticsearch のバージョン: 7.4
      • 次へ をクリック
    • Step 2: ドメインの設定
      • Elasticsearch ドメイン名: kvs-workshop-domain
      • データノード
        • インスタンスタイプ: t2.small.elasticseasrch
      • 上記以外はデフォルト設定のまま 次へ をクリック
    • Step 3: アクセスとセキュリティの設定
      • ネットワーク構成: パブリックアクセス
      • アクセスポリシー: カスタムアクセスポリシー よりポリシーを2つ追加します
        • 1つ目
          • タイプを選択: IPv4アドレス
          • プリンシパルを入力: IPアドレスチェッカーで調べた、操作用 PC の IPv4 アドレスを入力 (例: 123.1.23.45)
          • アクションを選択: 許可
        • 2つ目
          • タイプを選択: IAM ARN
          • プリンシパルを入力: arn:aws:iam::アカウントID:role/service-role/kvs-workshop-lambda-role
            • 上記の アカウントID の部分は、利用している AWS のアカウントIDに置き換えます
            • アカウントIDは アカウント設定 のページで確認できます
          • アクションを選択: 許可
      • 上記以外はデフォルト設定のまま 次へ をクリック
    • Step 4: 確認
      • 設定内容に問題がなければ 確認 をクリック

Amazon Elasticsearch Service のドメインが作成されるまで10分程度かかります。ドメインのステータスが アクティブ になるまで待ちましょう。

ドメインのステータスが変わったら、概要タブの エンドポイント の URL をコピーしておきます (例: https://search-kvs-workshop-domain-xxxx.your-region.es.amazonaws.com)。

AWS Lambda の設定

続いて、 Lambda 関数に環境変数を設定するために、Lambda のタブに戻ります。

  • 画面下部の 環境変数 > 環境変数を管理 をクリック
  • 環境変数の追加 をクリック
  • 以下の2つの環境変数を入力し、保存 をクリック
    • 1つ目
      • キー: ES_URL
      • : 先ほどコピーした URL
    • 2つ目
      • キー: REGION
      • : 利用しているリージョン (例: ap-northeast-1)

AWS Lambda にコードをアップロード

操作用 PC のお好きな場所にフォルダを作成し、以下のコードを lambda_function.py として保存します。

import base64
import datetime
import json
import logging
import os
import uuid

import boto3
import requests
from requests_aws4auth import AWS4Auth


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Get Elasticsearch service settings from environmental variables
es_url = os.getenv("ES_URL")
region = os.getenv("REGION")

# Get credentials
credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(credentials.access_key, credentials.secret_key, region, 'es', session_token=credentials.token)


def process_record(record):
    """ Submit face recognition result to Elasticsearch service """
    payload = json.loads(base64.b64decode(record['kinesis']['data']))
    logger.info(f"record: {payload}")

    unix_time = payload["InputInformation"]["KinesisVideo"]["ServerTimestamp"]
    timestamp = datetime.datetime.utcfromtimestamp(unix_time).strftime("%Y-%m-%dT%H:%M:%S+0000")

    for face in payload["FaceSearchResponse"]:
        if not face["MatchedFaces"]:
            continue
        confidence = face["MatchedFaces"][0]["Similarity"]
        name = face["MatchedFaces"][0]["Face"]["ExternalImageId"]

        data = {"timestamp": timestamp, "name": name, "confidence": confidence}
        response = requests.post(f"{es_url}/record/face/{uuid.uuid4()}",
                                 auth=aws_auth,
                                 headers={"Content-Type": "application/json"},
                                 data=json.dumps(data))
        logger.info(f"result: code={response.status_code}, response={response.text}")


def lambda_handler(event, context):
    for record in event['Records']:
        process_record(record)
    return {"result": "ok"}

操作用 PC のターミナルを開き、lambda_function.py を作成したフォルダに移動し、以下のコマンドを実行します。

cd 作成したフォルダ
pip3 install requests-aws4auth -t .
zip lambda.zip -r ./

zip コマンドが利用できない場合は、エクスプローラーなどから作成したフォルダを ZIP 形式で圧縮します。

ブラウザに戻って、Lambda 関数のコンソールの 関数コード で以下の操作をします。

  • コードエントリタイプ: .zip ファイルをアップロード
  • 関数パッケージ: 作成したフォルダの lambda.zip を選択
  • 画面右上の 保存 をクリック

AWS Lambda の実行設定

IAM ロールの設定をします。

  • Lambda 関数のコンソールの画面上部の アクセス権限 のタブを開きます
  • 実行ロール > ロール名 > kvs-workshop-role をクリックします
  • 新しいタブで IAM ロールのページが開くので、アクセス権限 のタブを開き、ポリシーをアタッチします をクリックします
  • ポリシーのフィルタで kinesis を検索し、AmazonKinesisFullAccess にチェックを入れます
  • 画面下部の ポリシーのアタッチ をクリックします
  • タブを閉じ、元の Lambda 関数の画面に戻ります

続いて、Amazon Kinesis Data Streams へレコードが追加された際に、Lambda がトリガーされるようにします。

  • 画面上部の 設定 のタブを開きます
  • 画面上部の Designer の + トリガーを追加 をクリックします
  • トリガーの設定: Kinesis を選択します
  • 以下の内容を入力して 追加 をクリックします
    • Kinesis ストリーム: kvs-workshop-data-stream
    • コンシューマー: コンシューマーなし
    • バッチサイズ: 10
    • 開始位置: 最新
    • 追加設定 > 再試行: 1
    • トリガーの有効化: チェックを入れます

以上で、顔認識結果を Amazon Elasticsearch Service に登録することができました。 次のステップでは、登録された認識結果を分析するため、 Kibana を用いて可視化していきます。