【EMR】【pyspark】postgresqlのデータをDataframeに読み込む

Apache Spark
スポンサーリンク

以前、別のプロジェクトでscalaの方のsparkで

EMRを利用した分散処理基盤の実装というのは行っていました。

 

 

今回はpysparkを使用することになったのですが、

いろいろspark時代の知識は有効であったものの、

postgresqlのようなDBを使用するところで

ものすごくハマってしまいました・・・。

 

 

 

 

 

 

 

サンプルコード

今回は単純なjdbcによるクエリ発行で

Dataframeを作成しています。

queryという変数には実行したいSelect文が格納されている感じです。

 

temp_df= (
            spark.read.format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option(
                "url",
                "jdbc:postgresql://" + db_host + ":" + db_port + "/db_name",
            )
            .option("user", db_user)
            .option("password", db_pass)
            .option("query", query)
            .load()
        )

 

 

エラー内容は・・・

今回うまくいかなかったのは、上記のサンプルを実行したときに

java.lang.ClassNotFoundException: org.postgresql.Driver

 

jdbcドライバがみつからないというエラーですね。

pysparkなのにjava?とは思いました笑

 

 

今回ハマったポイント

pysparkといっても、pythonがjavaと連携しているだけ(py4j)で、

根っこの部分はjavaで動いているようです。

そうすると、DB接続部分ではpostgresqlのDriver.jarを使用することに

なりますが、そのDriverのjarをexecutorに配布することができなかったのですね。

 

 

scalaのときにはsbtにてsparkアプリケーションに依存ライブラリを

同梱していたので、この問題は当時発生していなかったのです。

 

 

 

 

サイトで検索してみたら・・・

spark-submitのコマンドの引数として、

「–driver-class-path」と「–jars」を指定したらいいよというのは

とてもたくさんヒットしました。

 

 

が、今回はこの2つの引数を設定すると、

java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem

 

このエラーが発生しステップエラーとなってしまいました。

 

 

解決策

sparkのリファレンスを見てみると、

 

spark-submitを使う場合、--jars オプションを使って含まれる全てのjarと一緒にアプリケーションのjarが自動的にクラスタに転送されるでしょう。--jarsの後に渡されるURLはカンマで区切られてなければなりません。そのリストはドライバーとexecutorのクラスパス上に含まれます。

 

と書いてあったので、

–driver-class-pathの引数を外して、–jarsのみにしてみました。

また、ここで指定するjarはS3に格納されているものになります。

 

 

これでめでたく動作させることができました。

 

 

参考)sparkアプリケーション側のsparkセッション作成

上記の対応ができていれば、sparkアプリケーション側は以下のように

セッション作成するだけで問題ありません。

with SparkSession.builder.appName("sample_app").getOrCreate() as spark:

 

 

ですが、ローカルでアプリケーションだけ単体で動かす場合には、

以下のように記載することで動作させることができます。

jdbc_driver_pathにはローカルの絶対パスを指定すればよいです。

ご参考まで。

with SparkSession.builder.appName("sample_app").config("spark.jars", jdbc_driver_path).getOrCreate() as spark:

 

 

それでは!!

 

 

ブログランキング・にほんブログ村へ
にほんブログ村


人気ブログランキング

コメント

タイトルとURLをコピーしました