お久しぶりですね。実に4ヶ月も更新をサボってしまってました笑
ここのところ、プロジェクトが2、3同時に対応することが多くて、
ナレッジは溜まっているもののなかなかブログにかけずにおりました。。。
さて、本日はpandasでto_jsonでjsonにしたものをgzipに圧縮しようと
したらうまくいかなかったよーというお話です。
環境
今回の事象が発生した環境は、
awsのEMRにてpysparkでstepを実行し、その結果ファイルを出力するときに
ファイル名を指定できるようpandasにてto_jsonでs3へ出力。
その際にgzipで圧縮するという仕様です。
ところが、単純そうに見えるこの処理がうまくいかないんですよね。
元のソースコード
# sparksession生成
with SparkSession.builder.appName(
"create_mass_contract_forecast_data"
).getOrCreate() as spark:
# ファイル読み込み先パス
input_file_path = (
f"s3://ファイルパス"
)
# input csv -> DataFrame
df = spark.read_csv(input_file_path)
# do something。。。
# spark.DaraFrame -> pandas.DataFrame
pd_df = df.toPandas()
pd_df.to_json(
f"s3://出力ファイル名.json.gz",
orient="values",
compression="gzip",
)
sparkにてs3からcsvを読込み、何らかの処理を施した上で、
pandasのDataFrameへ変換し、to_jsonでgzip圧縮して出力するという感じです。
この処理がうまくいかなかったのですが、どうなったかというと、
エラーになったりせず出力ファイル名がgzではあるものの、
ファイル自体は圧縮されておらず、jsonのままになっていました。
原因と改善策
原因
原因はこちらで見つけたのですが、どうやらpandas内部で
s3への書き込みに使用しているs3fsが、stream書き込みを使用するときに
compressionのパラメータが有効ではないためのようです。
上記で見つけたサイトには、to_csvの場合で記載されていますが、
これはto_jsonでも一緒ですね。
解決したソースコード
ということで解決策としては、紹介したサイトにもある通り、
to_jsonの書き込みをgzipファイルのバイナリとして書き込み、
それが完了してからオブジェクトとしてboto3でs3へアップロードという流れになります。
buffer = BytesIO()
with gzip.GzipFile(mode="w", fileobj=buffer) as zipped_file:
pd_df.to_json(TextIOWrapper(zipped_file, "utf8"), orient="values")
s3_resource = boto3.resource("s3")
s3_object = s3_resource.Object(
"バケット名",
"出力ファイル名.json.gz",
)
s3_object.put(Body=buffer.getvalue())
「エラーにならない」というのが、なかなかにハマリポイントでした。。。

20代前半までは東京で音楽をやりながら両手の指以上の業種でアルバイト生活をしていましたが、某大手プロバイダのテレアポのバイトでPCの知識の無さに愕然とし、コンピュータをもっと知りたい!と思ったことをきっかけに25歳の時にITの世界に未経験で飛び込みました。
紆余曲折を経て、現在は個人事業主としてお仕事させていただいており、10年ほどになります。
web制作から企業システム構築、ツール開発など、フロントエンドもバックエンドもサーバーもDBAも依頼があれば何でもやってきた雑食系エンジニアです。
今風にいうとフルスタックエンジニアということになるのでしょうか??
→ 詳細プロフィールというか、生い立ちはこちら
→スキルシートをご覧になる場合はこちら
→お仕事のご依頼やお見積りなどお問い合わせはこちらから!



コメント