ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-2813: Improved worker batch finish/cancel handling.
Date Thu, 17 Mar 2016 08:24:37 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2813 e55866fd0 -> 1e1bdb092


IGNITE-2813: Improved worker batch finish/cancel handling.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e1bdb09
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e1bdb09
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e1bdb09

Branch: refs/heads/ignite-2813
Commit: 1e1bdb092fb0be83f2f890276151070beceaa1b6
Parents: e55866f
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Thu Mar 17 11:24:28 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Mar 17 11:24:28 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsFileWorkerBatch.java    | 46 ++++++++++--------
 .../IgfsFileWorkerBatchCancelledException.java  | 50 ++++++++++++++++++++
 .../processors/igfs/IgfsAbstractSelfTest.java   |  2 +-
 3 files changed, 77 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1e1bdb09/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
index 130846b..433c161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
@@ -49,9 +49,6 @@ public abstract class IgfsFileWorkerBatch implements Runnable {
     /** Output stream to the file. */
     private final OutputStream out;
 
-    /** Cancel flag. */
-    private volatile boolean cancelled;
-
     /** Finishing flag. */
     private volatile boolean finishing;
 
@@ -87,34 +84,45 @@ public abstract class IgfsFileWorkerBatch implements Runnable {
 
     /**
      * Add the last task to that batch which will release all the resources.
+     *
+     * @return {@code True} if finish was signalled.
      */
-    synchronized void finish() {
-        if (!finishing) {
-            finishing = true;
-
-            queue.add(STOP_MARKER);
-        }
+    synchronized boolean finish() {
+        return finish0(STOP_MARKER);
     }
 
     /**
      * Cancel batch processing.
+     *
+     * @return {@code True} if cancel was signalled.
      */
-    synchronized void cancel() {
-        queue.addFirst(CANCEL_MARKER);
+    synchronized boolean cancel() {
+        return finish0(CANCEL_MARKER);
     }
 
     /**
-     * @return {@code True} if finish was called on this batch.
+     * Internal finish routine.
+     *
+     * @param marker Marker.
+     * @return {@code True} if marker was delivered to worker thread.
      */
-    boolean finishing() {
-        return finishing;
+    private synchronized boolean finish0(byte[] marker) {
+        if (!finishing) {
+            finishing = true;
+
+            queue.addFirst(marker);
+
+            return true;
+        }
+        else
+            return false;
     }
 
     /**
-     * @return {@code True} if batch write was terminated abruptly due to explicit cancellation.
+     * @return {@code True} if finish was called on this batch.
      */
-    boolean cancelled() {
-        return cancelled;
+    boolean finishing() {
+        return finishing;
     }
 
     /**
@@ -135,9 +143,7 @@ public abstract class IgfsFileWorkerBatch implements Runnable {
                         break;
                     }
                     else if (data == CANCEL_MARKER) {
-                        cancelled = true;
-
-                        throw new IgniteCheckedException("Write to file was cancelled due to node stop.");
+                        throw new IgfsFileWorkerBatchCancelledException(path);
                     }
                     else if (data != null) {
                         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e1bdb09/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatchCancelledException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatchCancelledException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatchCancelledException.java
new file mode 100644
index 0000000..a4ec485
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatchCancelledException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsPath;
+
+/**
+ * Exception indicating that file batch processing was cancelled.
+ */
+public class IgfsFileWorkerBatchCancelledException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    private IgfsPath path;
+
+    /**
+     * Default constructor.
+     */
+    public IgfsFileWorkerBatchCancelledException() {
+        // No-op.
+    }
+
+    public IgfsFileWorkerBatchCancelledException(IgfsPath path) {
+        this.path = path;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getMessage() {
+                if (path == null)
+            return "Asynchronous file processing was cancelled due to node stop.";
+        else
+            return "Asynchronous file processing was cancelled due to node stop: " + path;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e1bdb09/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 7e1a97a..39f6db2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -3126,7 +3126,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
                 entry.getValue().await();
             }
             catch (IgniteCheckedException e) {
-                if (!entry.getValue().cancelled())
+                if (!(e instanceof IgfsFileWorkerBatchCancelledException))
                     throw e;
             }
         }


Mime
View raw message