サイトアイコン たーちゃんの「ゼロよりはいくらかましな」

【Apach Spark】DataFrameのforeachでハマっちゃった

AWS EMRを使用することになり、

アプリとしてsparkを使用することになりました。

 

scalaはplay frameworkで過去にお世話になっていましたが、

saprk自体は初めて触るので、

サンプルを作成してみようかなということになりました。

 

今回はそのサンプルを作成した際に、

ハマってしまった箇所について、

お話したいと思います。

 

 

Sparkサンプルの概要

単純ですが、テキストファイルを読み込んで、DataFrame化して、

その内容を元に各文字が何回出現するかをカウントするというものです。

 

 

最初に書いたサンプル

まずはイメージで以下のように書いてみました。

ところがこれでは期待した動作をしなかったんです。

def count(df:DataFrame): mutable.Map[String, Int] = {
  val map = mutable.Map[String, Int]()

  df.foreach(data => {
    val text = data.getAs("line").toString()

    text.foreach((char) => {
      val count = map.getOrElse(char.toString, 0)

      map.update(char.toString, count + 1)
    })
  })

  map
}

 

何が起きたかというと、

returnしたいmapがsize 0のままなんです。

 

デバッグで追いかけても、

ループの途中ではmapのsizeも上がってるし、

中身もセットされているので、「???」でした。

 

解決したサンプル

かなりハマってしまったのですが、

以下にすることで解決しました。

DataFrameをforeachするのではなく、

DataFrameのcollectメソッドを一度呼んで、それをforeachすればOKでした。

def count(df:DataFrame): mutable.Map[String, Int] = {
  val map = mutable.Map[String, Int]()

  df.collect().foreach(data => {
    val text = data.getAs("line").toString()

    text.foreach((char) => {
      val count = map.getOrElse(char.toString, 0)

      map.update(char.toString, count + 1)
    })
  })

  map
}

 

 

 

 

 

考察

DataFrameをforeachすると、内部では、

rddという変数(Resilient Distributed Dataset – 不変で並列実行可能なCollection)

のforeachが処理されます。

spark側が分散処理する起点としているのが、rddと考えられ、

javaでいう複数スレッド並列処理を内部で行っているものと推測されます。

 

したがって、foreach外にあるmap変数を更新することは

スレッド外の変数を更新することになるため、

その処理は行われず、内部で新規にmapを生成し、

スレッド間共有しているのではないかなぁと推測しています。

 

一方、collectメソッドを介す方では、

collectメソッドがデータをArrayで返却していて、

通常のforeach処理されるので、

mapの内容も更新することができたようです。

 

うーん、かなりニッチなお話でしたね。

それでは!

 


にほんブログ村


人気ブログランキング

モバイルバージョンを終了