Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6C1C6200B34 for ; Fri, 17 Jun 2016 14:38:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6AEAA160A6A; Fri, 17 Jun 2016 12:38:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C4F38160A67 for ; Fri, 17 Jun 2016 14:38:31 +0200 (CEST) Received: (qmail 56759 invoked by uid 500); 17 Jun 2016 12:38:31 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 56458 invoked by uid 99); 17 Jun 2016 12:38:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jun 2016 12:38:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 85AABEC22B; Fri, 17 Jun 2016 12:38:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 17 Jun 2016 12:38:38 -0000 Message-Id: <91185491f3e94efcb22c7c6e6e85b402@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/43] ignite git commit: IGNITE-2938: IGFS: Puts during secondary file reads are now performed synchronously and with proper semantics. archived-at: Fri, 17 Jun 2016 12:38:33 -0000 IGNITE-2938: IGFS: Puts during secondary file reads are now performed synchronously and with proper semantics. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/98a0990c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/98a0990c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/98a0990c Branch: refs/heads/ignite-3335 Commit: 98a0990c90fbfe5737f4f0f2d1c34a84fd0a6dde Parents: 54425bf Author: vozerov-gridgain Authored: Tue Jun 14 17:39:44 2016 +0300 Committer: vozerov-gridgain Committed: Tue Jun 14 17:39:44 2016 +0300 ---------------------------------------------------------------------- .../configuration/FileSystemConfiguration.java | 18 +- .../processors/igfs/IgfsDataManager.java | 166 ++++--------------- .../igfs/data/IgfsDataPutProcessor.java | 99 +++++++++++ 3 files changed, 147 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java index 074636a..88b0c28 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java @@ -704,7 +704,9 @@ public class FileSystemConfiguration { * Gets maximum timeout awaiting for trash purging in case data cache oversize is detected. * * @return Maximum timeout awaiting for trash purging in case data cache oversize is detected. + * @deprecated Not used any more. */ + @Deprecated public long getTrashPurgeTimeout() { return trashPurgeTimeout; } @@ -713,7 +715,9 @@ public class FileSystemConfiguration { * Sets maximum timeout awaiting for trash purging in case data cache oversize is detected. * * @param trashPurgeTimeout Maximum timeout awaiting for trash purging in case data cache oversize is detected. + * @deprecated Not used any more. */ + @Deprecated public void setTrashPurgeTimeout(long trashPurgeTimeout) { this.trashPurgeTimeout = trashPurgeTimeout; } @@ -724,8 +728,10 @@ public class FileSystemConfiguration { * In case no executor service is provided, default one will be created with maximum amount of threads equals * to amount of processor cores. * - * @return Get DUAL mode put operation executor service + * @return Get DUAL mode put operation executor service. + * @deprecated Not used any more. */ + @Deprecated @Nullable public ExecutorService getDualModePutExecutorService() { return dualModePutExec; } @@ -734,7 +740,9 @@ public class FileSystemConfiguration { * Set DUAL mode put operations executor service. * * @param dualModePutExec Dual mode put operations executor service. + * @deprecated Not used any more. */ + @Deprecated public void setDualModePutExecutorService(ExecutorService dualModePutExec) { this.dualModePutExec = dualModePutExec; } @@ -743,7 +751,9 @@ public class FileSystemConfiguration { * Get DUAL mode put operation executor service shutdown flag. * * @return DUAL mode put operation executor service shutdown flag. + * @deprecated Not used any more. */ + @Deprecated public boolean getDualModePutExecutorServiceShutdown() { return dualModePutExecShutdown; } @@ -752,7 +762,9 @@ public class FileSystemConfiguration { * Set DUAL mode put operations executor service shutdown flag. * * @param dualModePutExecShutdown Dual mode put operations executor service shutdown flag. + * @deprecated Not used any more. */ + @Deprecated public void setDualModePutExecutorServiceShutdown(boolean dualModePutExecShutdown) { this.dualModePutExecShutdown = dualModePutExecShutdown; } @@ -766,7 +778,9 @@ public class FileSystemConfiguration { * avoid issues with increasing GC pauses or out-of-memory error. * * @return Maximum amount of pending data read from the secondary file system + * @deprecated Not used any more. */ + @Deprecated public long getDualModeMaxPendingPutsSize() { return dualModeMaxPendingPutsSize; } @@ -775,7 +789,9 @@ public class FileSystemConfiguration { * Set maximum amount of data in pending put operations. * * @param dualModeMaxPendingPutsSize Maximum amount of data in pending put operations. + * @deprecated Not used any more. */ + @Deprecated public void setDualModeMaxPendingPutsSize(long dualModeMaxPendingPutsSize) { this.dualModeMaxPendingPutsSize = dualModeMaxPendingPutsSize; } http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 34d77f9..cb2b630 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters; +import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -53,7 +54,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -78,10 +78,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -98,9 +96,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA * Cache based file's data container. */ public class IgfsDataManager extends IgfsManager { - /** IGFS. */ - private IgfsEx igfs; - /** Data internal cache. */ private IgniteInternalCache dataCachePrj; @@ -143,31 +138,10 @@ public class IgfsDataManager extends IgfsManager { /** Async file delete worker. */ private AsyncDeleteWorker delWorker; - /** Trash purge timeout. */ - private long trashPurgeTimeout; - /** On-going remote reads futures. */ private final ConcurrentHashMap8> rmtReadFuts = new ConcurrentHashMap8<>(); - /** Executor service for puts in dual mode */ - private volatile ExecutorService putExecSvc; - - /** Executor service for puts in dual mode shutdown flag. */ - private volatile boolean putExecSvcShutdown; - - /** Maximum amount of data in pending puts. */ - private volatile long maxPendingPuts; - - /** Current amount of data in pending puts. */ - private long curPendingPuts; - - /** Lock for pending puts. */ - private final Lock pendingPutsLock = new ReentrantLock(); - - /** Condition for pending puts. */ - private final Condition pendingPutsCond = pendingPutsLock.newCondition(); - /** * */ @@ -182,8 +156,6 @@ public class IgfsDataManager extends IgfsManager { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - igfs = igfsCtx.igfs(); - dataCacheStartLatch = new CountDownLatch(1); String igfsName = igfsCtx.configuration().getName(); @@ -216,23 +188,6 @@ public class IgfsDataManager extends IgfsManager { igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService(); - trashPurgeTimeout = igfsCtx.configuration().getTrashPurgeTimeout(); - - putExecSvc = igfsCtx.configuration().getDualModePutExecutorService(); - - if (putExecSvc != null) - putExecSvcShutdown = igfsCtx.configuration().getDualModePutExecutorServiceShutdown(); - else { - int coresCnt = Runtime.getRuntime().availableProcessors(); - - // Note that we do not pre-start threads here as IGFS pool may not be needed. - putExecSvc = new IgniteThreadPoolExecutor(coresCnt, coresCnt, 0, new LinkedBlockingDeque()); - - putExecSvcShutdown = true; - } - - maxPendingPuts = igfsCtx.configuration().getDualModeMaxPendingPutsSize(); - delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(), "igfs-" + igfsName + "-delete-worker", log); } @@ -282,9 +237,6 @@ public class IgfsDataManager extends IgfsManager { catch (IgniteInterruptedCheckedException e) { log.warning("Got interrupter while waiting for delete worker to stop (will continue stopping).", e); } - - if (putExecSvcShutdown) - U.shutdownNow(getClass(), putExecSvc, log); } /** @@ -308,6 +260,7 @@ public class IgfsDataManager extends IgfsManager { * @param prevAffKey Affinity key of previous block. * @return Affinity key. */ + @SuppressWarnings("ConstantConditions") public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) { // Do not generate affinity key for non-affinity nodes. if (!dataCache.context().affinityNode()) @@ -371,8 +324,6 @@ public class IgfsDataManager extends IgfsManager { @Nullable public IgniteInternalFuture dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path, final long blockIdx, @Nullable final IgfsSecondaryFileSystemPositionedReadable secReader) throws IgniteCheckedException { - //assert validTxState(any); // Allow this method call for any transaction state. - assert fileInfo != null; assert blockIdx >= 0; @@ -435,7 +386,7 @@ public class IgfsDataManager extends IgfsManager { rmtReadFut.onDone(res); - putSafe(key, res); + putBlock(fileInfo.blockSize(), key, res); metrics.addReadBlocks(1, 1); } @@ -471,6 +422,26 @@ public class IgfsDataManager extends IgfsManager { } /** + * Stores the given block in data cache. + * + * @param blockSize The size of the block. + * @param key The data cache key of the block. + * @param data The new value of the block. + */ + private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException { + if (data.length < blockSize) + // partial (incomplete) block: + dataCachePrj.invoke(key, new IgfsDataPutProcessor(data)); + else { + // whole block: + assert data.length == blockSize; + + dataCachePrj.put(key, data); + } + } + + + /** * Registers write future in igfs data manager. * * @param fileId File ID. @@ -680,7 +651,7 @@ public class IgfsDataManager extends IgfsManager { byte[] val = vals.get(colocatedKey); if (val != null) { - dataCachePrj.put(key, val); + putBlock(fileInfo.blockSize(), key, val); tx.commit(); } @@ -744,7 +715,6 @@ public class IgfsDataManager extends IgfsManager { */ public Collection affinity(IgfsEntryInfo info, long start, long len, long maxLen) throws IgniteCheckedException { - assert validTxState(false); assert info.isFile() : "Failed to get affinity (not a file): " + info; assert start >= 0 : "Start position should not be negative: " + start; assert len >= 0 : "Part length should not be negative: " + len; @@ -974,21 +944,6 @@ public class IgfsDataManager extends IgfsManager { } /** - * Check transaction is (not) started. - * - * @param inTx Expected transaction state. - * @return Transaction state is correct. - */ - private boolean validTxState(boolean inTx) { - boolean txState = inTx == (dataCachePrj.tx() != null); - - assert txState : (inTx ? "Method cannot be called outside transaction: " : - "Method cannot be called in transaction: ") + dataCachePrj.tx(); - - return txState; - } - - /** * @param fileId File ID. * @param node Node to process blocks on. * @param blocks Blocks to put in cache. @@ -1056,10 +1011,11 @@ public class IgfsDataManager extends IgfsManager { * @param colocatedKey Block key. * @param startOff Data start offset within block. * @param data Data to write. + * @param blockSize The block size. * @throws IgniteCheckedException If update failed. */ private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff, - byte[] data) throws IgniteCheckedException { + byte[] data, int blockSize) throws IgniteCheckedException { if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) { final WriteCompletionFuture completionFut = pendingWrites.get(fileId); @@ -1090,7 +1046,7 @@ public class IgfsDataManager extends IgfsManager { // If writing from block beginning, just put and return. if (startOff == 0) { - dataCachePrj.put(colocatedKey, data); + putBlock(blockSize, colocatedKey, data); return; } @@ -1151,67 +1107,6 @@ public class IgfsDataManager extends IgfsManager { } /** - * Put data block read from the secondary file system to the cache. - * - * @param key Key. - * @param data Data. - * @throws IgniteCheckedException If failed. - */ - private void putSafe(final IgfsBlockKey key, final byte[] data) throws IgniteCheckedException { - assert key != null; - assert data != null; - - if (maxPendingPuts > 0) { - pendingPutsLock.lock(); - - try { - while (curPendingPuts > maxPendingPuts) - pendingPutsCond.await(2000, TimeUnit.MILLISECONDS); - - curPendingPuts += data.length; - } - catch (InterruptedException ignore) { - throw new IgniteCheckedException("Failed to put IGFS data block into cache due to interruption: " + key); - } - finally { - pendingPutsLock.unlock(); - } - } - - Runnable task = new Runnable() { - @Override public void run() { - try { - dataCachePrj.put(key, data); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to put IGFS data block into cache [key=" + key + ", err=" + e + ']'); - } - finally { - if (maxPendingPuts > 0) { - pendingPutsLock.lock(); - - try { - curPendingPuts -= data.length; - - pendingPutsCond.signalAll(); - } - finally { - pendingPutsLock.unlock(); - } - } - } - } - }; - - try { - putExecSvc.submit(task); - } - catch (RejectedExecutionException ignore) { - task.run(); - } - } - - /** * @param blocks Blocks to write. * @return Future that will be completed after put is done. */ @@ -1261,6 +1156,7 @@ public class IgfsDataManager extends IgfsManager { * @param nodeId Node ID. * @param ackMsg Write acknowledgement message. */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) { try { ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null); @@ -1343,6 +1239,7 @@ public class IgfsDataManager extends IgfsManager { * @throws IgniteCheckedException If failed. * @return Data remainder if {@code flush} flag is {@code false}. */ + @SuppressWarnings("ConstantConditions") @Nullable public byte[] storeDataBlocks( IgfsEntryInfo fileInfo, long reservedLen, @@ -1456,7 +1353,7 @@ public class IgfsDataManager extends IgfsManager { if (size != blockSize) { // Partial writes must be always synchronous. - processPartialBlockWrite(id, key, block == first ? off : 0, portion); + processPartialBlockWrite(id, key, block == first ? off : 0, portion, blockSize); writtenTotal++; } @@ -1617,8 +1514,6 @@ public class IgfsDataManager extends IgfsManager { protected AsyncDeleteWorker(@Nullable String gridName, String name, IgniteLogger log) { super(gridName, name, log); - long time = System.currentTimeMillis(); - stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid()); } @@ -1642,6 +1537,7 @@ public class IgfsDataManager extends IgfsManager { } /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { try { while (!isCancelled()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java new file mode 100644 index 0000000..2029d4e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.data; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; +import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Entry processor to set or replace block byte value. + */ +public class IgfsDataPutProcessor implements EntryProcessor, Externalizable, Binarylizable { + /** */ + private static final long serialVersionUID = 0L; + + /** The new value. */ + private byte[] newVal; + + /** + * Non-arg constructor required by externalizable. + */ + public IgfsDataPutProcessor() { + // no-op + } + + /** + * Constructor. + * + * @param newVal The new value. + */ + public IgfsDataPutProcessor(byte[] newVal) { + assert newVal != null; + + this.newVal = newVal; + } + + /** {@inheritDoc} */ + public Void process(MutableEntry entry, Object... args) + throws EntryProcessorException { + byte[] curVal = entry.getValue(); + + if (curVal == null || newVal.length > curVal.length) + entry.setValue(newVal); + + return null; + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + newVal = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeByteArray(out, newVal); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + newVal = reader.rawReader().readByteArray(); + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + writer.rawWriter().writeByteArray(newVal); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsDataPutProcessor.class, this); + } +}