batchee-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Romain Manni-Bucau <rmannibu...@gmail.com>
Subject Fwd: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they are running
Date Mon, 15 Feb 2016 15:13:19 GMT
guess it is a copy paste but we don't need IBM in the header for our own
code ;)

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Tomitriber
<http://www.tomitribe.com>

---------- Forwarded message ----------
From: <rsandtner@apache.org>
Date: 2016-02-15 16:00 GMT+01:00
Subject: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they
are running
To: commits@batchee.incubator.apache.org


Repository: incubator-batchee
Updated Branches:
  refs/heads/master 1044c365b -> 24097670c


BATCHEE-95 stop subJobs only if they are running


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/24097670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/24097670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/24097670

Branch: refs/heads/master
Commit: 24097670c0723dc010c8c5a6813aa0f1edbfedc0
Parents: 1044c36
Author: Reinhard Sandtner <rsandtner@apache.org>
Authored: Mon Feb 15 15:59:12 2016 +0100
Committer: Reinhard Sandtner <rsandtner@apache.org>
Committed: Mon Feb 15 15:59:12 2016 +0100

----------------------------------------------------------------------
 .../controller/PartitionedStepController.java   |   8 +-
 .../test/partitioned/PartitionedBatchTest.java  | 141 +++++++++++++++++++
 .../META-INF/batch-jobs/partition-stop.xml      |  30 ++++
 3 files changed, 178 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git
a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index f98420e..e072a73 100755
---
a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++
b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -103,7 +103,13 @@ public class PartitionedStepController extends
BaseStepController {
             if (parallelBatchWorkUnits != null) {
                 for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
                     try {
-
kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+
+                        // only try to stop the sub-jobs if they are
running
+                        if (subJob.getJobExecutionImpl().getBatchStatus()
== BatchStatus.STARTING ||
+                            subJob.getJobExecutionImpl().getBatchStatus()
== BatchStatus.STARTED) {
+
+
kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+                        }
                     } catch (Exception e) {
                         // TODO - Is this what we want to know.
                         // Blow up if it happens to force the issue.

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
----------------------------------------------------------------------
diff --git
a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
new file mode 100644
index 0000000..2599b34
--- /dev/null
+++
b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional
information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in
compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.test.partitioned;
+
+import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
+import org.apache.batchee.util.Batches;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Listeners;
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.partition.PartitionMapper;
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionPlanImpl;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Listeners(ContainerLifecycle.class)
+public class PartitionedBatchTest {
+
+    private static final Logger log =
LoggerFactory.getLogger(PartitionedBatchTest.class);
+
+
+    @Test
+    public void testStopPartitionedBatch() throws Exception {
+
+        JobOperator jobOperator = BatchRuntime.getJobOperator();
+        long executionId = jobOperator.start("partition-stop", new
Properties());
+
+        do {
+            log.info("Waiting til batch is started");
+            Thread.sleep(50);
+        }
+        while (jobOperator.getJobExecution(executionId).getBatchStatus()
!= BatchStatus.STARTED);
+
+        Thread.sleep(100);
+
+        jobOperator.stop(executionId);
+
+        BatchStatus status = Batches.waitFor(jobOperator, executionId);
+        Assert.assertEquals(status, BatchStatus.STOPPED);
+    }
+
+
+    public static class StopReader extends AbstractItemReader {
+
+        private static final int MAX_INVOCATIONS = 2;
+
+
+        @Inject
+        @BatchProperty
+        private Integer idx;
+
+        private int invocations;
+
+
+        @Override
+        public Object readItem() throws Exception {
+            if (invocations++ < MAX_INVOCATIONS) {
+
+                Thread.sleep(5);
+                return invocations;
+            }
+
+            log.info("{} invoked {} times", idx, invocations);
+            return null;
+        }
+    }
+
+    public static class StopWriter extends AbstractItemWriter {
+
+        private static final Map<Integer, List<Object>> STORAGE = new
HashMap<Integer, List<Object>>(2);
+
+
+        @Inject
+        @BatchProperty
+        private Integer idx;
+
+        @Override
+        public void writeItems(List<Object> items) throws Exception {
+
+            List<Object> objects = STORAGE.get(idx);
+            if (objects == null) {
+                objects = new ArrayList<Object>();
+                STORAGE.put(idx, objects);
+            }
+
+            objects.addAll(items);
+        }
+    }
+
+    public static class StopMapper implements PartitionMapper {
+
+        private static final int NUMBER_OF_PARTITIONS = 50;
+        private static final int NUMBER_OF_THREADS = 5;
+
+        @Override
+        public PartitionPlan mapPartitions() throws Exception {
+
+            Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
+            for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
+                Properties properties = new Properties();
+                properties.setProperty("idx", String.valueOf(i + 1));
+
+                props[i] = properties;
+            }
+
+            PartitionPlanImpl plan = new PartitionPlanImpl();
+            plan.setPartitions(NUMBER_OF_PARTITIONS);
+            plan.setThreads(NUMBER_OF_THREADS);
+            plan.setPartitionProperties(props);
+
+            return plan;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
----------------------------------------------------------------------
diff --git
a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
new file mode 100644
index 0000000..341ab8c
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  See the NOTICE file distributed with this work for additional information
+  regarding copyright ownership. Licensed under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in
compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<job id="partition-stop" version="1.0" xmlns="
http://xmlns.jcp.org/xml/ns/javaee">
+    <step id="the-step">
+        <chunk item-count="10">
+            <reader
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
+                <properties>
+                    <property name="idx" value="#{partitionPlan['idx']}" />
+                </properties>
+            </reader>
+            <writer
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
+        </chunk>
+        <partition>
+            <mapper
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
+        </partition>
+    </step>
+</job>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message