サイトアイコン たーちゃんの「ゼロよりはいくらかましな」

【Apache Spark】jdbc並列読み込みさせる

sparkでjdbc経由でデータベースの内容を

読み込んでDataFrameとして扱うことは少なくないと思います。

 

しかし大量データを取得するような場合は、

1クエリでデータを取得するより、

並列で取得したほうが効率がよいことがあります。

 

今回は僕が対応した内容をご紹介したいと思います。

データベースはMySqlです。

もちろん、今回紹介させていただいた方法以外にも

並列処理する方法はありますので、

ご興味のある方は調べてみるのも良いかと思います。

 

 

 

 

 

 

 

 

並列化させず取得するパターン

並列化させずにjdbc接続で取得しようとすると、

以下のようになります。

 

val sampleQuery = "" +
	"select " +
	"    a.col1, " +
	"    a.col2, " +
	"    a.col3 " +
	"from table_a a "

val sampleDF = spark.read
	.format("jdbc")
	.option("driver", <jdbcドライバ名>)
	.option("url", <jdbcURL>)
	.option("user",<ユーザ名>)
	.option("password", <パスワード>)
	.option("query", sampleQuery)
	.load()

 

ただ、この場合はちょっと弊害があることがあります。

DataFrameのパーティションが1になってしまうので、

大量データを処理する場合にはものすごい時間がかかってしまいます。

 

 

 

 

 

並列化させるパターン

そこでクエリにちょっと手を入れてあげて、

並列化させて読み込むために以下のようにしてみました。

 

val sampleQuery = "" +
	"select " +
	"    @i:=@i+1 as rownum, " +
	"    a.col1, " +
	"    a.col2, " +
	"    a.col3 " +
	"from (SELECT @i:=0) AS INDEX_NUM, table_a a " +

val sampleDF = spark.read
	.format("jdbc")
	.option("driver", <jdbcドライバ名>)
	.option("url", <jdbcURL>)
	.option("user",<ユーザ名>)
	.option("password", <パスワード>)
	.option("dbtable", "(" + sampleQuery + ") as subq")
	.option("partitionColumn", "rownum")
	.option("lowerBound", "0")
	.option("upperBound", <データ件数>)
	.option("numPartitions", <パーティション数>)
	.load()

 

まず、クエリのSelect項目にROW番号を付与するようにします。

そのあとDataFrameReaderのオプションに、

dbtableを追加します。

ここでカッコでくくっているのは、クエリそのものをテーブルとして扱うためです。

 

partitionColumnではパーティション分けする対象となるカラム。

lowerBoundでは最小データ件数

lowerBoundでは最大データ件数

※ここでは事前にSelect count(*)などで件数がわかっているといいですね。

 

numPartitionで分割したいパーティション数

をそれぞれパラメータとして渡します。

 

partitionColumn、lowerBound、lowerBound、numPartitionは

すべてセットで入力しなければエラーになりますので要注意です。

 

 

裏では実行するSQLに、

データ件数 / パーティション数で1パーティションあたりのデータ数を計算して、

limit 0 to データ件数のようにlimit句を付与しているようです。

 

また、今回紹介した方法では指定するカラムは

数値である必要がありますので要注意です。

 

それでは!

 


にほんブログ村


人気ブログランキング

モバイルバージョンを終了