今、仕事でjbatchをさわってます。
jbatchは、JavaEE7で導入され、JSR352で定められた
javaにおけるbatch applicationの仕様です。原文仕様はこちら。
タイトルの件、そんなことしないで、普通に実装するときはどうするのか?
とかは他の投稿でするとして、
今個人的にホットなstep-partitioningによるトランザクション境界
について実行することでわかったことをお話したいと思います。
そもそもjbatchのstep-partitioningって?
jbatchにおける並列処理の機能の1つ
通常のBatchlet、Chunkを並列に処理させる機能です。
それを実現するために、Reducer・Mapper・Collector・Analyzerという
緩衝領域が存在します。
Reducer:並列処理全体の前後処理などを実装
Mapper:分割単位、実行スレッド数などを実装
Collector:各並列処理ごとのAnalyzerへ渡すための中間処理を実装
Analyzer:各並列処理を統合する処理
最小構成で実装してみた
Batchlet
メインとなるBatchletです。
package jp.co.tarchan.sample_javaee_webapp.batchlet; import javax.batch.api.BatchProperty; import javax.batch.api.Batchlet; import javax.enterprise.context.Dependent; import javax.inject.Inject; import javax.inject.Named; import javax.transaction.Transactional; import jp.co.tarchan.sample_javaee_webapp.dao.TestUserTableDao; import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity; @Dependent @Named("SampleBatchlet") public class SampleBatchlet implements Batchlet { @Inject private TestUserTableDao dao; @Inject @BatchProperty private String param; @Override @Transactional public String process() throws Exception { System.out.println("SampleBatchlet process start"); TestUserTableEntity entity = new TestUserTableEntity(); entity.setUserId(param); entity.setUserName("BBB"); entity.setDepartmentId("CCC"); dao.insert(entity); return "OK"; } @Override public void stop() throws Exception { System.out.println("SampleBatchlet stop start"); } }
Reducer
中の実装は無しです。step-partitioningにおいてMapperなどの緩衝領域はすべてが
任意実装です。なければないで動作します。
※分割単位を定義するMapperは必要かと思いきや、
Mapperがなくてもjobの定義xmlにplanタグとして静的に設定可能すればOK
package jp.co.tarchan.sample_javaee_webapp.batchlet; import javax.batch.api.partition.PartitionReducer; import javax.enterprise.context.Dependent; import javax.inject.Named; @Dependent @Named("TestReducer") public class TestReducer implements PartitionReducer { @Override public void beginPartitionedStep() throws Exception { } @Override public void beforePartitionedStepCompletion() throws Exception { } @Override public void rollbackPartitionedStep() throws Exception { } @Override public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception { } }
Mapper
property配列の要素がそれぞれpartitionに順番に割り当てられます。
配列の要素0は、1番目のpartitionに・・・という具合です。
package jp.co.tarchan.sample_javaee_webapp.batchlet; import java.util.Properties; import javax.batch.api.partition.PartitionMapper; import javax.batch.api.partition.PartitionPlan; import javax.batch.api.partition.PartitionPlanImpl; import javax.enterprise.context.Dependent; import javax.inject.Named; @Dependent @Named("TestMapper") public class TestMapper implements PartitionMapper { @Override public PartitionPlan mapPartitions() throws Exception { System.out.println("Mapper mapPartitions Start"); PartitionPlan plan = new PartitionPlanImpl(); Properties[] properties = new Properties[2]; Properties Property1 = new Properties(); Property1.put("param", "param1"); properties[0] = Property1; Properties Property2 = new Properties(); Property2.put("param", "param2"); properties[1] = Property2; // propertyは未設定だとNullPointerで落ちる plan.setPartitionProperties(properties); plan.setPartitions(2); plan.setThreads(2); return plan; } }
Collector
package jp.co.tarchan.sample_javaee_webapp.batchlet; import java.io.Serializable; import javax.batch.api.partition.PartitionCollector; import javax.enterprise.context.Dependent; import javax.inject.Named; import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity; @Dependent @Named("TestCollector") public class TestCollector implements PartitionCollector { // @Inject // private TestUserTableDao dao; // // int i = 3; @Override public Serializable collectPartitionData() throws Exception { System.out.println("Collector collectPartitionData Start"); // TestUserTableEntity entity = new TestUserTableEntity(); // entity.setUserId("AAA" + i); // entity.setUserName("BBB"); // entity.setDepartmentId("CCC"); // // dao.insert(entity); // // i++; return new TestUserTableEntity(); } }
Analyzer
package jp.co.tarchan.sample_javaee_webapp.batchlet; import java.io.Serializable; import javax.batch.api.partition.PartitionAnalyzer; import javax.batch.runtime.BatchStatus; import javax.enterprise.context.Dependent; import javax.inject.Inject; import javax.inject.Named; import jp.co.tarchan.sample_javaee_webapp.dao.TestUserTableDao; import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity; @Dependent @Named("TestAnalyzer") public class TestAnalyzer implements PartitionAnalyzer { @Inject private TestUserTableDao dao; int i = 0; @Override public void analyzeCollectorData(Serializable data) throws Exception { System.out.println("Analyzer analyzeCollectorData Start"); TestUserTableEntity entity = new TestUserTableEntity(); entity.setUserId("AAA" + i); entity.setUserName("BBB"); entity.setDepartmentId("CCC"); dao.insert(entity); i++; } @Override public void analyzeStatus(BatchStatus batchStatus, String exitStatus) throws Exception { System.out.println("Analyzer analyzeStatus Start"); } }
job定義xml
Mapperから各batchlet(partition単位)へパラメータを渡すために、
#{partitionPlan[‘param’]}を定義しています。
これを各batchletは@BatchPropertyと@Injectをつけてnameと同じフィールド名で
定義するとpartition別にパラメータを取得できます。
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE configuration> <job id="tarchan-sample-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="batch-001"> <batchlet ref="SampleBatchlet"> <properties> <property name="param" value="#{partitionPlan['param']}"/> </properties> </batchlet> <partition> <reducer ref="TestReducer" /> <mapper ref="TestMapper" /> <collector ref="TestCollector" /> <analyzer ref="TestAnalyzer" /> </partition> </step> </job>
動作させてみる
まずはストレートにそのまま
Mapperにてpartitionを2としています。
これは2つを並列実行させることを意味します。
普通はここで対象取得して、件数からpartition数判断させて設定って
感じになるでしょうか。
上記サンプルはそのまま実行すると、
Batchletで2件、Analyzerで2件、テストテーブルにinsertされ
計4件insertされます。
Analyzerのトランザクションは??
ここで不思議なことが起こっています。
AnalyzerはTransactional定義等していないのに、
insertされているのです。
Analyzerは並列処理を統合する役割を担っているので、
クラスとしてはSingletonに動作しています。
役割からjbatchのコントローラ側で
Defaultでトランザクションが張られているのだと思います。
では、Collectorではどうか?
上記サンプルで、Collectorにコメントアウトがあると思います。
それを外して実行してみたところ、
「Transaction is required」(トランザクションは必須ですよ)と
エラーになりました。
まとめ
step-partitioningにおけるトランザクションは、
Analyzerではデフォルトでトランザクションが用意されており、
それ以外はTransactionalアノテーションや、
UserTransactionなどで自前で定義する必要がある。
ということになります。
もっとこのあたりを掘り下げてほしいという要望など
ありましたらご連絡をいただければと思います。
近いうちに、jbatchのスタートアップな記事を書きたいと思います。
20代前半までは東京で音楽をやりながら両手の指以上の業種でアルバイト生活をしていましたが、某大手プロバイダのテレアポのバイトでPCの知識の無さに愕然とし、コンピュータをもっと知りたい!と思ったことをきっかけに25歳の時にITの世界に未経験で飛び込みました。
紆余曲折を経て、現在は個人事業主としてお仕事させていただいており、10年ほどになります。
web制作から企業システム構築、ツール開発など、フロントエンドもバックエンドもサーバーもDBAも依頼があれば何でもやってきた雑食系エンジニアです。
今風にいうとフルスタックエンジニアということになるのでしょうか??
→ 詳細プロフィールというか、生い立ちはこちら
→スキルシートをご覧になる場合はこちら
→お仕事のご依頼やお見積りなどお問い合わせはこちらから!