ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/2] ignite git commit: Finished create.
Date Fri, 18 Mar 2016 10:54:18 GMT
Finished create.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7830cd8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7830cd8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7830cd8

Branch: refs/heads/ignite-igfs-refactoring
Commit: e7830cd80007fdcfcf3de634db1ef7f295bfc770
Parents: b9a1498
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Fri Mar 18 13:54:10 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Mar 18 13:54:10 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 287 ++++++++++++-------
 .../processors/igfs/IgfsAbstractSelfTest.java   |  14 +-
 .../apache/ignite/igfs/IgfsEventsTestSuite.java |  10 +-
 3 files changed, 196 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7830cd8/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 1f2647b..d930bd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -3200,6 +3200,14 @@ public class IgfsMetaManager extends IgfsManager {
         private IgfsListingEntry entry;
 
         /**
+         * Empty constructor required for {@link Externalizable}.
+         *
+         */
+        public ListingAddProcessor() {
+            // No-op.
+        }
+
+        /**
          * Constructs update directory listing closure.
          *
          * @param fileName File name to add into parent listing.
@@ -3213,19 +3221,10 @@ public class IgfsMetaManager extends IgfsManager {
             this.entry = entry;
         }
 
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         *
-         */
-        public ListingAddProcessor() {
-            // No-op.
-        }
-
         /** {@inheritDoc} */
         @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args) {
             IgfsFileInfo fileInfo = e.getValue();
 
-            assert fileInfo != null : "File info not found for the child: " + entry.fileId();
             assert fileInfo.isDirectory();
 
             Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
@@ -3262,6 +3261,70 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
+     * Listing replace processor.
+     */
+    private static final class ListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+        Externalizable {
+        /** Name. */
+        private String name;
+
+        /** New ID. */
+        private IgniteUuid id;
+
+        /**
+         * Constructor.
+         */
+        public ListingReplaceProcessor() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param name Name.
+         * @param id ID.
+         */
+        public ListingReplaceProcessor(String name, IgniteUuid id) {
+            this.name = name;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo fileInfo = e.getValue();
+
+            assert fileInfo.isDirectory();
+
+            Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+            // Modify listing in-place.
+            IgfsListingEntry oldEntry = listing.get(name);
+
+            if (oldEntry == null)
+                throw new IgniteException("Directory listing doesn't contain expected entry: " + name);
+
+            listing.put(name, new IgfsListingEntry(id, oldEntry.isDirectory()));
+
+            e.setValue(new IgfsFileInfo(listing, fileInfo));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, name);
+            out.writeObject(id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            name = U.readString(in);
+            id = (IgniteUuid)in.readObject();
+        }
+    }
+
+    /**
      * Update path closure.
      */
     @GridInternal
@@ -3385,10 +3448,7 @@ public class IgfsMetaManager extends IgfsManager {
                             return t2;
                         }
                         else {
-                            // Need to create directory structure first.
-                            assert create;
-
-                            // This is our starting point.
+                            // Need to create directory structure first. This is our starting
point:
                             int lastExistingIdx = pathIds.lastExistingIndex();
                             IgfsFileInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
 
@@ -3497,130 +3557,162 @@ public class IgfsMetaManager extends IgfsManager {
         @Nullable Map<String, String> fileProps) throws IgniteCheckedException {
         validTxState(false);
 
-        assert path != null;
+        while (true) {
+            if (busyLock.enterBusy()) {
+                try {
+                    // Prepare path IDs.
+                    IgfsPathIds pathIds = pathIds(path);
 
-        final String name = path.name();
+                    // Prepare lock IDs.
+                    Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
 
-        DirectoryChainBuilder b = null;
+                    pathIds.addExistingIds(lockIds);
+                    pathIds.addSurrogateIds(lockIds);
 
-        IgniteUuid trashId = IgfsUtils.randomTrashId();
+                    // In overwrite mode we also lock ID of potential replacement as well
as trash ID.
+                    IgniteUuid overwriteId = IgniteUuid.randomUuid();
+                    IgniteUuid trashId = IgfsUtils.randomTrashId();
 
-        while (true) {
-            if (busyLock.enterBusy()) {
-                try {
-                    b = new DirectoryChainBuilder(path, dirProps, fileProps, blockSize, affKey,
evictExclude);
+                    if (overwrite) {
+                        lockIds.add(overwriteId);
+
+                        // Trash ID is only added if we suspect conflict.
+                        if (pathIds.allPartsFound())
+                            lockIds.add(trashId);
+                    }
 
-                    // Start Tx:
+                    // Start TX.
                     IgniteInternalTx tx = startTx();
 
                     try {
-                        if (overwrite)
-                            // Lock also the TRASH directory because in case of overwrite
we
-                            // may need to delete the old file:
-                            b.idSet.add(trashId);
+                        Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
 
-                        final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet);
+                        if (!pathIds.verifyIntergrity(lockInfos))
+                            // Directory structure changed concurrently. So we simply re-try.
+                            continue;
 
-                        assert !overwrite || lockedInfos.get(trashId) != null; // TRASH must
exist at this point.
+                        if (pathIds.allPartsFound()) {
+                            // All participants found.
+                            IgfsFileInfo oldInfo = lockInfos.get(pathIds.lastId());
 
-                        // If the path was changed, we close the current Tx and repeat the
procedure again
-                        // starting from taking the path ids.
-                        if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) {
-                            // Locked path okay, trying to proceed with the remainder creation.
-                            final IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId);
+                            // Check: is it a file?
+                            if (!oldInfo.isFile())
+                                throw new IgfsPathIsDirectoryException("Failed to create
a file: " + path);
 
-                            if (b.existingIdCnt == b.components.size() + 1) {
-                                // Full requestd path exists.
+                            // Check: can we overwrite it?
+                            if (!overwrite)
+                                throw new IgfsPathAlreadyExistsException("Failed to create
a file: " + path);
 
-                                assert b.existingPath.equals(path);
-                                assert lockedInfos.size() ==
-                                        (overwrite ? b.existingIdCnt + 1/*TRASH*/ : b.existingIdCnt);
+                            // Check if file already opened for write.
+                            if (oldInfo.lockId() != null)
+                                throw new IgfsException("File is already opened for write:
" + path);
 
-                                if (lowermostExistingInfo.isDirectory()) {
-                                    throw new IgfsPathAlreadyExistsException("Failed to create
file (path points to an " +
-                                        "existing directory): " + path);
-                                }
-                                else {
-                                    // This is a file.
-                                    assert lowermostExistingInfo.isFile();
+                            // At this point file can be re-created safely.
 
-                                    final IgniteUuid parentId = b.idList.get(b.idList.size()
- 2);
+                            // First step: add existing to trash listing.
+                            IgniteUuid oldId = pathIds.lastId();
 
-                                    final IgniteUuid lockId = lowermostExistingInfo.lockId();
+                            id2InfoPrj.invoke(trashId, new ListingAddProcessor(oldId.toString(),
+                                new IgfsListingEntry(oldId, true)));
 
-                                    if (overwrite) {
-                                        // Delete existing file, but fail if it is locked:
-                                        if (lockId != null)
-                                            throw fsException("Failed to overwrite file (file
is opened for writing) " +
-                                                    "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id()
-                                                    + ", lockId=" + lockId + ']');
+                            // Second step: replace ID in parent directory.
+                            String name = pathIds.lastPart();
+                            IgniteUuid parentId = pathIds.lastParentId();
 
-                                        final IgfsListingEntry deletedEntry = lockedInfos.get(parentId).listing()
-                                                .get(name);
+                            id2InfoPrj.invoke(parentId, new ListingReplaceProcessor(name,
overwriteId));
 
-                                        assert deletedEntry != null;
+                            // Third step: create the file.
+                            long createTime = System.currentTimeMillis();
 
-                                        transferEntry(deletedEntry, parentId, name, trashId,
-                                            lowermostExistingInfo.id().toString());
+                            IgfsFileInfo newInfo = invokeAndGet(overwriteId, new FileCreateProcessor(createTime,
+                                fileProps, blockSize, affKey, createFileLockId(false), evictExclude));
 
-                                        // Update a file info of the removed file with a
file path,
-                                        // which will be used by delete worker for event
notifications.
-                                        invokeUpdatePath(lowermostExistingInfo.id(), path);
+                            // Fourth step: update path of remove file.
+                            invokeUpdatePath(oldId, path);
 
-                                        // Make a new locked info:
-                                        long t = System.currentTimeMillis();
+                            // Prepare result and commit.
+                            IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newInfo,
parentId);
 
-                                        final IgfsFileInfo newFileInfo = new IgfsFileInfo(cfg.getBlockSize(),
0L,
-                                            affKey, createFileLockId(false), evictExclude,
fileProps, t, t);
+                            tx.commit();
 
-                                        assert newFileInfo.lockId() != null; // locked info
should be created.
+                            IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
-                                        createNewEntry(newFileInfo, parentId, name);
+                            return t2;
+                        }
+                        else {
+                            // Need to create directory structure first. This is our starting
point:
+                            int lastExistingIdx = pathIds.lastExistingIndex();
+                            IgfsFileInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
 
-                                        IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2
= new T2<>(newFileInfo, parentId);
+                            // Check if entry we are going to write to is directory.
+                            if (lastExistingInfo.isFile())
+                                throw new IgfsParentNotDirectoryException("Failed to open
file for write file " +
+                                    "(parent element is not a directory): " + path);
 
-                                        tx.commit();
+                            // If current info already contains entry with the same name
as it's child, then something
+                            // has changed concurrently. We must re-try because we cannot
get info of this unexpected
+                            // element due to possible deadlocks.
+                            int curIdx = lastExistingIdx + 1;
 
-                                        delWorker.signal();
+                            String curPart = pathIds.part(curIdx);
+                            IgniteUuid curId = pathIds.surrogateId(curIdx);
+                            IgniteUuid curParentId = lastExistingInfo.id();
 
-                                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), path,
EVT_IGFS_FILE_OPENED_WRITE);
+                            if (lastExistingInfo.hasChild(curPart))
+                                continue;
 
-                                        return t2;
-                                    }
-                                    else {
-                                        throw new IgfsPathAlreadyExistsException("Failed
to create file (file " +
-                                            "already exists and overwrite flag is false):
" + path);
-                                    }
-                                }
-                            }
+                            // First step: add new entry to the last existing element.
+                            id2InfoPrj.invoke(lastExistingInfo.id(), new ListingAddProcessor(curPart,
+                                new IgfsListingEntry(curId, !pathIds.isLastIndex(curIdx))));
 
-                            // The full requested path does not exist.
+                            // Events support.
+                            IgfsPath lastCreatedPath = pathIds.lastExistingPath();
+
+                            Collection<IgfsPath> createdPaths = new ArrayList<>(pathIds.count()
- curIdx);
+
+                            // Second step: create middle directories.
+                            long createTime = System.currentTimeMillis();
 
-                            // Check only the lowermost directory in the existing directory
chain
-                            // because others are already checked in #verifyPathIntegrity()
above.
-                            if (!lowermostExistingInfo.isDirectory())
-                                throw new IgfsParentNotDirectoryException("Failed to create"
-                                    + " file (parent element is not a directory)");
+                            while (curIdx < pathIds.count() - 1) {
+                                String nextPart = pathIds.part(curIdx + 1);
+                                IgniteUuid nextId = pathIds.surrogateId(curIdx + 1);
 
-                            final String uppermostFileToBeCreatedName = b.components.get(b.existingIdCnt
- 1);
+                                // This flag will be true for all directories except the
last one.
+                                id2InfoPrj.invoke(curId, new DirectoryCreateProcessor(createTime,
dirProps,
+                                    nextPart, new IgfsListingEntry(nextId, !pathIds.isLastIndex(curIdx
+ 1))));
+
+                                // Save event.
+                                lastCreatedPath = new IgfsPath(lastCreatedPath, curPart);
+
+                                createdPaths.add(lastCreatedPath);
+
+                                // Advance things further.
+                                curIdx++;
 
-                            if (!lowermostExistingInfo.hasChild(uppermostFileToBeCreatedName))
{
-                                b.doBuild();
+                                curParentId = curId;
 
-                                assert b.leafInfo != null;
-                                assert b.leafParentId != null;
+                                curPart = nextPart;
+                                curId = nextId;
+                            }
 
-                                IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(b.leafInfo,
b.leafParentId);
+                            // Third step: create leaf file.
+                            IgfsFileInfo info = invokeAndGet(curId, new FileCreateProcessor(createTime,
fileProps,
+                                blockSize, affKey, createFileLockId(false), evictExclude));
 
-                                tx.commit();
+                            IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(info,
curParentId);
 
-                                b.sendEvents();
+                            tx.commit();
 
-                                return t2;
+                            // Generate events.
+                            if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) {
+                                for (IgfsPath createdPath : createdPaths)
+                                    IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPath,
EVT_IGFS_DIR_CREATED);
                             }
 
-                            // Another thread concurrently created file or directory in the
path with
-                            // the name we need.
+                            IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_CREATED);
+                            IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
+
+                            return t2;
                         }
                     }
                     finally {
@@ -3630,7 +3722,8 @@ public class IgfsMetaManager extends IgfsManager {
                 finally {
                     busyLock.leaveBusy();
                 }
-            } else
+            }
+            else
                 throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']');
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7830cd8/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 3e23447..dee9dda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -1449,19 +1449,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
                         createCtr.incrementAndGet();
                     }
                     catch (IgniteException e) {
-                        Throwable[] chain = X.getThrowables(e);
-
-                        Throwable cause = chain[chain.length - 1];
-
-                        if (!e.getMessage().startsWith("Failed to overwrite file (file is
opened for writing)")
-                                && (cause == null
-                                    || !cause.getMessage().startsWith("Failed to overwrite
file (file is opened for writing)"))) {
-
-                            System.out.println("Failed due to IgniteException exception.
Cause:");
-                            cause.printStackTrace(System.out);
-
-                            err.compareAndSet(null, e);
-                        }
+                        // No-op.
                     }
                     catch (IOException e) {
                         err.compareAndSet(null, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7830cd8/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
index c013cae..1dd665a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
@@ -46,11 +46,11 @@ public class IgfsEventsTestSuite extends TestSuite {
 
         TestSuite suite = new TestSuite("Ignite FS Events Test Suite");
 
-        suite.addTest(new TestSuite(ldr.loadClass(ShmemPrivate.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(ShmemPrimary.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(ShmemDualSync.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(ShmemDualAsync.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrimary.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName())));
 
@@ -66,7 +66,7 @@ public class IgfsEventsTestSuite extends TestSuite {
 
         TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch Only");
 
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrimary.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName())));
 
@@ -76,7 +76,7 @@ public class IgfsEventsTestSuite extends TestSuite {
     /**
      * Shared memory IPC in PRIVATE mode.
      */
-    public static class ShmemPrivate extends IgfsEventsAbstractSelfTest {
+    public static class ShmemPrimary extends IgfsEventsAbstractSelfTest {
         /** {@inheritDoc} */
         @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
             FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
@@ -95,7 +95,7 @@ public class IgfsEventsTestSuite extends TestSuite {
     /**
      * Loopback socket IPS in PRIVATE mode.
      */
-    public static class LoopbackPrivate extends IgfsEventsAbstractSelfTest {
+    public static class LoopbackPrimary extends IgfsEventsAbstractSelfTest {
         /** {@inheritDoc} */
         @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
             FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();


Mime
View raw message