少し前に引き続きjbatchものです。
今、jbatchを用いたFWのようなもの、要するに
実装者が楽に開発できるような基板部を作成しているのですが、
scope-partitionを利用したバッチの並列処理の基盤を
設計・実装していたのですが、
当然できるよね?と思っていたことが仕様上、
できないことがわかったので記事にしてみました。
Mapper→partitionへのデータ受け渡し
普通に考えたら、MapperでデータSelectして、
その件数で動的にpartitionの数を決めると。
ここまではOK。
じゃ、次は個々のpartitionが処理するデータを
元データから分割して渡そうと思いますよね。
こんな感じ。
package jp.co.tarchan.sample_javaee_webapp.batchlet; import java.math.RoundingMode; import java.util.List; import java.util.Properties; import java.util.stream.IntStream; import javax.batch.api.partition.PartitionMapper; import javax.batch.api.partition.PartitionPlan; import javax.batch.api.partition.PartitionPlanImpl; import javax.enterprise.context.Dependent; import javax.inject.Named; import org.apache.commons.collections4.CollectionUtils; import com.google.common.collect.Lists; import com.google.common.math.IntMath; import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity; @Dependent @Named("TestMapper") public class TestMapper implements PartitionMapper { @Override public PartitionPlan mapPartitions() throws Exception { System.out.println("Mapper mapPartitions Start"); // 対象データ取得 List<TestUserTableEntity> list = getTargetData(); PartitionPlan plan = new PartitionPlanImpl(); // パーティション数を算出 if (CollectionUtils.isEmpty(list)) { // 豆知識・・・partitionを0にするとbatchletもcollectorもanalyzerも動作せず、即reducerに戻る plan.setPartitions(0); } else { // 各パーティションで処理する数を3件とすると・・・ plan.setPartitions(IntMath.divide(list.size(), 3, RoundingMode.CEILING)); } // データ分割 List<Properties> propertyList = divideData(list, 3); // propertyは未設定だとNullPointerで落ちる plan.setPartitionProperties(propertyList.toArray(new Properties[propertyList.size()])); return plan; } // 対象データ取得はこんな感じをイメージ private List<TestUserTableEntity> getTargetData() { List<TestUserTableEntity> list = Lists.newArrayList(); IntStream.range(1, 10).forEach(i -> { TestUserTableEntity entity = new TestUserTableEntity(); entity.setUserId(String.format("%3d", i)); list.add(entity); }); return list; } // データ分割はこんな感じをイメージ private List<Properties> divideData(List<TestUserTableEntity> list, Integer partitionBaseSize) { List<Properties> propertyList = Lists.newArrayList(); List<TestUserTableEntity> tempList = Lists.newArrayList(); for (TestUserTableEntity entity : list) { if (tempList.size() == partitionBaseSize) { Properties properties = new Properties(); properties.put("inputData", tempList); propertyList.add(properties); tempList = Lists.newArrayList(); } tempList.add(entity); } Properties properties = new Properties(); properties.put("inputData", tempList); propertyList.add(properties); return propertyList; } }
さて、これをbatchletで受け取ろうかな。
こんな感じで受けとろうとしました。
package jp.co.tarchan.sample_javaee_webapp.batchlet; import java.util.List; import javax.batch.api.BatchProperty; import javax.batch.api.Batchlet; import javax.enterprise.context.Dependent; import javax.inject.Inject; import javax.inject.Named; import javax.transaction.Transactional; import jp.co.tarchan.sample_javaee_webapp.dao.TestUserTableDao; import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity; @Dependent @Named("SampleBatchlet") public class SampleBatchlet implements Batchlet { @Inject private TestUserTableDao dao; @Inject @BatchProperty private String param; @Inject @BatchProperty private List<TestUserTableEntity> inputData; @Override @Transactional public String process() throws Exception { System.out.println("SampleBatchlet process start"); for (TestUserTableEntity entity : inputData) { dao.insert(entity); } return "OK"; } @Override public void stop() throws Exception { System.out.println("SampleBatchlet stop start"); } }
とれないよ??
驚きでした。
@BatchPropertyでInjectした
inputDataはnullでinjectされません。
jbatchはwildfly上で動かしているので、実装はjberetなのですが・・・
Stirngしか許容してない!!
Map.Entry<String,String>って書いてるーー!
そっか、Stringした許容していなんだ。
JSR325の仕様書にも
そんな感じの記述が・・・。
こちらの
14P目の7.1.2 JobParametersの項で
「 Job parameters are keyword/value string pairs」
Oh No.
パラメータはキーと値をStirngのペアです。
はい、すいません。。。
それでは、どうする?
Stringしか渡せない、でもObjectを渡したい
それなら、jsonでしょー。
ということで、jsonライブラリjacksonを使用して
データ分割部を以下のように変更!
// データ分割はこんな感じをイメージ private List<Properties> divideData(List<TestUserTableEntity> list, Integer partitionBaseSize) throws Exception { List<Properties> propertyList = Lists.newArrayList(); List<TestUserTableEntity> tempList = Lists.newArrayList(); for (TestUserTableEntity entity : list) { if (tempList.size() == partitionBaseSize) { Properties properties = new Properties(); // Json変換してStringにしてセットする properties.put("inputData", new ObjectMapper().writeValueAsString(tempList)); propertyList.add(properties); tempList = Lists.newArrayList(); } tempList.add(entity); } Properties properties = new Properties(); properties.put("inputData", tempList); propertyList.add(properties); return propertyList; }
これを受けるときは・・・
以下のようにしてjsonからEntityに戻してあげればいいですね。
package jp.co.tarchan.sample_javaee_webapp.batchlet; import java.util.LinkedHashMap; import java.util.List; import javax.batch.api.BatchProperty; import javax.batch.api.Batchlet; import javax.enterprise.context.Dependent; import javax.inject.Inject; import javax.inject.Named; import javax.transaction.Transactional; import org.apache.commons.beanutils.BeanUtils; import com.fasterxml.jackson.databind.ObjectMapper; import jp.co.tarchan.sample_javaee_webapp.dao.TestUserTableDao; import jp.co.tarchan.sample_javaee_webapp.entity.TestUserTableEntity; @Dependent @Named("SampleBatchlet") public class SampleBatchlet implements Batchlet { @Inject private TestUserTableDao dao; @Inject @BatchProperty private String param; @Inject @BatchProperty private String inputData; @Override @Transactional public String process() throws Exception { System.out.println("SampleBatchlet process start"); // こんなjsonからListオブジェクトに戻す // ただ、このときのListの要素はLinkedHashMapになっている List<LinkedHashMap> inputDataList = new ObjectMapper().readValue(inputData, List.class); for (LinkedHashMap map : inputDataList) { // BeanUtilsでMapをDtoに変換 TestUserTableEntity entity = new TestUserTableEntity(); BeanUtils.populate(entity, map); dao.insert(entity); } // TypeReferenceを以下のようにnewして使用すると型指定できる List<TestUserTableEntity> inputDataList = new ObjectMapper().readValue(inputData, new TypeReference<List<TestUserTableEntity>>() {}); for (TestUserTableEntity entity : inputDataList) { // こうするとpoulate不要 dao.insert(entity); } return "OK"; } @Override public void stop() throws Exception { System.out.println("SampleBatchlet stop start"); } }
まとめ
Mapperからpartitionにデータを渡す方法についてお話しました。
まぁ、平たく言えば、仕様がどうであってもなんとかなるよねー。
というお話でした。それでは!
20代前半までは東京で音楽をやりながら両手の指以上の業種でアルバイト生活をしていましたが、某大手プロバイダのテレアポのバイトでPCの知識の無さに愕然とし、コンピュータをもっと知りたい!と思ったことをきっかけに25歳の時にITの世界に未経験で飛び込みました。
紆余曲折を経て、現在は個人事業主としてお仕事させていただいており、10年ほどになります。
web制作から企業システム構築、ツール開発など、フロントエンドもバックエンドもサーバーもDBAも依頼があれば何でもやってきた雑食系エンジニアです。
今風にいうとフルスタックエンジニアということになるのでしょうか??
→ 詳細プロフィールというか、生い立ちはこちら
→スキルシートをご覧になる場合はこちら
→お仕事のご依頼やお見積りなどお問い合わせはこちらから!