【jbatch】【step-partitioning】batch propertyにObjectは渡せない!?

IT
スポンサーリンク

少し前に引き続きjbatchものです。

今、jbatchを用いたFWのようなもの、要するに

実装者が楽に開発できるような基板部を作成しているのですが、

そこで、前回前々回と書いているとおり、

scope-partitionを利用したバッチの並列処理の基盤を

設計・実装していたのですが、

当然できるよね?と思っていたことが仕様上、

できないことがわかったので記事にしてみました。

Mapper→partitionへのデータ受け渡し

普通に考えたら、MapperでデータSelectして、

その件数で動的にpartitionの数を決めると。

ここまではOK。

じゃ、次は個々のpartitionが処理するデータを

元データから分割して渡そうと思いますよね。

こんな感じ。

package jp.co.tarchan.sample_javaee_webapp.batchlet;

import java.math.RoundingMode;
import java.util.List;
import java.util.Properties;
import java.util.stream.IntStream;

import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionPlanImpl;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

import org.apache.commons.collections4.CollectionUtils;

import com.google.common.collect.Lists;
import com.google.common.math.IntMath;

import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity;

@Dependent
@Named("TestMapper")
public class TestMapper implements PartitionMapper {

    @Override
    public PartitionPlan mapPartitions() throws Exception {

        System.out.println("Mapper mapPartitions Start");

        // 対象データ取得
        List<TestUserTableEntity> list = getTargetData();

        PartitionPlan plan = new PartitionPlanImpl();

        // パーティション数を算出
        if (CollectionUtils.isEmpty(list)) {

            // 豆知識・・・partitionを0にするとbatchletもcollectorもanalyzerも動作せず、即reducerに戻る
            plan.setPartitions(0);
        } else {
            // 各パーティションで処理する数を3件とすると・・・
            plan.setPartitions(IntMath.divide(list.size(), 3, RoundingMode.CEILING));
        }

        // データ分割
        List<Properties> propertyList = divideData(list, 3);

        // propertyは未設定だとNullPointerで落ちる
        plan.setPartitionProperties(propertyList.toArray(new Properties[propertyList.size()]));

        return plan;
    }

    // 対象データ取得はこんな感じをイメージ
    private List<TestUserTableEntity> getTargetData() {

        List<TestUserTableEntity> list = Lists.newArrayList();

        IntStream.range(1, 10).forEach(i -> {

            TestUserTableEntity entity = new TestUserTableEntity();
            entity.setUserId(String.format("%3d", i));
            list.add(entity);
        });

        return list;
    }

    // データ分割はこんな感じをイメージ
    private List<Properties> divideData(List<TestUserTableEntity> list, Integer partitionBaseSize) {

        List<Properties> propertyList = Lists.newArrayList();
        List<TestUserTableEntity> tempList = Lists.newArrayList();

        for (TestUserTableEntity entity : list) {

            if (tempList.size() == partitionBaseSize) {

                Properties properties = new Properties();
                properties.put("inputData", tempList);
                propertyList.add(properties);

                tempList = Lists.newArrayList();
            }
       tempList.add(entity);
        }

        Properties properties = new Properties();
        properties.put("inputData", tempList);
        propertyList.add(properties);

        return propertyList;
    }
}

 

 

さて、これをbatchletで受け取ろうかな。

こんな感じで受けとろうとしました。

 

package jp.co.tarchan.sample_javaee_webapp.batchlet;

import java.util.List;

import javax.batch.api.BatchProperty;
import javax.batch.api.Batchlet;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import javax.transaction.Transactional;

import jp.co.tarchan.sample_javaee_webapp.dao.TestUserTableDao;
import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity;

@Dependent
@Named("SampleBatchlet")
public class SampleBatchlet implements Batchlet {

    @Inject
    private TestUserTableDao dao;

    @Inject
    @BatchProperty
    private String param;

    @Inject
    @BatchProperty
    private List<TestUserTableEntity> inputData;

    @Override
    @Transactional
    public String process() throws Exception {

        System.out.println("SampleBatchlet process start");

        for (TestUserTableEntity entity : inputData) {
            dao.insert(entity);
        }

        return "OK";
    }

    @Override
    public void stop() throws Exception {
        System.out.println("SampleBatchlet stop start");
    }
}

 

 

とれないよ??

驚きでした。

@BatchPropertyでInjectした

inputDataはnullでinjectされません。

 

jbatchはwildfly上で動かしているので、実装はjberetなのですが・・・

 

Stirngしか許容してない!!

jberetのgithubのソースのl413にて

Map.Entry<String,String>って書いてるーー!

そっか、Stringした許容していなんだ。

 

JSR325の仕様書にも

そんな感じの記述が・・・。

 

こちら

14P目の7.1.2 JobParametersの項で

「 Job parameters are keyword/value string pairs」

Oh No.

 

パラメータはキーと値をStirngのペアです。

はい、すいません。。。

 

 

それでは、どうする?

Stringしか渡せない、でもObjectを渡したい

 

それなら、jsonでしょー。

 

ということで、jsonライブラリjacksonを使用して

データ分割部を以下のように変更!

 

    // データ分割はこんな感じをイメージ
    private List<Properties> divideData(List<TestUserTableEntity> list, Integer partitionBaseSize) throws Exception {

        List<Properties> propertyList = Lists.newArrayList();
        List<TestUserTableEntity> tempList = Lists.newArrayList();

        for (TestUserTableEntity entity : list) {

            if (tempList.size() == partitionBaseSize) {

                Properties properties = new Properties();

                // Json変換してStringにしてセットする
                properties.put("inputData", new ObjectMapper().writeValueAsString(tempList));
                propertyList.add(properties);

                tempList = Lists.newArrayList();
            }

            tempList.add(entity);
        }

        Properties properties = new Properties();
        properties.put("inputData", tempList);
        propertyList.add(properties);

        return propertyList;
    }

 

これを受けるときは・・・

以下のようにしてjsonからEntityに戻してあげればいいですね。

 

package jp.co.tarchan.sample_javaee_webapp.batchlet;

import java.util.LinkedHashMap;
import java.util.List;

import javax.batch.api.BatchProperty;
import javax.batch.api.Batchlet;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import javax.transaction.Transactional;

import org.apache.commons.beanutils.BeanUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

import jp.co.tarchan.sample_javaee_webapp.dao.TestUserTableDao;
import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity;

@Dependent
@Named("SampleBatchlet")
public class SampleBatchlet implements Batchlet {

    @Inject
    private TestUserTableDao dao;

    @Inject
    @BatchProperty
    private String param;

    @Inject
    @BatchProperty
    private String inputData;

    @Override
    @Transactional
    public String process() throws Exception {

        System.out.println("SampleBatchlet process start");

        // こんなjsonからListオブジェクトに戻す
        // ただ、このときのListの要素はLinkedHashMapになっている
        List<LinkedHashMap> inputDataList = new ObjectMapper().readValue(inputData, List.class);

        for (LinkedHashMap map : inputDataList) {

            // BeanUtilsでMapをDtoに変換
            TestUserTableEntity entity = new TestUserTableEntity();
            BeanUtils.populate(entity, map);
            dao.insert(entity);
        }

        // TypeReferenceを以下のようにnewして使用すると型指定できる
        List<TestUserTableEntity> inputDataList = new ObjectMapper().readValue(inputData, new TypeReference<List<TestUserTableEntity>>() {});

        for (TestUserTableEntity entity : inputDataList) {

            // こうするとpoulate不要
            dao.insert(entity);
        }


        return "OK";
    }

    @Override
    public void stop() throws Exception {
        System.out.println("SampleBatchlet stop start");
    }
}

 

まとめ

Mapperからpartitionにデータを渡す方法についてお話しました。

まぁ、平たく言えば、仕様がどうであってもなんとかなるよねー。

というお話でした。それでは!

 

 

ブログランキング・にほんブログ村へ
にほんブログ村


人気ブログランキング

コメント

タイトルとURLをコピーしました