AWS EMRを使用することになり、
アプリとしてsparkを使用することになりました。
scalaはplay frameworkで過去にお世話になっていましたが、
saprk自体は初めて触るので、
サンプルを作成してみようかなということになりました。
今回はそのサンプルを作成した際に、
ハマってしまった箇所について、
お話したいと思います。
Sparkサンプルの概要
単純ですが、テキストファイルを読み込んで、DataFrame化して、
その内容を元に各文字が何回出現するかをカウントするというものです。
最初に書いたサンプル
まずはイメージで以下のように書いてみました。
ところがこれでは期待した動作をしなかったんです。
def count(df:DataFrame): mutable.Map[String, Int] = { val map = mutable.Map[String, Int]() df.foreach(data => { val text = data.getAs("line").toString() text.foreach((char) => { val count = map.getOrElse(char.toString, 0) map.update(char.toString, count + 1) }) }) map }
何が起きたかというと、
returnしたいmapがsize 0のままなんです。
デバッグで追いかけても、
ループの途中ではmapのsizeも上がってるし、
中身もセットされているので、「???」でした。
解決したサンプル
かなりハマってしまったのですが、
以下にすることで解決しました。
DataFrameをforeachするのではなく、
DataFrameのcollectメソッドを一度呼んで、それをforeachすればOKでした。
def count(df:DataFrame): mutable.Map[String, Int] = {
val map = mutable.Map[String, Int]()
df.collect().foreach(data => {
val text = data.getAs("line").toString()
text.foreach((char) => {
val count = map.getOrElse(char.toString, 0)
map.update(char.toString, count + 1)
})
})
map
}
考察
DataFrameをforeachすると、内部では、
rddという変数(Resilient Distributed Dataset – 不変で並列実行可能なCollection)
のforeachが処理されます。
spark側が分散処理する起点としているのが、rddと考えられ、
javaでいう複数スレッド並列処理を内部で行っているものと推測されます。
したがって、foreach外にあるmap変数を更新することは
スレッド外の変数を更新することになるため、
その処理は行われず、内部で新規にmapを生成し、
スレッド間共有しているのではないかなぁと推測しています。
一方、collectメソッドを介す方では、
collectメソッドがデータをArrayで返却していて、
通常のforeach処理されるので、
mapの内容も更新することができたようです。
うーん、かなりニッチなお話でしたね。
それでは!
20代前半までは東京で音楽をやりながら両手の指以上の業種でアルバイト生活をしていましたが、某大手プロバイダのテレアポのバイトでPCの知識の無さに愕然とし、コンピュータをもっと知りたい!と思ったことをきっかけに25歳の時にITの世界に未経験で飛び込みました。
紆余曲折を経て、現在は個人事業主としてお仕事させていただいており、10年ほどになります。
web制作から企業システム構築、ツール開発など、フロントエンドもバックエンドもサーバーもDBAも依頼があれば何でもやってきた雑食系エンジニアです。
今風にいうとフルスタックエンジニアということになるのでしょうか??
→ 詳細プロフィールというか、生い立ちはこちら
→スキルシートをご覧になる場合はこちら
→お仕事のご依頼やお見積りなどお問い合わせはこちらから!
コメント