【Hadoop】MapReduceを書いてみる

Hadoop
スポンサーリンク

前回はwindowsでHadoopのサンプルアプリ(wordcount)を

環境構築して動かしてみました。

 

 

今回は実際にMapReduceをプログラムしてみようと思います。

ここでは、おなじみの形態素解析を行い、文章の品詞を

カウントするという処理を作ってみたいと思います。

 

 

 

 

 

 

環境

実施環境としては、

・windows10

・Eclipse

・Maven

・java 1.8

・kuromoji 形態素解析エンジン

を使って行います。

 

 

また、対象の文章はこちらもおなじみ桃太郎の一節から。

昔々、あるところに、おじいさんとおばあさんが住んでいました。
おじいさんは山へ芝刈りに、おばあさんは川へ洗濯に行きました。
おばあさんが川で洗濯をしていると、ドンブラコ、ドンブラコと、大きな桃が流れてきました。

という文章のテキストファイルを用いることとします。

 

 

Mavenプロジェクト作成

EclipseにてMavenプロジェクトを作成します。

また、pom.xmlにhadoop関連と、kuromojiの依存関係を追加します。

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.2</version>
     </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.2.1</version>
    </dependency>

    <dependency>
      <groupId>com.atilika.kuromoji</groupId>
      <artifactId>kuromoji-ipadic</artifactId>
      <version>0.9.0</version>
    </dependency>

  </dependencies>

 

※ここでhadoop-coreのversionを0.20.2としているのは、

それ以降のversionでは、Eclipseでjavaアプリケーションを素で実行すると、

hadoop側のプログラムでjava.io.FileクラスのsetReadable()メソッドなど、

フォルダのパーミッションを変更する処理があるのですが、

こちらがjava側の仕様でWindowsでは動作しません

そのため、このパーミッション変更処理がないversionで動かす必要があり、

0.20.2としています。

 

 

MapReduceプログラム作成

プログラム全文を以下のように記述します。

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.atilika.kuromoji.ipadic.Token;
import com.atilika.kuromoji.ipadic.Tokenizer;

public class WordCountSample {

	public static class SampleMapper extends Mapper<Object, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

			Tokenizer tokenizer = new Tokenizer();
			List<Token> tokens = tokenizer.tokenize(value.toString());

			for (Token token : tokens) {

				word.set(token.getPartOfSpeechLevel1());
				context.write(word, one);
			}
		}
	}

	public static class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

			int sum = 0;

			for (IntWritable val : values) {
				sum += val.get();
			}

			result.set(sum);
			context.write(key, result);

		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = new Job(conf, "word count sample");
		job.setJarByClass(WordCountSample.class);
		job.setMapperClass(SampleMapper.class);
		job.setCombinerClass(SampleReducer.class);
		job.setReducerClass(SampleReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(false) ? 0 : 1);
	}

}

 

全体構成

元となるクラス内で、内部クラスとして、Map用のクラス、Reducer用のクラスを

記述しています。

このあたりは、Hadoop Tutorialの構成を参考にしています。

 

Mapクラス解説

	public static class SampleMapper extends Mapper<Object, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

			Tokenizer tokenizer = new Tokenizer();
			List<Token> tokens = tokenizer.tokenize(value.toString());

			for (Token token : tokens) {

				word.set(token.getPartOfSpeechLevel1());
				context.write(word, one);
			}
		}
	}

 

MapクラスはMapperを拡張して作成します。

MapもReduceも共にデータをkey、valueで格納することを想定しており、

Mapperジェネリクスの型はKEYIN、VALUEIN、KEYOUT、VALUEOUTとなります。

 

 

valueはWritableインターフェースを実装した型を期待しています。

writableはhadoop内でシリアライズ、デシリアライズを行うようにするための

ものであり、データ転送に適したものである必要があるということです。

 

 

Tokenizerがkuromojiの定義でありtokenizeメソッドで解析します。

また、このmapメソッドはテキストファイルの1行ごとに呼び出されます。

 

kuromojiは解析すると、以下のような内容がTokenクラスに格納されます。

品詞は4つ細分類され、それぞれの読みや発音などの情報が格納されます。

 

Surface form Part-of-Speech Base form Reading Pronunciation
名詞,代名詞,一般,* ワタシ
助詞,連体化,*,*
名前 名詞,一般,*,* 名前 ナマエ
助詞,係助詞,*,*
hoge 名詞,一般,*,* * ?
です 助動詞,*,*,* です デス
記号,句点,*,*

今回は細分類のうち、1番初めのものを使用しています。

その取得箇所がtoken.getPartOfSpeechLevel1()となります。

 

contextにwriteしている箇所は、

キーとなる品詞の数が1個存在するという形でため込んでいるということになります。

 

Reduce解説

	public static class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

			int sum = 0;

			for (IntWritable val : values) {
				sum += val.get();
			}

			result.set(sum);
			context.write(key, result);

		}
	}

 

ここで行っていることは、Mapでキーごとに1個と数えたものを

集計しています。

この処理で品詞ごとの数をカウントしています。

 

 

main解説

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = new Job(conf, "word count sample");
		job.setJarByClass(WordCountSample.class);
		job.setMapperClass(SampleMapper.class);
		job.setCombinerClass(SampleReducer.class);
		job.setReducerClass(SampleReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(false) ? 0 : 1);
	}

 

ここでは実際の処理の準備になります。

Jobクラスを生成し、

・実行クラス

・Mapperクラス

・Combinerクラス

・Reducerクラス

・OUTのキークラス

・OUTのvalueクラス

・入力ファイル読込先

・出力ファイル出力先

を定義しています。

job.waitForCompletionにて処理完了を待つということになります。

 

 

実行する

javaアプリケーションとして実行

mainメソッドを含んでいるので、当然にjavaアプリケーションとして

実行可能です。

実行の構成にて処理引数に入力パス、出力パスと指定すればOKです。

 

出力パスは存在するとエラーになるので注意が必要です。

 

 

Hadoopで実行

Hadoopで実行するためには、まずEclipse側で

maven packageしてjarを生成します。

 

そして、以下のコマンドを実行します。

%HADOOP_HOME%\bin\hadoop jar <生成したjarパス> <フルパッケージでのメソッド名> <入力パス> <出力パス>

 

 

結果

今回の入力結果では、以下のような結果になるかと思います。

助動詞	6
助詞	18
動詞	7
名詞	17
記号	10
連体詞	2

 

 

まとめ

今回はHadoopのMapReduceを実際に作成してみました。

Hadoopで扱えるデータの型には色々な種類があるので、

興味のある方はそれらを使って、実際に手を動かしてみると

良いかと思います。

 

それでは!!

 

 

ブログランキング・にほんブログ村へ
にほんブログ村


人気ブログランキング

コメント

タイトルとURLをコピーしました