[boto3][Dynamo][Glue][Athena] DynamoDBをS3にエクスポートしてAthenaでクエリする

boto3
スポンサーリンク

DynamoDBはよく利用するけど、キャパシティが気になって

デバッグがやりにくいなと思っていましたが、

DynamoDBには、キャパシティを消費せずS3へエクスポートする機能がありました。

 

 

そしてもちろんそれはboto3からも実行可能でした。

ということは。。。

boto3で

 

DynamoDB → S3 → Glueクローラ〜テーブル作成 → Athenaでクエリ

 

までの一連の流れがプログラムで自動でできるのでは?と思い立って、

実際にプログラムを書いてみました。

 

 

実際に使ってみて、これは便利すぎる!!となり

自分用の良いツールができたな!と思っています。

 

 

 

 

 

 

プログラム掲載

プログラム内にコメントで解説をいれてあるのでご参照いただければと思います。

 

import boto3
import time
import io
import csv
import arrow

dynamodb = boto3.client("dynamodb", region_name="ap-northeast-1")
glue = boto3.client("glue")
s3 = boto3.client("s3")
athena = boto3.client("athena")

table_arn = "[エクスポート対象のDynamoDBテーブルのarn]"
backet_name = "[エクスポートするs3バケット名]"
database_name = "[Glueクローラでテーブル作成する対象のデータベース]"
s3_prefix = f"export/{arrow.now().format('YYYYMMDDHHmmSS')}"
table_prefix = "hoge_"
crawler_name = "test_crawler"


def main():
    # S3にエクスポート
    export_s3()

    # クローラ実行パス取得
    s3_path = get_crawler_path()

    # クローラ作成
    create_crawler(s3_path)

    # クローラ実行
    execute_crawler()

    # athenaクエリ実行
    query_execution_id = query_athena()

    # s3 中身チェック
    check_csv(query_execution_id)

    # お掃除(Glueクローラ、テーブル削除)
    glue.delete_crawler(Name=crawler_name)
    glue.delete_table(DatabaseName=database_name, Name=f"{table_prefix}data")


def export_s3():
    # point_in_timeを指定もできるけど、この場合は現在の状態をエクスポートする
    response = dynamodb.export_table_to_point_in_time(
        TableArn=table_arn,
        S3Bucket=backet_name,
        S3Prefix=s3_prefix,
    )

    # ステータスチェックループ 'ExportStatus': 'IN_PROGRESS'|'COMPLETED'|'FAILED'
    export_arn = response["ExportDescription"]["ExportArn"]

    # エクスポート完了まで待機
    while True:
        time.sleep(60)
        response = dynamodb.describe_export(ExportArn=export_arn)
        status = response["ExportDescription"]["ExportStatus"]

        if status == "COMPLETED":
            break

    print("backup completed")


# クローラを実行する時に指定するパスを取得する
def get_crawler_path():

    # エクスポートするとバケット/指定したprefix/AWSDynamoDB/dataの配下にデータが出力される
    response = s3.list_objects(
        Bucket=backet_name, Prefix=f"{s3_prefix}/AWSDynamoDB/", Delimiter="/"
    )
    return f"{response['CommonPrefixes'][0]['Prefix']}data"

# Glueクローラ作成
def create_crawler(s3_path):
    glue.create_crawler(
        Name=crawler_name,
        Role="evass-glue_FSRole",
        DatabaseName=database_name,
        TablePrefix=table_prefix,
        Targets={
            "S3Targets": [
                {
                    "Path": f"s3://{backet_name}/{s3_path}",
                },
            ]
        },
    )
    print("crawler create completed")


# クローラ実行
def execute_crawler():
    response = glue.start_crawler(Name=crawler_name)

    # ステータスチェック
    while True:
        response = glue.get_crawler(Name=crawler_name)

        # READYになったらクローラ実行が終わっている
        if response["Crawler"]["State"] == "READY":
            break

        time.sleep(30)

# athenaでクエリ実行
def query_athena():

    # DynamoDBのエクスポートファイルでGlueテーブルを作成すると
    # カラムは構造体になる item.[カラム名].Sなどと指定してselectする
    query = f"select * from {database_name}.hoge_data;"
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database_name},
        ResultConfiguration={
            "OutputLocation": f"s3://{backet_name}/{s3_prefix}/output",
        },
    )
    query_execution_id = response["QueryExecutionId"]

    # クエリ ステータスチェック
    while True:
        response = athena.get_query_execution(QueryExecutionId=query_execution_id)

        if response["QueryExecution"]["Status"]["State"] == "SUCCEEDED":
            return query_execution_id

        time.sleep(1)


# クエリ結果チェック
def check_csv(query_execution_id):
    
    # クエリの結果はcsvで出力されている
    s3 = boto3.resource("s3")
    s3obj = s3.Object(backet_name, f"{s3_prefix}/output/{query_execution_id}.csv").get()

    byte_data = io.TextIOWrapper(io.BytesIO(s3obj["Body"].read()))

    reader = csv.reader(byte_data)
    it = (row for row in reader)

    # ヘッダ読み飛ばす
    next(it)

    for data in it:
        print(data)


if __name__ == "__main__":
    main()

 

 

 

ブログランキング・にほんブログ村へ
にほんブログ村


人気ブログランキング

コメント

タイトルとURLをコピーしました