【jbatch】step-partitioningによるトランザクション境界

IT
スポンサーリンク

今、仕事でjbatchをさわってます。

jbatchは、JavaEE7で導入され、JSR352で定められた

javaにおけるbatch applicationの仕様です。原文仕様はこちら

タイトルの件、そんなことしないで、普通に実装するときはどうするのか?

とかは他の投稿でするとして、

今個人的にホットなstep-partitioningによるトランザクション境界

について実行することでわかったことをお話したいと思います。

そもそもjbatchのstep-partitioningって?

jbatchにおける並列処理の機能の1つ

通常のBatchlet、Chunkを並列に処理させる機能です。

それを実現するために、Reducer・Mapper・Collector・Analyzerという

緩衝領域が存在します。

Reducer:並列処理全体の前後処理などを実装

Mapper:分割単位、実行スレッド数などを実装

Collector:各並列処理ごとのAnalyzerへ渡すための中間処理を実装

Analyzer:各並列処理を統合する処理

最小構成で実装してみた

Batchlet

メインとなるBatchletです。

package jp.co.tarchan.sample_javaee_webapp.batchlet;

import javax.batch.api.BatchProperty;
import javax.batch.api.Batchlet;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import javax.transaction.Transactional;

import jp.co.tarchan.sample_javaee_webapp.dao.TestUserTableDao;
import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity;

@Dependent
@Named("SampleBatchlet")
public class SampleBatchlet implements Batchlet {

    @Inject
    private TestUserTableDao dao;

    @Inject
    @BatchProperty
    private String param;

    @Override
    @Transactional
    public String process() throws Exception {

        System.out.println("SampleBatchlet process start");

        TestUserTableEntity entity = new TestUserTableEntity();
        entity.setUserId(param);
        entity.setUserName("BBB");
        entity.setDepartmentId("CCC");

        dao.insert(entity);

        return "OK";
    }

    @Override
    public void stop() throws Exception {
        System.out.println("SampleBatchlet stop start");
    }
}

 

 

 

 

 

Reducer

中の実装は無しです。step-partitioningにおいてMapperなどの緩衝領域はすべてが

任意実装です。なければないで動作します。

※分割単位を定義するMapperは必要かと思いきや、

Mapperがなくてもjobの定義xmlにplanタグとして静的に設定可能すればOK

package jp.co.tarchan.sample_javaee_webapp.batchlet;

import javax.batch.api.partition.PartitionReducer;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

@Dependent
@Named("TestReducer")
public class TestReducer implements PartitionReducer {

    @Override
    public void beginPartitionedStep() throws Exception {

    }

    @Override
    public void beforePartitionedStepCompletion() throws Exception {

    }

    @Override
    public void rollbackPartitionedStep() throws Exception {

    }

    @Override
    public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception {

    }
}

 

Mapper

property配列の要素がそれぞれpartitionに順番に割り当てられます。

配列の要素0は、1番目のpartitionに・・・という具合です。

package jp.co.tarchan.sample_javaee_webapp.batchlet;

import java.util.Properties;

import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionPlanImpl;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

@Dependent
@Named("TestMapper")
public class TestMapper implements PartitionMapper {

    @Override
    public PartitionPlan mapPartitions() throws Exception {

        System.out.println("Mapper mapPartitions Start");

        PartitionPlan plan = new PartitionPlanImpl();

        Properties[] properties = new Properties[2];

        Properties Property1 = new Properties();
        Property1.put("param", "param1");
        properties[0] = Property1;

        Properties Property2 = new Properties();
        Property2.put("param", "param2");
        properties[1] = Property2;

        // propertyは未設定だとNullPointerで落ちる
        plan.setPartitionProperties(properties);
        plan.setPartitions(2);
        plan.setThreads(2);

        return plan;
    }
}

 

 

 

 

 

Collector

package jp.co.tarchan.sample_javaee_webapp.batchlet;

import java.io.Serializable;

import javax.batch.api.partition.PartitionCollector;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity;

@Dependent
@Named("TestCollector")
public class TestCollector implements PartitionCollector {

    //    @Inject
    //    private TestUserTableDao dao;
    //
    //    int i = 3;

    @Override
    public Serializable collectPartitionData() throws Exception {

        System.out.println("Collector collectPartitionData Start");

        //        TestUserTableEntity entity = new TestUserTableEntity();
        //        entity.setUserId("AAA" + i);
        //        entity.setUserName("BBB");
        //        entity.setDepartmentId("CCC");
        //
        //        dao.insert(entity);
        //
        //        i++;

        return new TestUserTableEntity();
    }
}

 

 

Analyzer

package jp.co.tarchan.sample_javaee_webapp.batchlet;

import java.io.Serializable;

import javax.batch.api.partition.PartitionAnalyzer;
import javax.batch.runtime.BatchStatus;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;

import jp.co.tarchan.sample_javaee_webapp.dao.TestUserTableDao;
import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity;

@Dependent
@Named("TestAnalyzer")
public class TestAnalyzer implements PartitionAnalyzer {

    @Inject
    private TestUserTableDao dao;

    int i = 0;

    @Override
    public void analyzeCollectorData(Serializable data) throws Exception {
        System.out.println("Analyzer analyzeCollectorData Start");

        TestUserTableEntity entity = new TestUserTableEntity();
        entity.setUserId("AAA" + i);
        entity.setUserName("BBB");
        entity.setDepartmentId("CCC");

        dao.insert(entity);

        i++;
    }

    @Override
    public void analyzeStatus(BatchStatus batchStatus, String exitStatus) throws Exception {
        System.out.println("Analyzer analyzeStatus Start");
    }
}

 

job定義xml

Mapperから各batchlet(partition単位)へパラメータを渡すために、

#{partitionPlan[‘param’]}を定義しています。

これを各batchletは@BatchPropertyと@Injectをつけてnameと同じフィールド名で

定義するとpartition別にパラメータを取得できます。

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration>
<job id="tarchan-sample-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">

	<step id="batch-001">
		<batchlet ref="SampleBatchlet">
			<properties>
				<property name="param" value="#{partitionPlan['param']}"/>
			</properties>
		</batchlet>

		<partition>
			<reducer ref="TestReducer" />
			<mapper ref="TestMapper" />
			<collector ref="TestCollector" />
			<analyzer ref="TestAnalyzer" />
		</partition>

	</step>

</job>

 

 

動作させてみる

まずはストレートにそのまま

Mapperにてpartitionを2としています。

これは2つを並列実行させることを意味します。

普通はここで対象取得して、件数からpartition数判断させて設定って

感じになるでしょうか。

 

上記サンプルはそのまま実行すると、

Batchletで2件、Analyzerで2件、テストテーブルにinsertされ

計4件insertされます。

 

Analyzerのトランザクションは??

ここで不思議なことが起こっています。

AnalyzerはTransactional定義等していないのに、

insertされているのです。

 

Analyzerは並列処理を統合する役割を担っているので、

クラスとしてはSingletonに動作しています。

役割からjbatchのコントローラ側で

Defaultでトランザクションが張られているのだと思います。

 

では、Collectorではどうか?

上記サンプルで、Collectorにコメントアウトがあると思います。

それを外して実行してみたところ、

「Transaction is required」(トランザクションは必須ですよ)と

エラーになりました。

 

 

まとめ

step-partitioningにおけるトランザクションは、

Analyzerではデフォルトでトランザクションが用意されており、

それ以外はTransactionalアノテーションや、

UserTransactionなどで自前で定義する必要がある。

ということになります。

 

もっとこのあたりを掘り下げてほしいという要望など

ありましたらご連絡をいただければと思います。

 

近いうちに、jbatchのスタートアップな記事を書きたいと思います。

 

ブログランキング・にほんブログ村へ
にほんブログ村


人気ブログランキング

コメント

タイトルとURLをコピーしました