サイトアイコン たーちゃんの「ゼロよりはいくらかましな」

【boto3】【dynamo】batch_get_itemでリトライ取得

boto3のDynamoDBはbatch_get_itemは1回で取得できるデータに

制限があります。(データが16MB or 項目が100まで)

 

 

取得できなかったデータは、UnprocessKeysを使用して

もう一度取得することで不足分を追加で取得することができます。

 

 

そのあたりのプログラムを実装したのでメモ。

 

 

 

 

コード

今回は対象テーブルが1つという前提で実装してみました。

def batch_get_item_with_retry(
    table_name: str, key_list: List[Dict[str, str]]
) -> List[Dict[str, Any]]:
    """batch_get_item unprocessed_keyによるretryを含めデータ取得

    :param table_name: 対象テーブル名
    :param key_list: キーリスト
    :return: 取得データ
    """
    tries = 0
    max_tries = 5
    dynamo_table_data = []
    batch_keys = {table_name: {"Keys": key_list}}
    sleep_time = 0.5

    while tries < max_tries:
        batch_get_item_response = dynamodb.batch_get_item(
            RequestItems=batch_keys
        )

        dynamo_table_data += batch_get_item_response["Responses"].get(
            table_name, []
        )

        unprocessed_key = batch_get_item_response["UnprocessedKeys"]
        if unprocessed_key:
            batch_keys = unprocessed_key
            tries += 1
            if tries < max_tries:
                time.sleep(sleep_time)

                # 指数関数的バックオフ
                sleep_time = min(sleep_time * 2, 8)
            else:
                # 最大試行回数を超えた場合、エラーとする
                raise NotAllDynamoDataException()
        else:
            break

    return dynamo_table_data


class NotAllDynamoDataException(Exception):
    """DynamoDBからbatch_get_itemによってデータを取得しきれなかった"""

    pass

 

 

解説

引数に渡すkey_listには以下のようなデータで渡します。

key_list = [
    {
        "カラム名1": データ1,
        "カラム名2": データ2,
    },
    {
        "カラム名1": データ3,
        "カラム名2": データ4,
    }
]

 

dynamo_table_dataを返却する結果セットとして、

まずResponsesの内容を追加します。

UnprocessedKeysに値が存在する場合は、1回で取得出来なかったデータがあるので、

格納されたデータがそのままキーとして使用できるのですが、

すぐに実行してもDynamo側のキャパシティの問題ですぐに結果が取得できないことがあるので、指数関数的バックオフの考え方でsleepさせます。

 

指数関数的バックオフはプログラムでのリトライのときに、

次のリトライまでの時間を指数関数的に伸ばしていくものです。

Dynamoへのリトライには最適な考え方ですね。

 

 

とはいえ、最大試行回数は決めておく必要があるので、

最大試行回数を越えた場合は、今回は自前のExceptionをraiseすることにしています。

Exceptionの名前で何が起きたのかわかりやすくするという意図ですね。

 

 

最大試行回数内ですべてのデータが取得出来た場合は、

データが返却されることになります。

 

 

 

 

 


にほんブログ村


人気ブログランキング

モバイルバージョンを終了