【EMR】【pyspark】pandasのs3へのto_jsonがgzip圧縮されない?

IT
スポンサーリンク

お久しぶりですね。実に4ヶ月も更新をサボってしまってました笑

ここのところ、プロジェクトが2、3同時に対応することが多くて、

ナレッジは溜まっているもののなかなかブログにかけずにおりました。。。

 

さて、本日はpandasでto_jsonでjsonにしたものをgzipに圧縮しようと

したらうまくいかなかったよーというお話です。

 

 

 

 

環境

今回の事象が発生した環境は、

awsのEMRにてpysparkでstepを実行し、その結果ファイルを出力するときに

ファイル名を指定できるようpandasにてto_jsonでs3へ出力。

その際にgzipで圧縮するという仕様です。

 

ところが、単純そうに見えるこの処理がうまくいかないんですよね。

 

元のソースコード

    # sparksession生成
    with SparkSession.builder.appName(
        "create_mass_contract_forecast_data"
    ).getOrCreate() as spark:

        # ファイル読み込み先パス
        input_file_path = (
            f"s3://ファイルパス"
        )

        # input csv -> DataFrame
        df = spark.read_csv(input_file_path)

        # do something。。。

        # spark.DaraFrame -> pandas.DataFrame
	pd_df  = df.toPandas()

	pd_df.to_json(
	    f"s3://出力ファイル名.json.gz",
	    orient="values",
	    compression="gzip",
        )

 

sparkにてs3からcsvを読込み、何らかの処理を施した上で、

pandasのDataFrameへ変換し、to_jsonでgzip圧縮して出力するという感じです。

 

この処理がうまくいかなかったのですが、どうなったかというと、

エラーになったりせず出力ファイル名がgzではあるものの、

ファイル自体は圧縮されておらず、jsonのままになっていました。

 

 

原因と改善策

原因

原因はこちらで見つけたのですが、どうやらpandas内部で

s3への書き込みに使用しているs3fsが、stream書き込みを使用するときに

compressionのパラメータが有効ではないためのようです。

上記で見つけたサイトには、to_csvの場合で記載されていますが、

これはto_jsonでも一緒ですね。

 

解決したソースコード

ということで解決策としては、紹介したサイトにもある通り、

to_jsonの書き込みをgzipファイルのバイナリとして書き込み、

それが完了してからオブジェクトとしてboto3でs3へアップロードという流れになります。

 

	buffer = BytesIO()

	with gzip.GzipFile(mode="w", fileobj=buffer) as zipped_file:
	    pd_df.to_json(TextIOWrapper(zipped_file, "utf8"), orient="values")

	s3_resource = boto3.resource("s3")
	s3_object = s3_resource.Object(
	    "バケット名",
	    "出力ファイル名.json.gz",
	)
	s3_object.put(Body=buffer.getvalue())

 

「エラーにならない」というのが、なかなかにハマリポイントでした。。。

 

 

 

ブログランキング・にほんブログ村へ
にほんブログ村


人気ブログランキング

コメント

タイトルとURLをコピーしました