Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3E308200B80 for ; Wed, 14 Sep 2016 13:10:24 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3CE61160AD4; Wed, 14 Sep 2016 11:10:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 79349160AF9 for ; Wed, 14 Sep 2016 13:10:20 +0200 (CEST) Received: (qmail 35262 invoked by uid 500); 14 Sep 2016 11:10:19 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 34111 invoked by uid 99); 14 Sep 2016 11:10:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Sep 2016 11:10:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 75CD9F0BF1; Wed, 14 Sep 2016 11:10:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Wed, 14 Sep 2016 11:11:05 -0000 Message-Id: In-Reply-To: <405a5f4a1979449aa9ebf08fe75e1c58@git.apache.org> References: <405a5f4a1979449aa9ebf08fe75e1c58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [49/50] ignite git commit: IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy (2). archived-at: Wed, 14 Sep 2016 11:10:24 -0000 IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy (2). Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/409f043b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/409f043b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/409f043b Branch: refs/heads/ignite-3661 Commit: 409f043b3e94f51aa23341b7283233a572be6cd2 Parents: 16c5a71 Author: vozerov-gridgain Authored: Wed Sep 14 11:01:33 2016 +0300 Committer: vozerov-gridgain Committed: Wed Sep 14 11:01:33 2016 +0300 ---------------------------------------------------------------------- .../internal/processors/igfs/IgfsAsyncImpl.java | 5 - .../internal/processors/igfs/IgfsContext.java | 27 ++++ .../processors/igfs/IgfsDataManager.java | 19 +-- .../ignite/internal/processors/igfs/IgfsEx.java | 8 -- .../internal/processors/igfs/IgfsImpl.java | 143 ++----------------- .../processors/igfs/IgfsInputStreamImpl.java | 103 ++++++------- .../processors/igfs/IgfsOutputStreamImpl.java | 8 +- .../internal/processors/igfs/IgfsMock.java | 7 - 8 files changed, 100 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java index 743601e..106ef60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java @@ -157,11 +157,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter impleme } /** {@inheritDoc} */ - @Override public IgfsLocalMetrics localMetrics() { - return igfs.localMetrics(); - } - - /** {@inheritDoc} */ @Override public long groupBlockSize() { return igfs.groupBlockSize(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java index a638bf3..3e01246 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java @@ -54,6 +54,12 @@ public class IgfsContext { /** IGFS instance. */ private final IgfsEx igfs; + /** Local metrics holder. */ + private final IgfsLocalMetrics metrics = new IgfsLocalMetrics(); + + /** Local cluster node. */ + private volatile ClusterNode locNode; + /** * @param ctx Kernal context. * @param cfg IGFS configuration. @@ -179,6 +185,27 @@ public class IgfsContext { } /** + * Get local metrics. + * + * @return Local metrics. + */ + public IgfsLocalMetrics metrics() { + return metrics; + } + + /** + * Get local node. + * + * @return Local node. + */ + public ClusterNode localNode() { + if (locNode == null) + locNode = ctx.discovery().localNode(); + + return locNode; + } + + /** * Adds manager to managers list. * * @param mgr Manager. http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 5e2c6b2..d2183f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -105,9 +105,6 @@ public class IgfsDataManager extends IgfsManager { /** */ private CountDownLatch dataCacheStartLatch; - /** Local IGFS metrics. */ - private IgfsLocalMetrics metrics; - /** Group block size. */ private long grpBlockSize; @@ -201,8 +198,6 @@ public class IgfsDataManager extends IgfsManager { dataCache = (IgniteInternalCache)dataCachePrj; - metrics = igfsCtx.igfs().localMetrics(); - AffinityKeyMapper mapper = igfsCtx.kernalContext().cache() .internalCache(igfsCtx.configuration().getDataCacheName()).configuration().getAffinityMapper(); @@ -388,7 +383,7 @@ public class IgfsDataManager extends IgfsManager { putBlock(fileInfo.blockSize(), key, res); - metrics.addReadBlocks(1, 1); + igfsCtx.metrics().addReadBlocks(1, 1); } catch (IgniteCheckedException e) { rmtReadFut.onDone(e); @@ -405,18 +400,18 @@ public class IgfsDataManager extends IgfsManager { // Wait for existing future to finish and get it's result. res = oldRmtReadFut.get(); - metrics.addReadBlocks(1, 0); + igfsCtx.metrics().addReadBlocks(1, 0); } } else - metrics.addReadBlocks(1, 0); + igfsCtx.metrics().addReadBlocks(1, 0); return res; } }); } else - metrics.addReadBlocks(1, 0); + igfsCtx.metrics().addReadBlocks(1, 0); return fut; } @@ -1308,7 +1303,7 @@ public class IgfsDataManager extends IgfsManager { if (!nodeBlocks.isEmpty()) { processBatch(id, node, nodeBlocks); - metrics.addWriteBlocks(1, 0); + igfsCtx.metrics().addWriteBlocks(1, 0); } return portion; @@ -1350,7 +1345,7 @@ public class IgfsDataManager extends IgfsManager { else nodeBlocks.put(key, portion); - metrics.addWriteBlocks(writtenTotal, writtenSecondary); + igfsCtx.metrics().addWriteBlocks(writtenTotal, writtenSecondary); written += portion.length; } @@ -1359,7 +1354,7 @@ public class IgfsDataManager extends IgfsManager { if (!nodeBlocks.isEmpty()) { processBatch(id, node, nodeBlocks); - metrics.addWriteBlocks(nodeBlocks.size(), 0); + igfsCtx.metrics().addWriteBlocks(nodeBlocks.size(), 0); } assert written == len; http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index 05e157d..c869695 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs; import java.net.URI; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; @@ -75,13 +74,6 @@ public interface IgfsEx extends IgniteFileSystem { @Nullable public Boolean globalSampling(); /** - * Get local metrics. - * - * @return Local metrics. - */ - public IgfsLocalMetrics localMetrics(); - - /** * Gets group block size, i.e. block size multiplied by group size in affinity mapper. * * @return Group block size. http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 2c1f0f3..45596a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -24,7 +24,6 @@ import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.eviction.EvictionPolicy; import org.apache.ignite.cache.eviction.igfs.IgfsPerBlockLruEvictionPolicy; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; @@ -48,7 +47,6 @@ import org.apache.ignite.igfs.IgfsPathSummary; import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver; import org.apache.ignite.igfs.mapreduce.IgfsTask; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; @@ -72,7 +70,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; @@ -83,7 +80,6 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import java.io.IOException; import java.io.OutputStream; import java.net.URI; import java.util.ArrayList; @@ -97,12 +93,10 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED; @@ -142,9 +136,6 @@ public final class IgfsImpl implements IgfsEx { /** Event storage manager. */ private GridEventStorageManager evts; - /** Local node. */ - private ClusterNode locNode; - /** Logger. */ private IgniteLogger log; @@ -285,16 +276,6 @@ public final class IgfsImpl implements IgfsEx { new LinkedBlockingQueue(), new IgfsThreadFactory(cfg.getName()), null) : null; } - /** - * @return Local node. - */ - private ClusterNode localNode() { - if (locNode == null) - locNode = igfsCtx.kernalContext().discovery().localNode(); - - return locNode; - } - /** {@inheritDoc} */ @Override public void stop(boolean cancel) { busyLock.block(); @@ -500,12 +481,6 @@ public final class IgfsImpl implements IgfsEx { } }); } - - /** {@inheritDoc} */ - @Override public IgfsLocalMetrics localMetrics() { - return metrics; - } - /** {@inheritDoc} */ @Override public long groupBlockSize() { return data.groupBlockSize(); @@ -632,7 +607,7 @@ public final class IgfsImpl implements IgfsEx { if (info != null) { if (evts.isRecordable(EVT_IGFS_META_UPDATED)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_META_UPDATED, props)); + evts.record(new IgfsEvent(path, igfsCtx.localNode(), EVT_IGFS_META_UPDATED, props)); return new IgfsFileImpl(path, info, data.groupBlockSize()); } @@ -979,8 +954,8 @@ public final class IgfsImpl implements IgfsEx { IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0); - IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, desc.info(), - cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(), metrics); + IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, desc.info(), + cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader()); IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); @@ -996,8 +971,8 @@ public final class IgfsImpl implements IgfsEx { throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path); // Input stream to read data from grid cache with separate blocks. - IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, info, - cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null, metrics); + IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info, + cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null); IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ); @@ -1266,6 +1241,8 @@ public final class IgfsImpl implements IgfsEx { } } + IgfsLocalMetrics metrics = igfsCtx.metrics(); + return new IgfsMetricsAdapter( igfsCtx.data().spaceSize(), igfsCtx.data().maxSpaceSize(), @@ -1288,7 +1265,7 @@ public final class IgfsImpl implements IgfsEx { /** {@inheritDoc} */ @Override public void resetMetrics() { - metrics.reset(); + igfsCtx.metrics().reset(); } /** {@inheritDoc} */ @@ -1592,110 +1569,6 @@ public final class IgfsImpl implements IgfsEx { throw new IllegalStateException("Asynchronous mode is not enabled."); } - /** Detailed file descriptor. */ - private static final class FileDescriptor { - /** Parent file ID. */ - @Nullable - private final IgniteUuid parentId; - - /** File name. */ - private final String fileName; - - /** File ID. */ - private final IgniteUuid fileId; - - /** File is plain data file or directory. */ - private final boolean isFile; - - /** - * Constructs detailed file descriptor. - * - * @param parentId Parent file ID. - * @param fileName File name. - * @param fileId File ID. - * @param isFile {@code True} if file. - */ - private FileDescriptor(@Nullable IgniteUuid parentId, String fileName, IgniteUuid fileId, boolean isFile) { - assert fileName != null; - - this.parentId = parentId; - this.fileName = fileName; - - this.fileId = fileId; - this.isFile = isFile; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = parentId != null ? parentId.hashCode() : 0; - - res = 31 * res + fileName.hashCode(); - res = 31 * res + fileId.hashCode(); - res = 31 * res + (isFile ? 1231 : 1237); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (o == this) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - FileDescriptor that = (FileDescriptor)o; - - return fileId.equals(that.fileId) && isFile == that.isFile && fileName.equals(that.fileName) && - (parentId == null ? that.parentId == null : parentId.equals(that.parentId)); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(FileDescriptor.class, this); - } - } - - /** - * IGFS input stream extension that fires events. - */ - private class IgfsEventAwareInputStream extends IgfsInputStreamImpl { - /** Close guard. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(false); - - /** - * Constructor. - * - * @param igfsCtx IGFS context. - * @param path Path to stored file. - * @param fileInfo File info. - * @param prefetchBlocks Prefetch blocks. - * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered. - * @param secReader Optional secondary file system reader. - * @param metrics Metrics. - */ - IgfsEventAwareInputStream(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, - int prefetchBlocks, int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader, - IgfsLocalMetrics metrics) { - super(igfsCtx, path, fileInfo, prefetchBlocks, seqReadsBeforePrefetch, secReader, metrics); - - metrics.incrementFilesOpenedForRead(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod") - @Override public void close() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - super.close(); - - metrics.decrementFilesOpenedForRead(); - - if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_READ)) - evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CLOSED_READ, bytes())); - } - } - } - /** * Space calculation task. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java index f20a423..2f9f2fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java @@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.events.IgfsEvent; import org.apache.ignite.igfs.IgfsCorruptedFileException; import org.apache.ignite.igfs.IgfsInputStream; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -43,6 +45,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ; + /** * Input stream to read data from grid cache with separate blocks. */ @@ -50,11 +54,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar /** Empty chunks result. */ private static final byte[][] EMPTY_CHUNKS = new byte[0][]; - /** Meta manager. */ - private final IgfsMetaManager meta; - - /** Data manager. */ - private final IgfsDataManager data; + /** IGFS context. */ + private final IgfsContext igfsCtx; /** Secondary file system reader. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") @@ -108,9 +109,6 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar /** Time consumed on reading. */ private long time; - /** Local IGFS metrics. */ - private final IgfsLocalMetrics metrics; - /** * Constructs file output stream. * @@ -120,24 +118,19 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar * @param prefetchBlocks Number of blocks to prefetch. * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered. * @param secReader Optional secondary file system reader. - * @param metrics Local IGFS metrics. */ IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks, - int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader, IgfsLocalMetrics metrics) { + int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) { assert igfsCtx != null; assert path != null; assert fileInfo != null; - assert metrics != null; + this.igfsCtx = igfsCtx; this.path = path; this.fileInfo = fileInfo; this.prefetchBlocks = prefetchBlocks; this.seqReadsBeforePrefetch = seqReadsBeforePrefetch; this.secReader = secReader; - this.metrics = metrics; - - meta = igfsCtx.meta(); - data = igfsCtx.data(); log = igfsCtx.kernalContext().log(IgfsInputStream.class); @@ -146,6 +139,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar locCache = new LinkedHashMap<>(maxLocCacheSize, 1.0f); pendingFuts = new GridConcurrentHashSet<>(prefetchBlocks > 0 ? prefetchBlocks : 1); + + igfsCtx.metrics().incrementFilesOpenedForRead(); } /** @@ -295,46 +290,56 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar /** {@inheritDoc} */ @Override public synchronized void close() throws IOException { - try { - if (secReader != null) { - // Close secondary input stream. - secReader.close(); - - // Ensuring local cache futures completion. - for (IgniteInternalFuture fut : locCache.values()) { - try { - fut.get(); - } - catch (IgniteCheckedException ignore) { - // No-op. + if (!closed) { + try { + if (secReader != null) { + // Close secondary input stream. + secReader.close(); + + // Ensuring local cache futures completion. + for (IgniteInternalFuture fut : locCache.values()) { + try { + fut.get(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } } - } - // Ensuring pending evicted futures completion. - while (!pendingFuts.isEmpty()) { - pendingFutsLock.lock(); + // Ensuring pending evicted futures completion. + while (!pendingFuts.isEmpty()) { + pendingFutsLock.lock(); - try { - pendingFutsCond.await(100, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ignore) { - // No-op. - } - finally { - pendingFutsLock.unlock(); + try { + pendingFutsCond.await(100, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ignore) { + // No-op. + } + finally { + pendingFutsLock.unlock(); + } } } } - } - catch (Exception e) { - throw new IOException("File to close the file: " + path, e); - } - finally { - closed = true; + catch (Exception e) { + throw new IOException("File to close the file: " + path, e); + } + finally { + closed = true; + + IgfsLocalMetrics metrics = igfsCtx.metrics(); + + metrics.addReadBytesTime(bytes, time); + metrics.decrementFilesOpenedForRead(); - metrics.addReadBytesTime(bytes, time); + locCache.clear(); - locCache.clear(); + GridEventStorageManager evts = igfsCtx.kernalContext().event(); + + if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_READ)) + evts.record(new IgfsEvent(path, igfsCtx.localNode(), EVT_IGFS_FILE_CLOSED_READ, bytes())); + } } } @@ -408,7 +413,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar // This failure may be caused by file being fragmented. if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) { - IgfsEntryInfo newInfo = meta.info(fileInfo.id()); + IgfsEntryInfo newInfo = igfsCtx.meta().info(fileInfo.id()); // File was deleted. if (newInfo == null) @@ -540,7 +545,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar */ @Nullable protected IgniteInternalFuture dataBlock(IgfsEntryInfo fileInfo, long blockIdx) throws IgniteCheckedException { - return data.dataBlock(fileInfo, path, blockIdx, secReader); + return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java index bbff93b..6dec0c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java @@ -127,7 +127,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { writeFut = igfsCtx.data().writeStart(fileInfo.id()); } - igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite(); + igfsCtx.metrics().incrementFilesOpenedForWrite(); } /** {@inheritDoc} */ @@ -355,8 +355,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { if (err != null) throw err; - igfsCtx.igfs().localMetrics().addWrittenBytesTime(bytes, time); - igfsCtx.igfs().localMetrics().decrementFilesOpenedForWrite(); + igfsCtx.metrics().addWrittenBytesTime(bytes, time); + igfsCtx.metrics().decrementFilesOpenedForWrite(); GridEventStorageManager evts = igfsCtx.kernalContext().event(); @@ -396,7 +396,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { /** * Send local buffer if at least something is stored there. * - * @throws IOException + * @throws IOException If failed. */ private void sendBufferIfNotEmpty() throws IOException { if (buf != null && buf.position() > 0) http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java index 2b989c5..04c67dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java @@ -117,13 +117,6 @@ public class IgfsMock implements IgfsEx { } /** {@inheritDoc} */ - @Override public IgfsLocalMetrics localMetrics() { - throwUnsupported(); - - return null; - } - - /** {@inheritDoc} */ @Override public long groupBlockSize() { throwUnsupported();