ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject incubator-ignite git commit: # IGNITE-625: Fixed.
Date Fri, 10 Apr 2015 10:49:22 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-3 217883c4f -> 5be8db77e


# IGNITE-625: Fixed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5be8db77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5be8db77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5be8db77

Branch: refs/heads/ignite-sprint-3
Commit: 5be8db77e0e60c65c40b3bb3c86688e7c8f0d6ba
Parents: 217883c
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Fri Apr 10 13:49:31 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Apr 10 13:49:31 2015 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsAsyncImpl.java |   4 +-
 .../ignite/internal/processors/igfs/IgfsEx.java |   4 +-
 .../processors/igfs/IgfsFileWorker.java         | 180 ---------------
 .../processors/igfs/IgfsFileWorkerBatch.java    | 225 ++++++++-----------
 .../processors/igfs/IgfsFileWorkerTask.java     |  32 ---
 .../internal/processors/igfs/IgfsImpl.java      | 150 ++++++++-----
 .../internal/processors/igfs/IgfsProcessor.java |   2 +-
 .../processors/igfs/IgfsAbstractSelfTest.java   |  16 +-
 .../HadoopDefaultMapReducePlannerSelfTest.java  |   2 +-
 9 files changed, 200 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 48a32f4..8099985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -102,8 +102,8 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem>
impleme
     }
 
     /** {@inheritDoc} */
-    @Override public void stop() {
-        igfs.stop();
+    @Override public void stop(boolean cancel) {
+        igfs.stop(cancel);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 99f647e..7c1a837 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -53,8 +53,10 @@ public interface IgfsEx extends IgniteFileSystem {
 
     /**
      * Stops IGFS cleaning all used resources.
+     *
+     * @param cancel Cancellation flag.
      */
-    public void stop();
+    public void stop(boolean cancel);
 
     /**
      * @return IGFS context.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
deleted file mode 100644
index 8b04c41..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorker.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-/**
- * IGFS file worker for DUAL modes.
- */
-public class IgfsFileWorker extends IgfsThread {
-    /** Time during which thread remains alive since it's last batch is finished. */
-    private static final long THREAD_REUSE_WAIT_TIME = 5000;
-
-    /** Lock */
-    private final Lock lock = new ReentrantLock();
-
-    /** Condition. */
-    private final Condition cond = lock.newCondition();
-
-    /** Next queued batch. */
-    private IgfsFileWorkerBatch nextBatch;
-
-    /** Batch which is currently being processed. */
-    private IgfsFileWorkerBatch curBatch;
-
-    /** Cancellation flag. */
-    private volatile boolean cancelled;
-
-    /**
-     * Creates {@code IGFS} file worker.
-     *
-     * @param name Worker name.
-     */
-    IgfsFileWorker(String name) {
-        super(name);
-    }
-
-    /**
-     * Add worker batch.
-     *
-     * @return {@code True} if the batch was actually added.
-     */
-    boolean addBatch(IgfsFileWorkerBatch batch) {
-        assert batch != null;
-
-        lock.lock();
-
-        try {
-            if (!cancelled) {
-                assert nextBatch == null; // Remember, that write operations on a single
file are exclusive.
-
-                nextBatch = batch;
-
-                cond.signalAll();
-
-                return true;
-            }
-            else
-                return false;
-        }
-        finally {
-            lock.unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException {
-        while (!cancelled) {
-            lock.lock();
-
-            try {
-                // If there are no more new batches, wait for several seconds before shutting
down the thread.
-                if (!cancelled && nextBatch == null)
-                    cond.await(THREAD_REUSE_WAIT_TIME, TimeUnit.MILLISECONDS);
-
-                curBatch = nextBatch;
-
-                nextBatch = null;
-
-                if (cancelled && curBatch != null)
-                    curBatch.finish(); // Mark the batch as finished if cancelled.
-            }
-            finally {
-                lock.unlock();
-            }
-
-            if (curBatch != null)
-                curBatch.process();
-            else {
-                lock.lock();
-
-                try {
-                    // No more new batches, we can safely release the worker as it was inactive
for too long.
-                    if (nextBatch == null)
-                        cancelled = true;
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void cleanup() {
-        // Clear interrupted flag.
-        boolean interrupted = interrupted();
-
-        // Process the last batch if any.
-        if (nextBatch != null)
-            nextBatch.process();
-
-        onFinish();
-
-        // Reset interrupted flag.
-        if (interrupted)
-            interrupt();
-    }
-
-    /**
-     * Forcefully finish execution of all batches.
-     */
-    void cancel() {
-        lock.lock();
-
-        try {
-            cancelled = true;
-
-            if (curBatch != null)
-                curBatch.finish();
-
-            if (nextBatch != null)
-                nextBatch.finish();
-
-            cond.signalAll(); // Awake the main loop in case it is still waiting for the
next batch.
-        }
-        finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Get current batch.
-     *
-     * @return Current batch.
-     */
-    IgfsFileWorkerBatch currentBatch() {
-        lock.lock();
-
-        try {
-            return nextBatch == null ? curBatch : nextBatch;
-        }
-        finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Callback invoked when worker has processed all it's batches.
-     */
-    protected void onFinish() {
-        // No-op.
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
index 27f9b7d..5b92c84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java
@@ -19,29 +19,27 @@ package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
 
 /**
  * Work batch is an abstraction of the logically grouped tasks.
  */
-public class IgfsFileWorkerBatch {
-    /** Completion latch. */
-    private final CountDownLatch completeLatch = new CountDownLatch(1);
+public abstract class IgfsFileWorkerBatch implements Runnable {
+    /** Stop marker. */
+    private static final byte[] STOP_MARKER = new byte[0];
 
-    /** Finish guard. */
-    private final AtomicBoolean finishGuard = new AtomicBoolean();
-
-    /** Lock for finish operation. */
-    private final ReadWriteLock finishLock = new ReentrantReadWriteLock();
+    /** Cancel marker. */
+    private static final byte[] CANCEL_MARKER = new byte[0];
 
     /** Tasks queue. */
-    private final BlockingDeque<IgfsFileWorkerTask> queue = new LinkedBlockingDeque<>();
+    private final BlockingDeque<byte[]> queue = new LinkedBlockingDeque<>();
+
+    /** Future which completes when batch processing is finished. */
+    private final GridFutureAdapter fut = new GridFutureAdapter();
 
     /** Path to the file in the primary file system. */
     private final IgfsPath path;
@@ -49,11 +47,8 @@ public class IgfsFileWorkerBatch {
     /** Output stream to the file. */
     private final OutputStream out;
 
-    /** Caught exception. */
-    private volatile IgniteCheckedException err;
-
-    /** Last task marker. */
-    private boolean lastTask;
+    /** Finishing flag. */
+    private volatile boolean finishing;
 
     /**
      * Constructor.
@@ -70,88 +65,106 @@ public class IgfsFileWorkerBatch {
     }
 
     /**
-     * Perform write.
+     * Perform write if batch is not finishing yet.
      *
      * @param data Data to be written.
-     * @return {@code True} in case operation was enqueued.
+     * @return {@code True} in case write was enqueued.
      */
-    boolean write(final byte[] data) {
-        return addTask(new IgfsFileWorkerTask() {
-            @Override public void execute() throws IgniteCheckedException {
-                try {
-                    out.write(data);
-                }
-                catch (IOException e) {
-                    throw new IgniteCheckedException("Failed to write data to the file due
to secondary file system " +
-                        "exception: " + path, e);
-                }
-            }
-        });
+    synchronized boolean write(final byte[] data) {
+        if (!finishing) {
+            queue.add(data);
+
+            return true;
+        }
+        else
+            return false;
+    }
+
+    /**
+     * Add the last task to that batch which will release all the resources.
+     */
+    synchronized void finish() {
+        if (!finishing) {
+            finishing = true;
+
+            queue.add(STOP_MARKER);
+        }
+    }
+
+    /**
+     * Cancel batch processing.
+     */
+    synchronized void cancel() {
+        queue.addFirst(CANCEL_MARKER);
+    }
+
+    /**
+     * @return {@code True} if finish was called on this batch.
+     */
+    boolean finishing() {
+        return finishing;
     }
 
     /**
      * Process the batch.
      */
-    void process() {
-        try {
-            boolean cancelled = false;
+    @SuppressWarnings("unchecked")
+    public void run() {
+        Throwable err = null;
 
-            while (!cancelled) {
+        try {
+            while (true) {
                 try {
-                    IgfsFileWorkerTask task = queue.poll(1000, TimeUnit.MILLISECONDS);
-
-                    if (task == null)
-                        continue;
+                    byte[] data = queue.poll(1000, TimeUnit.MILLISECONDS);
 
-                    task.execute();
+                    if (data == STOP_MARKER) {
+                        assert queue.isEmpty();
 
-                    if (lastTask)
-                        cancelled = true;
+                        break;
+                    }
+                    else if (data == CANCEL_MARKER)
+                        throw new IgniteCheckedException("Write to file was cancelled due
to node stop.");
+                    else if (data != null) {
+                        try {
+                            out.write(data);
+                        }
+                        catch (IOException e) {
+                            throw new IgniteCheckedException("Failed to write data to the
file due to secondary " +
+                                "file system exception: " + path, e);
+                        }
+                    }
                 }
-                catch (IgniteCheckedException e) {
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
                     err = e;
 
-                    cancelled = true;
+                    break;
                 }
-                catch (InterruptedException ignore) {
-                    Thread.currentThread().interrupt();
+                catch (Exception e) {
+                    err = e;
 
-                    cancelled = true;
+                    break;
                 }
             }
         }
+        catch (Throwable e) {
+            // Safety. This should never happen under normal conditions.
+            err = e;
+        }
         finally {
-            try {
-                onComplete();
-            }
-            finally {
-                U.closeQuiet(out);
+            // Order of events is very important here. First, we close the stream so that
metadata locks are released.
+            // This action must be the very first because otherwise a writer thread could
interfere with itself.
+            U.closeQuiet(out);
 
-                completeLatch.countDown();
-            }
-        }
-    }
+            // Next, we invoke callback so that IgfsImpl is able to enqueue new requests.
This is safe because
+            // at this point file processing is completed.
+            onDone();
 
-    /**
-     * Add the last task to that batch which will release all the resources.
-     */
-    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
-    void finish() {
-        if (finishGuard.compareAndSet(false, true)) {
-            finishLock.writeLock().lock();
-
-            try {
-                queue.add(new IgfsFileWorkerTask() {
-                    @Override public void execute() {
-                        assert queue.isEmpty();
+            // Finally, we complete the future, so that waiting threads could resume.
+            assert !fut.isDone();
 
-                        lastTask = true;
-                    }
-                });
-            }
-            finally {
-                finishLock.writeLock().unlock();
-            }
+            fut.onDone(null, err);
         }
     }
 
@@ -161,29 +174,7 @@ public class IgfsFileWorkerBatch {
      * @throws IgniteCheckedException In case any exception has occurred during batch tasks
processing.
      */
     void await() throws IgniteCheckedException {
-        try {
-            completeLatch.await();
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
-
-        IgniteCheckedException err0 = err;
-
-        if (err0 != null)
-            throw err0;
-    }
-
-    /**
-     * Await for that worker batch to complete in case it was marked as finished.
-     *
-     * @throws IgniteCheckedException In case any exception has occurred during batch tasks
processing.
-     */
-    void awaitIfFinished() throws IgniteCheckedException {
-        if (finishGuard.get())
-            await();
+        fut.get();
     }
 
     /**
@@ -196,41 +187,7 @@ public class IgfsFileWorkerBatch {
     }
 
     /**
-     * Callback invoked when all the tasks within the batch are completed.
+     * Callback invoked when execution finishes.
      */
-    protected void onComplete() {
-        // No-op.
-    }
-
-    /**
-     * Add task to the queue.
-     *
-     * @param task Task to add.
-     * @return {@code True} in case the task was added to the queue.
-     */
-    private boolean addTask(IgfsFileWorkerTask task) {
-        finishLock.readLock().lock();
-
-        try {
-            if (!finishGuard.get()) {
-                try {
-                    queue.put(task);
-
-                    return true;
-                }
-                catch (InterruptedException ignore) {
-                    // Task was not enqueued due to interruption.
-                    Thread.currentThread().interrupt();
-
-                    return false;
-                }
-            }
-            else
-                return false;
-
-        }
-        finally {
-            finishLock.readLock().unlock();
-        }
-    }
+    protected abstract void onDone();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
deleted file mode 100644
index ba788b4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerTask.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import org.apache.ignite.*;
-
-/**
- * Generic IGFS worker task which could potentially throw an exception.
- */
-public interface IgfsFileWorkerTask {
-    /**
-     * Execute task logic.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void execute() throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 824f178..34636d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
+import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -97,7 +98,7 @@ public final class IgfsImpl implements IgfsEx {
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
     /** Writers map. */
-    private final ConcurrentHashMap8<IgfsPath, IgfsFileWorker> workerMap = new ConcurrentHashMap8<>();
+    private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new
ConcurrentHashMap8<>();
 
     /** Delete futures. */
     private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts
= new ConcurrentHashMap8<>();
@@ -120,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
     /** Eviction policy (if set). */
     private IgfsPerBlockLruEvictionPolicy evictPlc;
 
+    /** Pool for threads working in DUAL mode. */
+    private final IgniteThreadPoolExecutor dualPool;
+
     /**
      * Creates IGFS instance with given context.
      *
@@ -214,6 +218,9 @@ public final class IgfsImpl implements IgfsEx {
 
         igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
         igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT,
EVT_NODE_FAILED);
+
+        dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE,
5000L,
+            new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()),
null) : null;
     }
 
     /**
@@ -227,34 +234,25 @@ public final class IgfsImpl implements IgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public void stop() {
+    @Override public void stop(boolean cancel) {
         busyLock.block();
 
         // Clear interrupted flag temporarily.
         boolean interrupted = Thread.interrupted();
 
-        // Force all workers to finish their batches.
-        for (IgfsFileWorker w : workerMap.values())
-            w.cancel();
+        if (secondaryFs != null) {
+            // Force all workers to finish their batches.
+            for (IgfsFileWorkerBatch batch : workerMap.values())
+                batch.cancel();
 
-        // Wait for all writers to finish their execution.
-        for (IgfsFileWorker w : workerMap.values()) {
-            try {
-                w.join();
-            }
-            catch (InterruptedException e) {
-                U.error(log, e.getMessage(), e);
-            }
+            if (secondaryFs instanceof AutoCloseable)
+                U.closeQuiet((AutoCloseable)secondaryFs);
         }
 
-        workerMap.clear();
-
-        if (secondaryFs instanceof AutoCloseable)
-            U.closeQuiet((AutoCloseable)secondaryFs);
-
         igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
         igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
 
+        // Restore interrupted flag.
         if (interrupted)
             Thread.currentThread().interrupt();
     }
@@ -273,33 +271,29 @@ public final class IgfsImpl implements IgfsEx {
 
         if (enterBusy()) {
             try {
-                IgfsFileWorkerBatch batch = new IgfsFileWorkerBatch(path, out);
-
-                while (true) {
-                    IgfsFileWorker worker = workerMap.get(path);
-
-                    if (worker != null) {
-                        if (worker.addBatch(batch)) // Added batch to active worker.
-                            break;
-                        else
-                            workerMap.remove(path, worker); // Worker is stopping. Remove
it from map.
+                // Create new batch.
+                IgfsFileWorkerBatch batch = new IgfsFileWorkerBatch(path, out) {
+                    @Override protected void onDone() {
+                        workerMap.remove(path, this);
                     }
-                    else {
-                        worker = new IgfsFileWorker("igfs-file-worker-" + path) {
-                            @Override protected void onFinish() {
-                                workerMap.remove(path, this);
-                            }
-                        };
+                };
 
-                        boolean b = worker.addBatch(batch);
+                // Submit it to the thread pool immediately.
+                assert dualPool != null;
 
-                        assert b;
+                dualPool.submit(batch);
 
-                        if (workerMap.putIfAbsent(path, worker) == null) {
-                            worker.start();
+                // Spin in case another batch is currently running.
+                while (true) {
+                    IgfsFileWorkerBatch prevBatch = workerMap.putIfAbsent(path, batch);
 
-                            break;
-                        }
+                    if (prevBatch == null)
+                        break;
+                    else {
+                        assert prevBatch.finishing() :
+                            "File lock should prevent stream creation on a not-closed-yet
file.";
+
+                        prevBatch.await();
                     }
                 }
 
@@ -333,7 +327,7 @@ public final class IgfsImpl implements IgfsEx {
     void await(IgfsPath... paths) {
         assert paths != null;
 
-        for (Map.Entry<IgfsPath, IgfsFileWorker> workerEntry : workerMap.entrySet())
{
+        for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> workerEntry : workerMap.entrySet())
{
             IgfsPath workerPath = workerEntry.getKey();
 
             boolean await = false;
@@ -347,11 +341,21 @@ public final class IgfsImpl implements IgfsEx {
             }
 
             if (await) {
-                IgfsFileWorkerBatch batch = workerEntry.getValue().currentBatch();
+                IgfsFileWorkerBatch batch = workerEntry.getValue();
 
                 if (batch != null) {
                     try {
-                        batch.awaitIfFinished();
+                        // We wait only on files which had been closed, but their async writes
are still in progress.
+                        // This check is racy if several threads are modifying file system
concurrently, but we are ok
+                        // with that. The sole purpose of this waiting is to ensure happens-before
semantics for a
+                        // single thread. E.g., we have thread A working with Hadoop file
system in another process.
+                        // This file system communicates with a node and actual processing
occurs in threads B and C
+                        // of the current process. What we need to ensure is that if thread
A called "close" then
+                        // subsequent operations of this thread "see" this close and wait
for async writes to finish.
+                        // And as we do not know on which paths thread A performed writes earlier,
we have to wait for all
+                        // batches on current path and all its known children.
+                        if (batch.finishing())
+                            batch.await();
                     }
                     catch (IgniteCheckedException ignore) {
                         // No-op.
@@ -402,7 +406,8 @@ public final class IgfsImpl implements IgfsEx {
     @SuppressWarnings("ConstantConditions")
     @Override public IgfsStatus globalSpace() {
         return safeOp(new Callable<IgfsStatus>() {
-            @Override public IgfsStatus call() throws Exception {
+            @Override
+            public IgfsStatus call() throws Exception {
                 IgniteBiTuple<Long, Long> space = igfsCtx.kernalContext().grid().compute().execute(
                     new IgfsGlobalSpaceTask(name()), null);
 
@@ -910,7 +915,8 @@ public final class IgfsImpl implements IgfsEx {
         A.notNull(path, "path");
 
         return safeOp(new Callable<Collection<IgfsFile>>() {
-            @Override public Collection<IgfsFile> call() throws Exception {
+            @Override
+            public Collection<IgfsFile> call() throws Exception {
                 if (log.isDebugEnabled())
                     log.debug("List directory details: " + path);
 
@@ -952,8 +958,7 @@ public final class IgfsImpl implements IgfsEx {
                             files.add(new IgfsFileImpl(p, e.getValue(), data.groupBlockSize()));
                         }
                     }
-                }
-                else if (mode == PRIMARY) {
+                } else if (mode == PRIMARY) {
                     checkConflictWithPrimary(path);
 
                     throw new IgfsPathNotFoundException("Failed to list files (path not found):
" + path);
@@ -987,7 +992,8 @@ public final class IgfsImpl implements IgfsEx {
         A.ensure(seqReadsBeforePrefetch >= 0, "seqReadsBeforePrefetch >= 0");
 
         return safeOp(new Callable<IgfsInputStreamAdapter>() {
-            @Override public IgfsInputStreamAdapter call() throws Exception {
+            @Override
+            public IgfsInputStreamAdapter call() throws Exception {
                 if (log.isDebugEnabled())
                     log.debug("Open file for reading [path=" + path + ", bufSize=" + bufSize
+ ']');
 
@@ -1074,7 +1080,8 @@ public final class IgfsImpl implements IgfsEx {
         A.ensure(bufSize >= 0, "bufSize >= 0");
 
         return safeOp(new Callable<IgfsOutputStream>() {
-            @Override public IgfsOutputStream call() throws Exception {
+            @Override
+            public IgfsOutputStream call() throws Exception {
                 if (log.isDebugEnabled())
                     log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize
+ ", overwrite=" +
                         overwrite + ", props=" + props + ']');
@@ -1089,7 +1096,7 @@ public final class IgfsImpl implements IgfsEx {
                     await(path);
 
                     IgfsSecondaryOutputStreamDescriptor desc = meta.createDual(secondaryFs,
path, simpleCreate,
-                        props, overwrite, bufSize, (short)replication, groupBlockSize(),
affKey);
+                        props, overwrite, bufSize, (short) replication, groupBlockSize(),
affKey);
 
                     batch = newBatch(path, desc.out());
 
@@ -1177,7 +1184,8 @@ public final class IgfsImpl implements IgfsEx {
         A.ensure(bufSize >= 0, "bufSize >= 0");
 
         return safeOp(new Callable<IgfsOutputStream>() {
-            @Override public IgfsOutputStream call() throws Exception {
+            @Override
+            public IgfsOutputStream call() throws Exception {
                 if (log.isDebugEnabled())
                     log.debug("Open file for appending [path=" + path + ", bufSize=" + bufSize
+ ", create=" + create +
                         ", props=" + props + ']');
@@ -1299,7 +1307,8 @@ public final class IgfsImpl implements IgfsEx {
         A.ensure(len >= 0, "len >= 0");
 
         return safeOp(new Callable<Collection<IgfsBlockLocation>>() {
-            @Override public Collection<IgfsBlockLocation> call() throws Exception {
+            @Override
+            public Collection<IgfsBlockLocation> call() throws Exception {
                 if (log.isDebugEnabled())
                     log.debug("Get affinity for file block [path=" + path + ", start=" + start + ", len=" + len + ']');
 
@@ -1332,7 +1341,8 @@ public final class IgfsImpl implements IgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsMetrics metrics() {
         return safeOp(new Callable<IgfsMetrics>() {
-            @Override public IgfsMetrics call() throws Exception {
+            @Override
+            public IgfsMetrics call() throws Exception {
                 IgfsPathSummary sum = new IgfsPathSummary();
 
                 summary0(ROOT_ID, sum);
@@ -1342,8 +1352,7 @@ public final class IgfsImpl implements IgfsEx {
                 if (secondaryFs != null) {
                     try {
                         secondarySpaceSize = secondaryFs.usedSpaceSize();
-                    }
-                    catch (IgniteException e) {
+                    } catch (IgniteException e) {
                         LT.warn(log, e, "Failed to get secondary file system consumed space size.");
 
                         secondarySpaceSize = -1;
@@ -2096,4 +2105,35 @@ public final class IgfsImpl implements IgfsEx {
         else
             throw new IllegalStateException("Failed to perform IGFS action because grid is stopping.");
     }
+
+    /**
+     * IGFS thread factory.
+     */
+    @SuppressWarnings("NullableProblems")
+    private static class IgfsThreadFactory implements ThreadFactory {
+        /** IGFS name. */
+        private final String name;
+
+        /** Counter. */
+        private final AtomicLong ctr = new AtomicLong();
+
+        /**
+         * Constructor.
+         *
+         * @param name IGFS name.
+         */
+        private IgfsThreadFactory(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Thread newThread(Runnable r) {
+            Thread t = new Thread(r);
+
+            t.setName("igfs-<" + name + ">-batch-worker-thread-" + ctr.incrementAndGet());
+            t.setDaemon(true);
+
+            return t;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index b17626c..af41ec4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -178,7 +178,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
                 mgr.stop(cancel);
             }
 
-            igfsCtx.igfs().stop();
+            igfsCtx.igfs().stop(cancel);
         }
 
         igfsCache.clear();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 448dccf..a8a8957 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -1526,7 +1526,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
 
         os.write(chunk);
 
-        igfs.stop();
+        igfs.stop(true);
 
         // Reset test state.
         afterTestsStopped();
@@ -2593,12 +2593,11 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         workerMapFld.setAccessible(true);
 
         // Wait for all workers to finish.
-        Map<IgfsPath, IgfsFileWorker> workerMap = (Map<IgfsPath, IgfsFileWorker>)workerMapFld.get(igfs);
+        Map<IgfsPath, IgfsFileWorkerBatch> workerMap = (Map<IgfsPath, IgfsFileWorkerBatch>)workerMapFld.get(igfs);
 
-        for (Map.Entry<IgfsPath, IgfsFileWorker> entry : workerMap.entrySet()) {
+        for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> entry : workerMap.entrySet()) {
             entry.getValue().cancel();
-
-            U.join(entry.getValue());
+            entry.getValue().await();
         }
 
         // Clear igfs.
@@ -2621,12 +2620,11 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
             workerMapFld.setAccessible(true);
 
             // Wait for all workers to finish.
-            Map<IgfsPath, IgfsFileWorker> workerMap = (Map<IgfsPath, IgfsFileWorker>)workerMapFld.get(igfsEx);
+            Map<IgfsPath, IgfsFileWorkerBatch> workerMap = (Map<IgfsPath, IgfsFileWorkerBatch>)workerMapFld.get(igfs);
 
-            for (Map.Entry<IgfsPath, IgfsFileWorker> entry : workerMap.entrySet()) {
+            for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> entry : workerMap.entrySet()) {
                 entry.getValue().cancel();
-
-                U.join(entry.getValue());
+                entry.getValue().await();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5be8db77/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index 1ff8a0f..0828d0b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -675,7 +675,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
         }
 
         /** {@inheritDoc} */
-        @Override public void stop() {
+        @Override public void stop(boolean cancel) {
             // No-op.
         }
 


Mime
View raw message