DynamoDBはよく利用するけど、キャパシティが気になって
デバッグがやりにくいなと思っていましたが、
DynamoDBには、キャパシティを消費せずS3へエクスポートする機能がありました。
そしてもちろんそれはboto3からも実行可能でした。
ということは。。。
boto3で
DynamoDB → S3 → Glueクローラ〜テーブル作成 → Athenaでクエリ
までの一連の流れがプログラムで自動でできるのでは?と思い立って、
実際にプログラムを書いてみました。
実際に使ってみて、これは便利すぎる!!となり
自分用の良いツールができたな!と思っています。
プログラム掲載
プログラム内にコメントで解説をいれてあるのでご参照いただければと思います。
import boto3
import time
import io
import csv
import arrow
dynamodb = boto3.client("dynamodb", region_name="ap-northeast-1")
glue = boto3.client("glue")
s3 = boto3.client("s3")
athena = boto3.client("athena")
table_arn = "[エクスポート対象のDynamoDBテーブルのarn]"
backet_name = "[エクスポートするs3バケット名]"
database_name = "[Glueクローラでテーブル作成する対象のデータベース]"
s3_prefix = f"export/{arrow.now().format('YYYYMMDDHHmmSS')}"
table_prefix = "hoge_"
crawler_name = "test_crawler"
def main():
# S3にエクスポート
export_s3()
# クローラ実行パス取得
s3_path = get_crawler_path()
# クローラ作成
create_crawler(s3_path)
# クローラ実行
execute_crawler()
# athenaクエリ実行
query_execution_id = query_athena()
# s3 中身チェック
check_csv(query_execution_id)
# お掃除(Glueクローラ、テーブル削除)
glue.delete_crawler(Name=crawler_name)
glue.delete_table(DatabaseName=database_name, Name=f"{table_prefix}data")
def export_s3():
# point_in_timeを指定もできるけど、この場合は現在の状態をエクスポートする
response = dynamodb.export_table_to_point_in_time(
TableArn=table_arn,
S3Bucket=backet_name,
S3Prefix=s3_prefix,
)
# ステータスチェックループ 'ExportStatus': 'IN_PROGRESS'|'COMPLETED'|'FAILED'
export_arn = response["ExportDescription"]["ExportArn"]
# エクスポート完了まで待機
while True:
time.sleep(60)
response = dynamodb.describe_export(ExportArn=export_arn)
status = response["ExportDescription"]["ExportStatus"]
if status == "COMPLETED":
break
print("backup completed")
# クローラを実行する時に指定するパスを取得する
def get_crawler_path():
# エクスポートするとバケット/指定したprefix/AWSDynamoDB/dataの配下にデータが出力される
response = s3.list_objects(
Bucket=backet_name, Prefix=f"{s3_prefix}/AWSDynamoDB/", Delimiter="/"
)
return f"{response['CommonPrefixes'][0]['Prefix']}data"
# Glueクローラ作成
def create_crawler(s3_path):
glue.create_crawler(
Name=crawler_name,
Role="evass-glue_FSRole",
DatabaseName=database_name,
TablePrefix=table_prefix,
Targets={
"S3Targets": [
{
"Path": f"s3://{backet_name}/{s3_path}",
},
]
},
)
print("crawler create completed")
# クローラ実行
def execute_crawler():
response = glue.start_crawler(Name=crawler_name)
# ステータスチェック
while True:
response = glue.get_crawler(Name=crawler_name)
# READYになったらクローラ実行が終わっている
if response["Crawler"]["State"] == "READY":
break
time.sleep(30)
# athenaでクエリ実行
def query_athena():
# DynamoDBのエクスポートファイルでGlueテーブルを作成すると
# カラムは構造体になる item.[カラム名].Sなどと指定してselectする
query = f"select * from {database_name}.hoge_data;"
response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": database_name},
ResultConfiguration={
"OutputLocation": f"s3://{backet_name}/{s3_prefix}/output",
},
)
query_execution_id = response["QueryExecutionId"]
# クエリ ステータスチェック
while True:
response = athena.get_query_execution(QueryExecutionId=query_execution_id)
if response["QueryExecution"]["Status"]["State"] == "SUCCEEDED":
return query_execution_id
time.sleep(1)
# クエリ結果チェック
def check_csv(query_execution_id):
# クエリの結果はcsvで出力されている
s3 = boto3.resource("s3")
s3obj = s3.Object(backet_name, f"{s3_prefix}/output/{query_execution_id}.csv").get()
byte_data = io.TextIOWrapper(io.BytesIO(s3obj["Body"].read()))
reader = csv.reader(byte_data)
it = (row for row in reader)
# ヘッダ読み飛ばす
next(it)
for data in it:
print(data)
if __name__ == "__main__":
main()

20代前半までは東京で音楽をやりながら両手の指以上の業種でアルバイト生活をしていましたが、某大手プロバイダのテレアポのバイトでPCの知識の無さに愕然とし、コンピュータをもっと知りたい!と思ったことをきっかけに25歳の時にITの世界に未経験で飛び込みました。
紆余曲折を経て、現在は個人事業主としてお仕事させていただいており、10年ほどになります。
web制作から企業システム構築、ツール開発など、フロントエンドもバックエンドもサーバーもDBAも依頼があれば何でもやってきた雑食系エンジニアです。
今風にいうとフルスタックエンジニアということになるのでしょうか??
→ 詳細プロフィールというか、生い立ちはこちら
→スキルシートをご覧になる場合はこちら
→お仕事のご依頼やお見積りなどお問い合わせはこちらから!



コメント