ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [07/11] ignite git commit: IGNITE-3260: IGFS: Delete messages are no longer passed.
Date Wed, 08 Jun 2016 12:19:48 GMT
IGNITE-3260: IGFS: Delete messages are no longer passed.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c300448b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c300448b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c300448b

Branch: refs/heads/master
Commit: c300448b94ed0d3f847197d1bbe67c31165c6ae6
Parents: a60bb3b
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Mon Jun 6 18:12:42 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Jun 8 14:57:39 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsAsyncImpl.java |   6 -
 .../processors/igfs/IgfsDataManager.java        |  61 ++---
 .../processors/igfs/IgfsDeleteWorker.java       |  39 ---
 .../ignite/internal/processors/igfs/IgfsEx.java |   9 -
 .../internal/processors/igfs/IgfsImpl.java      | 249 +++++--------------
 .../internal/processors/igfs/IgfsUtils.java     |   2 +-
 .../ignite/igfs/IgfsFragmentizerSelfTest.java   |   2 -
 .../processors/igfs/IgfsSizeSelfTest.java       | 133 ----------
 .../HadoopDefaultMapReducePlannerSelfTest.java  |   6 -
 9 files changed, 83 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 8653f90..7530557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
 import org.apache.ignite.igfs.mapreduce.IgfsTask;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.internal.AsyncSupportAdapter;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -166,11 +165,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem>
impleme
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException
{
-        return igfs.awaitDeletesAsync();
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public String clientLogDirectory() {
         return igfs.clientLogDirectory();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 77fb0a9..66da05d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
 import org.apache.ignite.igfs.IgfsOutOfSpaceException;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -1056,34 +1055,24 @@ public class IgfsDataManager extends IgfsManager {
     private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int
startOff,
         byte[] data) throws IgniteCheckedException {
         if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
-            try {
-                igfs.awaitDeletesAsync().get(trashPurgeTimeout);
-            }
-            catch (IgniteFutureTimeoutCheckedException ignore) {
-                // Ignore.
-            }
+            final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
 
-            // Additional size check.
-            if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
-                final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
-
-                if (completionFut == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Missing completion future for file write request (most
likely exception occurred " +
-                            "which will be thrown upon stream close) [fileId=" + fileId +
']');
+            if (completionFut == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Missing completion future for file write request (most likely
exception occurred " +
+                        "which will be thrown upon stream close) [fileId=" + fileId + ']');
 
-                    return;
-                }
+                return;
+            }
 
-                IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write
data block " +
-                    "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed()
+
-                    ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
+            IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data
block " +
+                "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed()
+
+                ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
 
-                completionFut.onDone(new IgniteCheckedException("Failed to write data (not
enough space on node): " +
-                    igfsCtx.kernalContext().localNodeId(), e));
+            completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough
space on node): " +
+                igfsCtx.kernalContext().localNodeId(), e));
 
-                return;
-            }
+            return;
         }
 
         // No affinity key present, just concat and return.
@@ -1225,26 +1214,10 @@ public class IgfsDataManager extends IgfsManager {
         assert !blocks.isEmpty();
 
         if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
-            try {
-                try {
-                    igfs.awaitDeletesAsync().get(trashPurgeTimeout);
-                }
-                catch (IgniteFutureTimeoutCheckedException ignore) {
-                    // Ignore.
-                }
-
-                // Additional size check.
-                if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax())
-                    return new GridFinishedFuture<Object>(
-                        new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum
data size " +
-                            "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
-                            ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
-
-            }
-            catch (IgniteCheckedException e) {
-                return new GridFinishedFuture<>(new IgniteCheckedException("Failed
to store data " +
-                    "block due to unexpected exception.", e));
-            }
+            return new GridFinishedFuture<Object>(
+                new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data
size " +
+                    "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+                    ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
         }
 
         return dataCachePrj.putAllAsync(blocks);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index bac7569..3d41071 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -24,9 +24,7 @@ import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 
@@ -51,9 +49,6 @@ public class IgfsDeleteWorker extends IgfsThread {
     /** How many files/folders to delete at once (i.e in a single transaction). */
     private static final int MAX_DELETE_BATCH = 100;
 
-    /** IGFS context. */
-    private final IgfsContext igfsCtx;
-
     /** Metadata manager. */
     private final IgfsMetaManager meta;
 
@@ -75,9 +70,6 @@ public class IgfsDeleteWorker extends IgfsThread {
     /** Cancellation flag. */
     private volatile boolean cancelled;
 
-    /** Message topic. */
-    private Object topic;
-
     /**
      * Constructor.
      *
@@ -86,15 +78,9 @@ public class IgfsDeleteWorker extends IgfsThread {
     IgfsDeleteWorker(IgfsContext igfsCtx) {
         super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId()
+ "%");
 
-        this.igfsCtx = igfsCtx;
-
         meta = igfsCtx.meta();
         data = igfsCtx.data();
 
-        String igfsName = igfsCtx.igfs().name();
-
-        topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
-
         assert meta != null;
         assert data != null;
 
@@ -191,8 +177,6 @@ public class IgfsDeleteWorker extends IgfsThread {
                             if (log.isDebugEnabled())
                                 log.debug("Sending delete confirmation message [name=" +
entry.getKey() +
                                     ", fileId=" + fileId + ']');
-
-                            sendDeleteMessage(new IgfsDeleteMessage(fileId));
                         }
                     }
                     else
@@ -203,8 +187,6 @@ public class IgfsDeleteWorker extends IgfsThread {
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(),
e);
-
-                    sendDeleteMessage(new IgfsDeleteMessage(fileId, e));
                 }
             }
         }
@@ -358,25 +340,4 @@ public class IgfsDeleteWorker extends IgfsThread {
                 return true; // Directory entry was deleted concurrently.
         }
     }
-
-    /**
-     * Send delete message to all meta cache nodes in the grid.
-     *
-     * @param msg Message to send.
-     */
-    private void sendDeleteMessage(IgfsDeleteMessage msg) {
-        assert msg != null;
-
-        Collection<ClusterNode> nodes = meta.metaCacheNodes();
-
-        for (ClusterNode node : nodes) {
-            try {
-                igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL);
-            }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id()
+
-                    ", msg=" + msg + ", err=" + e.getMessage() + ']');
-            }
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index fb67e20..4c64bc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -23,7 +23,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -100,14 +99,6 @@ public interface IgfsEx extends IgniteFileSystem {
     public long groupBlockSize();
 
     /**
-     * Asynchronously await for all entries existing in trash to be removed.
-     *
-     * @return Future which will be completed when all entries existed in trash by the time
of invocation are removed.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException;
-
-    /**
      * Gets client file system log directory.
      *
      * @return Client file system log directory or {@code null} in case no client connections
have been created yet.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index b2b031e..4dfb040 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -32,10 +32,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTaskSplitAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.events.IgfsEvent;
 import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsInvalidPathException;
 import org.apache.ignite.igfs.IgfsMetrics;
@@ -51,9 +50,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
@@ -69,8 +66,6 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallab
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -100,11 +95,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
@@ -114,14 +109,10 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
 import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
 
 /**
  * Cache-based IGFS implementation.
@@ -130,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
     /** Default permissions for file system entry. */
     private static final String PERMISSION_DFLT_VAL = "0777";
 
+    /** Index generator for async format threads. */
+    private static final AtomicInteger FORMAT_THREAD_IDX_GEN = new AtomicInteger();
+
     /** Default directory metadata. */
     static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION,
PERMISSION_DFLT_VAL);
 
@@ -169,24 +163,12 @@ public final class IgfsImpl implements IgfsEx {
     /** Writers map. */
     private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new
ConcurrentHashMap8<>();
 
-    /** Delete futures. */
-    private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts
= new ConcurrentHashMap8<>();
-
-    /** Delete message listener. */
-    private final GridMessageListener delMsgLsnr = new FormatMessageListener();
-
-    /** Format discovery listener. */
-    private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener();
-
     /** Local metrics holder. */
     private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
 
     /** Client log directory. */
     private volatile String logDir;
 
-    /** Message topic. */
-    private Object topic;
-
     /** Eviction policy (if set). */
     private IgfsPerBlockLruEvictionPolicy evictPlc;
 
@@ -292,11 +274,6 @@ public final class IgfsImpl implements IgfsEx {
             }
         }
 
-        topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
-        igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
-        igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT,
EVT_NODE_FAILED);
-
         dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE,
5000L,
             new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()),
null) : null;
     }
@@ -332,9 +309,6 @@ public final class IgfsImpl implements IgfsEx {
             }
         }
 
-        igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
-        igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
-
         // Restore interrupted flag.
         if (interrupted)
             Thread.currentThread().interrupt();
@@ -1381,7 +1355,25 @@ public final class IgfsImpl implements IgfsEx {
     /** {@inheritDoc} */
     @Override public void format() {
         try {
-            formatAsync().get();
+            IgniteUuid id = meta.format();
+
+            // If ID is null, then file system is already empty.
+            if (id == null)
+                return;
+
+            while (true) {
+                if (enterBusy()) {
+                    try {
+                        if (!meta.exists(id))
+                            return;
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+
+                U.sleep(10);
+            }
         }
         catch (Exception e) {
             throw IgfsUtils.toIgfsException(e);
@@ -1394,69 +1386,16 @@ public final class IgfsImpl implements IgfsEx {
      * @return Future.
      */
     IgniteInternalFuture<?> formatAsync() {
-        try {
-            IgniteUuid id = meta.format();
-
-            if (id == null)
-                return new GridFinishedFuture<Object>();
-            else {
-                GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
-                GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
-                if (oldFut != null)
-                    return oldFut;
-                else {
-                    if (!meta.exists(id)) {
-                        // Safety in case response message was received before we put future
into collection.
-                        fut.onDone();
-
-                        delFuts.remove(id, fut);
-                    }
+        GridFutureAdapter<?> fut = new GridFutureAdapter<>();
 
-                    return fut;
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<Object>(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException
{
-        Collection<IgniteUuid> ids = meta.pendingDeletes();
+        Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-"
+
+            FORMAT_THREAD_IDX_GEN.incrementAndGet());
 
-        if (!ids.isEmpty()) {
-            if (log.isDebugEnabled())
-                log.debug("Constructing delete future for trash entries: " + ids);
+        t.setDaemon(true);
 
-            GridCompoundFuture<Object, Object> resFut = new GridCompoundFuture<>();
+        t.start();
 
-            for (IgniteUuid id : ids) {
-                GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
-                IgniteInternalFuture<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
-                if (oldFut != null)
-                    resFut.add(oldFut);
-                else {
-                    if (meta.exists(id))
-                        resFut.add(fut);
-                    else {
-                        fut.onDone();
-
-                        delFuts.remove(id, fut);
-                    }
-                }
-            }
-
-            resFut.markInitialized();
-
-            return resFut;
-        }
-        else
-            return new GridFinishedFuture<>();
+        return fut;
     }
 
     /**
@@ -1482,24 +1421,6 @@ public final class IgfsImpl implements IgfsEx {
         return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile());
     }
 
-    /**
-     * Check whether IGFS with the same name exists among provided attributes.
-     *
-     * @param attrs Attributes.
-     * @return {@code True} in case IGFS with the same name exists among provided attributes
-     */
-    private boolean sameIgfs(IgfsAttributes[] attrs) {
-        if (attrs != null) {
-            String igfsName = name();
-
-            for (IgfsAttributes attr : attrs) {
-                if (F.eq(igfsName, attr.igfsName()))
-                    return true;
-            }
-        }
-        return false;
-    }
-
     /** {@inheritDoc} */
     @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver
rslvr,
         Collection<IgfsPath> paths, @Nullable T arg) {
@@ -1905,81 +1826,6 @@ public final class IgfsImpl implements IgfsEx {
         }
     }
 
-    /**
-     * Format message listener required for format action completion.
-     */
-    private class FormatMessageListener implements GridMessageListener {
-        /** {@inheritDoc} */
-        @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-        @Override public void onMessage(UUID nodeId, Object msg) {
-            if (msg instanceof IgfsDeleteMessage) {
-                ClusterNode node = igfsCtx.kernalContext().discovery().node(nodeId);
-
-                if (node != null) {
-                    if (sameIgfs((IgfsAttributes[]) node.attribute(ATTR_IGFS))) {
-                        IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg;
-
-                        try {
-                            msg0.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(),
null);
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to unmarshal message (will ignore): " +
msg0, e);
-
-                            return;
-                        }
-
-                        assert msg0.id() != null;
-
-                        GridFutureAdapter<?> fut = delFuts.remove(msg0.id());
-
-                        if (fut != null) {
-                            if (msg0.error() == null)
-                                fut.onDone();
-                            else
-                                fut.onDone(msg0.error());
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Discovery listener required for format actions completion.
-     */
-    private class FormatDiscoveryListener implements GridLocalEventListener {
-        /** {@inheritDoc} */
-        @Override public void onEvent(Event evt) {
-            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
-            DiscoveryEvent evt0 = (DiscoveryEvent)evt;
-
-            if (evt0.eventNode() != null) {
-                if (sameIgfs((IgfsAttributes[]) evt0.eventNode().attribute(ATTR_IGFS))) {
-                    Collection<IgniteUuid> rmv = new HashSet<>();
-
-                    for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut
: delFuts.entrySet()) {
-                        IgniteUuid id = fut.getKey();
-
-                        try {
-                            if (!meta.exists(id)) {
-                                fut.getValue().onDone();
-
-                                rmv.add(id);
-                            }
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to check file existence: " + id, e);
-                        }
-                    }
-
-                    for (IgniteUuid id : rmv)
-                        delFuts.remove(id);
-                }
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid nextAffinityKey() {
         return safeOp(new Callable<IgniteUuid>() {
@@ -2079,4 +1925,39 @@ public final class IgfsImpl implements IgfsEx {
             return t;
         }
     }
+
+    /**
+     * Format runnable.
+     */
+    private class FormatRunnable implements Runnable {
+        /** Target future. */
+        private final GridFutureAdapter<?> fut;
+
+        /**
+         * Constructor.
+         *
+         * @param fut Future.
+         */
+        public FormatRunnable(GridFutureAdapter<?> fut) {
+            this.fut = fut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            IgfsException err = null;
+
+            try {
+                format();
+            }
+            catch (Throwable err0) {
+                err = IgfsUtils.toIgfsException(err0);
+            }
+            finally {
+                if (err == null)
+                    fut.onDone();
+                else
+                    fut.onDone(err);
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index d7aae5c..7310b4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -185,7 +185,7 @@ public class IgfsUtils {
      * @return Converted IGFS exception.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public static IgfsException toIgfsException(Exception err) {
+    public static IgfsException toIgfsException(Throwable err) {
         IgfsException err0 = err instanceof IgfsException ? (IgfsException)err : null;
 
         IgfsException igfsErr = X.cause(err, IgfsException.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
index fd4ec17..4e0f12b 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
@@ -239,8 +239,6 @@ public class IgfsFragmentizerSelfTest extends IgfsFragmentizerAbstractSelfTest
{
 
         igfs.format();
 
-        igfs.awaitDeletesAsync().get();
-
         GridTestUtils.retryAssert(log, 50, 100, new CA() {
             @Override public void apply() {
                 for (int i = 0; i < NODE_CNT; i++) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
index 3933e86..266945f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -41,27 +40,21 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.transactions.Transaction;
 import org.jsr166.ThreadLocalRandom8;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  * {@link IgfsAttributes} test case.
@@ -256,41 +249,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
-     * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data
is deleted concurrently.
-     *
-     * @throws Exception If failed.
-     */
-    public void testPartitionedOversizeDelay() throws Exception {
-        cacheMode = PARTITIONED;
-        nearEnabled = true;
-
-        checkOversizeDelay();
-    }
-
-    /**
-     * Ensure that exception is not thrown in case co-located cache is oversized, but data
is deleted concurrently.
-     *
-     * @throws Exception If failed.
-     */
-    public void testColocatedOversizeDelay() throws Exception {
-        cacheMode = PARTITIONED;
-        nearEnabled = false;
-
-        checkOversizeDelay();
-    }
-
-    /**
-     * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data
is deleted concurrently.
-     *
-     * @throws Exception If failed.
-     */
-    public void testReplicatedOversizeDelay() throws Exception {
-        cacheMode = REPLICATED;
-
-        checkOversizeDelay();
-    }
-
-    /**
      * Ensure that IGFS size is correctly updated in case of preloading for PARTITIONED cache.
      *
      * @throws Exception If failed.
@@ -484,97 +442,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
-     * Ensure that exception is not thrown or thrown with some delay when there is something
in trash directory.
-     *
-     * @throws Exception If failed.
-     */
-    private void checkOversizeDelay() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        igfsMaxData = 256;
-        trashPurgeTimeout = 2000;
-
-        startUp();
-
-        IgfsImpl igfs = igfs(0);
-
-        final IgfsPath path = new IgfsPath("/file");
-        final IgfsPath otherPath = new IgfsPath("/fileOther");
-
-        // Fill cache with data up to it's limit.
-        IgfsOutputStream os = igfs.create(path, false);
-        os.write(chunk((int)igfsMaxData));
-        os.close();
-
-        final IgniteCache<IgniteUuid, IgfsEntryInfo> metaCache = igfs.context().kernalContext().cache().jcache(
-            igfs.configuration().getMetaCacheName());
-
-        // Start a transaction in a separate thread which will lock file ID.
-        final IgniteUuid id = igfs.context().meta().fileId(path);
-        final IgfsEntryInfo info = igfs.context().meta().info(id);
-
-        final AtomicReference<Throwable> err = new AtomicReference<>();
-
-        try {
-            new Thread(new Runnable() {
-                @Override public void run() {
-                    try {
-
-                        try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC,
REPEATABLE_READ)) {
-                            metaCache.get(id);
-
-                            latch.await();
-
-                            U.sleep(1000); // Sleep here so that data manager could "see"
oversize.
-
-                            tx.commit();
-                        }
-                    }
-                    catch (Throwable e) {
-                        err.set(e);
-                    }
-                }
-            }).start();
-
-            // Now add file ID to trash listing so that delete worker could "see" it.
-            IgniteUuid trashId = IgfsUtils.randomTrashId();
-
-            try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC,
REPEATABLE_READ)) {
-                Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(),
-                    new IgfsListingEntry(info));
-
-                // Clear root listing.
-                metaCache.put(IgfsUtils.ROOT_ID, IgfsUtils.createDirectory(IgfsUtils.ROOT_ID));
-
-                // Add file to trash listing.
-                IgfsEntryInfo trashInfo = metaCache.get(trashId);
-
-                if (trashInfo == null)
-                    metaCache.put(trashId, IgfsUtils.createDirectory(trashId).listing(listing));
-                else
-                    metaCache.put(trashId, trashInfo.listing(listing));
-
-                tx.commit();
-            }
-
-            assert metaCache.get(trashId) != null;
-
-            // Now the file is locked and is located in trash, try adding some more data.
-            os = igfs.create(otherPath, false);
-            os.write(new byte[1]);
-
-            latch.countDown();
-
-            os.close();
-
-            assert err.get() == null;
-        }
-        finally {
-            latch.countDown(); // Safety.
-        }
-    }
-
-    /**
      * Ensure that IGFS size is correctly updated in case of preloading.
      *
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index b38f3a2..ffa6f7d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.IgniteClusterEx;
 import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -754,11 +753,6 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
         }
 
         /** {@inheritDoc} */
-        @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException
{
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Nullable @Override public String clientLogDirectory() {
             return null;
         }


Mime
View raw message