シェアする

  • このエントリーをはてなブックマークに追加





【Apache Spark】jdbc並列読み込みさせる

シェアする

  • このエントリーをはてなブックマークに追加
  • 0

sparkでjdbc経由でデータベースの内容を

読み込んでDataFrameとして扱うことは少なくないと思います。

しかし大量データを取得するような場合は、

1クエリでデータを取得するより、

並列で取得したほうが効率がよいことがあります。

今回は僕が対応した内容をご紹介したいと思います。

データベースはMySqlです。

もちろん、今回紹介させていただいた方法以外にも

並列処理する方法はありますので、

ご興味のある方は調べてみるのも良いかと思います。

並列化させず取得するパターン

並列化させずにjdbc接続で取得しようとすると、

以下のようになります。

val sampleQuery = "" +
	"select " +
	"    a.col1, " +
	"    a.col2, " +
	"    a.col3 " +
	"from table_a a "

val sampleDF = spark.read
	.format("jdbc")
	.option("driver", <jdbcドライバ名>)
	.option("url", <jdbcURL>)
	.option("user",<ユーザ名>)
	.option("password", <パスワード>)
	.option("query", sampleQuery)
	.load()

ただ、この場合はちょっと弊害があることがあります。

DataFrameのパーティションが1になってしまうので、

大量データを処理する場合にはものすごい時間がかかってしまいます。

並列化させるパターン

そこでクエリにちょっと手を入れてあげて、

並列化させて読み込むために以下のようにしてみました。

val sampleQuery = "" +
	"select " +
	"    @i:=@i+1 as rownum, " +
	"    a.col1, " +
	"    a.col2, " +
	"    a.col3 " +
	"from (SELECT @i:=0) AS INDEX_NUM, table_a a " +

val sampleDF = spark.read
	.format("jdbc")
	.option("driver", <jdbcドライバ名>)
	.option("url", <jdbcURL>)
	.option("user",<ユーザ名>)
	.option("password", <パスワード>)
	.option("dbtable", "(" + sampleQuery + ") as subq")
	.option("partitionColumn", "rownum")
	.option("lowerBound", "0")
	.option("upperBound", <データ件数>)
	.option("numPartitions", <パーティション数>)
	.load()

まず、クエリのSelect項目にROW番号を付与するようにします。

そのあとDataFrameReaderのオプションに、

dbtableを追加します。

ここでカッコでくくっているのは、クエリそのものをテーブルとして扱うためです。

partitionColumnではパーティション分けする対象となるカラム。

lowerBoundでは最小データ件数

lowerBoundでは最大データ件数

※ここでは事前にSelect count(*)などで件数がわかっているといいですね。

numPartitionで分割したいパーティション数

をそれぞれパラメータとして渡します。

partitionColumn、lowerBound、lowerBound、numPartitionは

すべてセットで入力しなければエラーになりますので要注意です。

裏では実行するSQLに、

データ件数 / パーティション数で1パーティションあたりのデータ数を計算して、

limit 0 to データ件数のようにlimit句を付与しているようです。

また、今回紹介した方法では指定するカラムは

数値である必要がありますので要注意です。

それでは!

ブログランキング・にほんブログ村へ
にほんブログ村


人気ブログランキング

シェアする

  • このエントリーをはてなブックマークに追加

フォローはいかがですか?