boto3のDynamoDBはbatch_get_itemは1回で取得できるデータに
制限があります。(データが16MB or 項目が100まで)
取得できなかったデータは、UnprocessKeysを使用して
もう一度取得することで不足分を追加で取得することができます。
そのあたりのプログラムを実装したのでメモ。
コード
今回は対象テーブルが1つという前提で実装してみました。
def batch_get_item_with_retry(
table_name: str, key_list: List[Dict[str, str]]
) -> List[Dict[str, Any]]:
"""batch_get_item unprocessed_keyによるretryを含めデータ取得
:param table_name: 対象テーブル名
:param key_list: キーリスト
:return: 取得データ
"""
tries = 0
max_tries = 5
dynamo_table_data = []
batch_keys = {table_name: {"Keys": key_list}}
sleep_time = 0.5
while tries < max_tries:
batch_get_item_response = dynamodb.batch_get_item(
RequestItems=batch_keys
)
dynamo_table_data += batch_get_item_response["Responses"].get(
table_name, []
)
unprocessed_key = batch_get_item_response["UnprocessedKeys"]
if unprocessed_key:
batch_keys = unprocessed_key
tries += 1
if tries < max_tries:
time.sleep(sleep_time)
# 指数関数的バックオフ
sleep_time = min(sleep_time * 2, 8)
else:
# 最大試行回数を超えた場合、エラーとする
raise NotAllDynamoDataException()
else:
break
return dynamo_table_data
class NotAllDynamoDataException(Exception):
"""DynamoDBからbatch_get_itemによってデータを取得しきれなかった"""
pass
解説
引数に渡すkey_listには以下のようなデータで渡します。
key_list = [
{
"カラム名1": データ1,
"カラム名2": データ2,
},
{
"カラム名1": データ3,
"カラム名2": データ4,
}
]
dynamo_table_dataを返却する結果セットとして、
まずResponsesの内容を追加します。
UnprocessedKeysに値が存在する場合は、1回で取得出来なかったデータがあるので、
格納されたデータがそのままキーとして使用できるのですが、
すぐに実行してもDynamo側のキャパシティの問題ですぐに結果が取得できないことがあるので、指数関数的バックオフの考え方でsleepさせます。
指数関数的バックオフはプログラムでのリトライのときに、
次のリトライまでの時間を指数関数的に伸ばしていくものです。
Dynamoへのリトライには最適な考え方ですね。
とはいえ、最大試行回数は決めておく必要があるので、
最大試行回数を越えた場合は、今回は自前のExceptionをraiseすることにしています。
Exceptionの名前で何が起きたのかわかりやすくするという意図ですね。
最大試行回数内ですべてのデータが取得出来た場合は、
データが返却されることになります。

20代前半までは東京で音楽をやりながら両手の指以上の業種でアルバイト生活をしていましたが、某大手プロバイダのテレアポのバイトでPCの知識の無さに愕然とし、コンピュータをもっと知りたい!と思ったことをきっかけに25歳の時にITの世界に未経験で飛び込みました。
紆余曲折を経て、現在は個人事業主としてお仕事させていただいており、10年ほどになります。
web制作から企業システム構築、ツール開発など、フロントエンドもバックエンドもサーバーもDBAも依頼があれば何でもやってきた雑食系エンジニアです。
今風にいうとフルスタックエンジニアということになるのでしょうか??
→ 詳細プロフィールというか、生い立ちはこちら
→スキルシートをご覧になる場合はこちら
→お仕事のご依頼やお見積りなどお問い合わせはこちらから!



コメント