Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 564211905C for ; Tue, 22 Mar 2016 16:49:38 +0000 (UTC) Received: (qmail 59687 invoked by uid 500); 22 Mar 2016 16:49:38 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 59344 invoked by uid 500); 22 Mar 2016 16:49:38 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 58815 invoked by uid 99); 22 Mar 2016 16:49:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Mar 2016 16:49:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6441DE38EE; Tue, 22 Mar 2016 16:49:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ntikhonov@apache.org To: commits@ignite.apache.org Date: Tue, 22 Mar 2016 16:49:49 -0000 Message-Id: <835e49e94dc44693ada66127f18bb469@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/50] [abbrv] ignite git commit: IGNITE-2814: IGFS: File lock/unlock/reserve operations no longer require put/replace on cache. Thin entry processors are used instead. IGNITE-2814: IGFS: File lock/unlock/reserve operations no longer require put/replace on cache. Thin entry processors are used instead. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1d9e8b6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1d9e8b6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1d9e8b6 Branch: refs/heads/ignite-2004 Commit: b1d9e8b6c7553133c4c4ca3820f6f30d202b7ea2 Parents: d83fa11 Author: vozerov-gridgain Authored: Mon Mar 14 13:17:58 2016 +0300 Committer: vozerov-gridgain Committed: Mon Mar 14 13:17:58 2016 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsMetaManager.java | 270 ++++++++++++++++--- .../processors/igfs/IgfsOutputStreamImpl.java | 76 +----- 2 files changed, 228 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d9e8b6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index 89ddd02..8bb9e92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -466,10 +466,11 @@ public class IgfsMetaManager extends IgfsManager { * Lock the file explicitly outside of transaction. * * @param fileId File ID to lock. + * @param delete If file is being locked for delete. * @return Locked file info or {@code null} if file cannot be locked or doesn't exist. * @throws IgniteCheckedException If the file with such id does not exist, or on another failure. 
*/ - public @Nullable IgfsFileInfo lock(IgniteUuid fileId, boolean isDeleteLock) throws IgniteCheckedException { + public @Nullable IgfsFileInfo lock(IgniteUuid fileId, boolean delete) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { assert validTxState(false); @@ -487,13 +488,11 @@ public class IgfsMetaManager extends IgfsManager { if (oldInfo.lockId() != null) return null; // The file is already locked, we cannot lock it. - IgfsFileInfo newInfo = lockInfo(oldInfo, isDeleteLock); + IgniteUuid lockId = createFileLockId(delete); - boolean put = id2InfoPrj.replace(fileId, oldInfo, newInfo); + id2InfoPrj.invoke(fileId, new FileLockProcessor(lockId)); - assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']'; - - assert newInfo.id().equals(oldInfo.id()); // Same id. + IgfsFileInfo newInfo = id2InfoPrj.get(fileId); tx.commit(); @@ -515,30 +514,13 @@ public class IgfsMetaManager extends IgfsManager { } /** - * Set lock on file info. + * Create file lock ID. * - * @param info File info. - * @return New file info with lock set, or null if the info passed in is already locked. - * @throws IgniteCheckedException In case lock is already set on that file. + * @param delete If lock ID is required for file deletion. + * @return Lock ID. */ - private @Nullable IgfsFileInfo lockInfo(IgfsFileInfo info, boolean isDeleteLock) { - assert info != null; - - if (info.lockId() != null) - return null; // Null return value indicates that the file is already locked. - - return new IgfsFileInfo(info, composeLockId(isDeleteLock), info.modificationTime()); - } - - /** - * Gets a new lock id. - * The returned Id #globalId() method will return the Id of the node which locked the file. - * - * @param isDeleteLock if this is special delete lock. - * @return The new lock id. 
- */ - private IgniteUuid composeLockId(boolean isDeleteLock) { - if (isDeleteLock) + private IgniteUuid createFileLockId(boolean delete) { + if (delete) return IgfsUtils.DELETE_LOCK_ID; return IgniteUuid.fromUuid(locNode.id()); @@ -584,12 +566,7 @@ public class IgfsMetaManager extends IgfsManager { "[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']'); - IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime); - - boolean put = id2InfoPrj.put(fileId, newInfo); - - assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo - + ']'; + id2InfoPrj.invoke(fileId, new FileUnlockProcessor(modificationTime)); return null; } @@ -1680,6 +1657,57 @@ public class IgfsMetaManager extends IgfsManager { } /** + * Reserve space for file. + * + * @param path File path. + * @param fileId File ID. + * @param space Space. + * @param affRange Affinity range. + * @return New file info. + */ + public IgfsFileInfo reserveSpace(IgfsPath path, IgniteUuid fileId, long space, IgfsFileAffinityRange affRange) + throws IgniteCheckedException { + assert validTxState(false); + + if (busyLock.enterBusy()) { + try { + if (log.isDebugEnabled()) + log.debug("Reserve file space [path=" + path + ", id=" + fileId + ']'); + + IgniteInternalTx tx = startTx(); + + try { + // Lock file ID for this transaction. 
+ IgfsFileInfo oldInfo = info(fileId); + + if (oldInfo == null) + throw fsException("File has been deleted concurrently [path=" + path + ", id=" + fileId + ']'); + + id2InfoPrj.invoke(fileId, new FileReserveSpaceProcessor(space, affRange)); + + IgfsFileInfo newInfo = id2InfoPrj.get(fileId); + + tx.commit(); + + return newInfo; + } + catch (GridClosureException e) { + throw U.cast(e); + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to reseve file space because Grid is stopping [path=" + path + + ", id=" + fileId + ']'); + } + + /** * Update file info in cache. * * @param fileId File ID to update information for. @@ -1992,7 +2020,7 @@ public class IgfsMetaManager extends IgfsManager { "the secondary file system because the path points to a directory: " + path); IgfsFileInfo newInfo = new IgfsFileInfo(status.blockSize(), status.length(), affKey, - composeLockId(false), igfsCtx.igfs().evictExclude(path, false), status.properties(), + createFileLockId(false), igfsCtx.igfs().evictExclude(path, false), status.properties(), status.accessTime(), status.modificationTime()); // Add new file info to the listing optionally removing the previous one. @@ -2113,13 +2141,11 @@ public class IgfsMetaManager extends IgfsManager { } // Set lock and return. - IgfsFileInfo lockedInfo = lockInfo(info, false); - - assert lockedInfo != null; // We checked the lock above. 
+ IgniteUuid lockId = createFileLockId(false); - boolean put = id2InfoPrj.put(info.id(), lockedInfo); + id2InfoPrj.invoke(info.id(), new FileLockProcessor(lockId)); - assert put; + IgfsFileInfo lockedInfo = id2InfoPrj.get(info.id()); return new IgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(), lockedInfo, out); @@ -3455,7 +3481,7 @@ public class IgfsMetaManager extends IgfsManager { @Override protected IgfsFileInfo buildLeaf() { long t = System.currentTimeMillis(); - return new IgfsFileInfo(blockSize, 0L, affKey, composeLockId(false), + return new IgfsFileInfo(blockSize, 0L, affKey, createFileLockId(false), evictExclude, leafProps, t, t); } }; @@ -3505,7 +3531,7 @@ public class IgfsMetaManager extends IgfsManager { + "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id() + ", lockId=" + lockId + ']'); - IgniteUuid newLockId = composeLockId(false); + IgniteUuid newLockId = createFileLockId(false); EntryProcessorResult result = id2InfoPrj.invoke(lowermostExistingInfo.id(), @@ -3553,7 +3579,7 @@ public class IgfsMetaManager extends IgfsManager { long t = System.currentTimeMillis(); final IgfsFileInfo newFileInfo = new IgfsFileInfo(cfg.getBlockSize(), 0L, - affKey, composeLockId(false), evictExclude, fileProps, t, t); + affKey, createFileLockId(false), evictExclude, fileProps, t, t); assert newFileInfo.lockId() != null; // locked info should be created. @@ -3874,4 +3900,162 @@ public class IgfsMetaManager extends IgfsManager { return S.toString(LockFileProcessor.class, this); } } + + /** + * File lock entry processor. + */ + private static class FileLockProcessor implements EntryProcessor, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Lock Id. */ + private IgniteUuid lockId; + + /** + * Default constructor. + */ + public FileLockProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param lockId Lock ID. 
+ */ + public FileLockProcessor(IgniteUuid lockId) { + this.lockId = lockId; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry entry, Object... args) + throws EntryProcessorException { + IgfsFileInfo old = entry.getValue(); + + entry.setValue(new IgfsFileInfo(old, lockId, old.modificationTime())); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, lockId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + lockId = U.readGridUuid(in); + } + } + + /** + * File unlock entry processor. + */ + private static class FileUnlockProcessor implements EntryProcessor, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Modification time. */ + private long modificationTime; + + /** + * Default constructor. + */ + public FileUnlockProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param modificationTime Modification time. + */ + public FileUnlockProcessor(long modificationTime) { + this.modificationTime = modificationTime; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry entry, Object... args) + throws EntryProcessorException { + IgfsFileInfo old = entry.getValue(); + + entry.setValue(new IgfsFileInfo(old, null, modificationTime)); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(modificationTime); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + modificationTime = in.readLong(); + } + } + + /** + * File reserve space entry processor. + */ + private static class FileReserveSpaceProcessor implements EntryProcessor, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Space. 
*/ + private long space; + + /** Affinity range. */ + private IgfsFileAffinityRange affRange; + + /** + * Default constructor. + */ + public FileReserveSpaceProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param space Space. + * @param affRange + */ + public FileReserveSpaceProcessor(long space, IgfsFileAffinityRange affRange) { + this.space = space; + this.affRange = affRange; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry entry, Object... args) + throws EntryProcessorException { + IgfsFileInfo oldInfo = entry.getValue(); + + IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap()); + + newMap.addRange(affRange); + + IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length() + space); + + newInfo.fileMap(newMap); + + entry.setValue(newInfo); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(space); + out.writeObject(affRange); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + space = in.readLong(); + affRange = (IgfsFileAffinityRange)in.readObject(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d9e8b6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java index 83056af..8c11073 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java @@ -18,10 +18,7 @@ package org.apache.ignite.internal.processors.igfs; import java.io.DataInput; -import 
java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; @@ -30,11 +27,9 @@ import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -297,8 +292,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter { if (space > 0) { data.awaitAllAcksReceived(fileInfo.id()); - IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(), - new ReserveSpaceClosure(space, streamRange)); + IgfsFileInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange); if (fileInfo0 == null) throw new IOException("File was concurrently deleted: " + path); @@ -446,72 +440,4 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter { @Override public String toString() { return S.toString(IgfsOutputStreamImpl.class, this); } - - /** - * Helper closure to reserve specified space and update file's length - */ - @GridInternal - private static final class ReserveSpaceClosure implements IgniteClosure, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Space amount (bytes number) to increase file's length. */ - private long space; - - /** Affinity range for this particular update. */ - private IgfsFileAffinityRange range; - - /** - * Empty constructor required for {@link Externalizable}. - * - */ - public ReserveSpaceClosure() { - // No-op. 
- } - - /** - * Constructs the closure to reserve specified space and update file's length. - * - * @param space Space amount (bytes number) to increase file's length. - * @param range Affinity range specifying which part of file was colocated. - */ - private ReserveSpaceClosure(long space, IgfsFileAffinityRange range) { - this.space = space; - this.range = range; - } - - /** {@inheritDoc} */ - @Override public IgfsFileInfo apply(IgfsFileInfo oldInfo) { - IgfsFileMap oldMap = oldInfo.fileMap(); - - IgfsFileMap newMap = new IgfsFileMap(oldMap); - - newMap.addRange(range); - - // Update file length. - IgfsFileInfo updated = new IgfsFileInfo(oldInfo, oldInfo.length() + space); - - updated.fileMap(newMap); - - return updated; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(space); - out.writeObject(range); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - space = in.readLong(); - range = (IgfsFileAffinityRange)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ReserveSpaceClosure.class, this); - } - } } \ No newline at end of file