ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [12/20] incubator-ignite git commit: # Renaming
Date Fri, 05 Dec 2014 14:22:48 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c31cec7c/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/FileSwapSpaceSpi.java
new file mode 100644
index 0000000..7bb91d2
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -0,0 +1,1843 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.spi.swapspace.file;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.spi.swapspace.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * File-based swap space SPI implementation which holds keys in memory. This SPI is used by default.
+ * It is intended for use in cases when value size is bigger than {@code 100} bytes, otherwise it will not
+ * have any positive effect.
+ * <p>
+ * <b>NOTE: This SPI does not support swap eviction currently, manual removes needed to reduce disk space
+ * consumption.</b>
+ * <p>
+ * Every space has a name and when used in combination with in-memory data grid name and local node ID,
+ * space name represents the actual cache name associated with this swap space. Default name is {@code null}
+ * which is represented by {@link #DFLT_SPACE_NAME}.
+ *
+ * <h1 class="header">Configuration</h1>
+ * <h2 class="header">Mandatory</h2>
+ * This SPI has no mandatory configuration parameters.
+ * <h2 class="header">Optional SPI configuration.</h2>
+ * <ul>
+ *     <li>Base directory path (see {@link #setBaseDirectory(String)}).</li>
+ *     <li>Maximum sparsity (see {@link #setMaximumSparsity(float)}).</li>
+ *     <li>Write buffer size in bytes (see {@link #setWriteBufferSize(int)}).</li>
+ *     <li>Max write queue size in bytes (see {@link #setMaxWriteQueueSize(int)}).</li>
+ *     <li>Read stripes number. (see {@link #setReadStripesNumber(int)}).</li>
+ * </ul>
+ *
+ * <h2 class="header">Java Example</h2>
+ * FileSwapSpaceSpi is configured by default and should be explicitly configured
+ * only if some SPI configuration parameters need to be overridden.
+ * <pre name="code" class="java">
+ * FileSwapSpaceSpi spi = new FileSwapSpaceSpi();
+ *
+ * // Configure root folder path.
+ * spi.setBaseDirectory("/path/to/swap/folder");
+ *
+ * GridConfiguration cfg = new GridConfiguration();
+ *
+ * // Override default swap space SPI.
+ * cfg.setSwapSpaceSpi(spi);
+ *
+ * // Starts grid.
+ * G.start(cfg);
+ * </pre>
+ * <h2 class="header">Spring Example</h2>
+ * FileSwapSpaceSpi can be configured from Spring XML configuration file:
+ * <pre name="code" class="xml">
+ * &lt;bean id=&quot;grid.cfg&quot; class=&quot;org.gridgain.grid.GridConfiguration&quot; scope=&quot;singleton&quot;&gt;
+ *     ...
+ *     &lt;property name=&quot;swapSpaceSpi&quot;&gt;
+ *         &lt;bean class=&quot;org.gridgain.grid.spi.swapspace.file.FileSwapSpaceSpi&quot;&gt;
+ *             &lt;property name=&quot;baseDirectory&quot; value=&quot;/path/to/swap/folder&quot;/&gt;
+ *         &lt;/bean&gt;
+ *     &lt;/property&gt;
+ *     ...
+ * &lt;/bean&gt;
+ * </pre>
+ * <p>
+ * <img src="http://www.gridgain.com/images/spring-small.png">
+ * <br>
+ * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
+ * @see org.gridgain.grid.spi.swapspace.SwapSpaceSpi
+ */
+@IgniteSpiMultipleInstancesSupport(true)
+@SuppressWarnings({"PackageVisibleInnerClass", "PackageVisibleField"})
+public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, FileSwapSpaceSpiMBean {
+    /**
+     * Default base directory. Note that this path is relative to {@code GRIDGAIN_HOME/work} folder
+     * if {@code GRIDGAIN_HOME} system or environment variable is specified, otherwise it is relative to
+     * {@code work} folder under system {@code java.io.tmpdir} folder.
+     *
+     * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory()
+     */
+    public static final String DFLT_BASE_DIR = "swapspace";
+
+    /** Default maximum sparsity. */
+    public static final float DFLT_MAX_SPARSITY = 0.5f;
+
+    /** Default write buffer size in bytes. */
+    public static final int DFLT_BUF_SIZE = 64 * 1024;
+
+    /** Default write queue size in bytes. */
+    public static final int DFLT_QUE_SIZE = 1024 * 1024;
+
+    /** Name under which the {@code null} space is stored in the spaces map. */
+    public static final String DFLT_SPACE_NAME = "gg-dflt-space";
+
+    /** Spaces keyed by space name ({@link #DFLT_SPACE_NAME} is used for the {@code null} name). */
+    private final ConcurrentMap<String, Space> spaces = new ConcurrentHashMap<>();
+
+    /** Base directory. */
+    private String baseDir = DFLT_BASE_DIR;
+
+    /** Maximum sparsity. */
+    private float maxSparsity = DFLT_MAX_SPARSITY;
+
+    /** Listener notified about swap space events (see {@code notifyListener}); may be {@code null}. */
+    private volatile SwapSpaceSpiListener evictLsnr;
+
+    /** Swap directory; assigned in {@link #spiStart(String)}. */
+    private File dir;
+
+    /** Write buffer size. */
+    private int writeBufSize = DFLT_BUF_SIZE;
+
+    /** Max write queue size in bytes. */
+    private int maxWriteQueSize = DFLT_QUE_SIZE;
+
+    /** Read stripes number; {@code -1} means "not configured" and is resolved in {@link #spiStart(String)}. */
+    private int readStripesNum = -1;
+
+    /** Logger. */
+    @IgniteLoggerResource
+    private IgniteLogger log;
+
+    /** Local node ID. */
+    @IgniteLocalNodeIdResource
+    private UUID locNodeId;
+
+    /** Name of the grid. */
+    @IgniteNameResource
+    private String gridName;
+
+    /** Marshaller; used to marshal swap keys that carry no key bytes yet. */
+    @IgniteMarshallerResource
+    private IgniteMarshaller marsh;
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Currently configured base directory.
+     */
+    @Override public String getBaseDirectory() {
+        return baseDir;
+    }
+
+    /**
+     * Sets base directory. The value is checked to be non-empty when the SPI starts
+     * (see {@link #spiStart(String)}). Default is {@link #DFLT_BASE_DIR}.
+     *
+     * @param baseDir Base directory.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public void setBaseDirectory(String baseDir) {
+        this.baseDir = baseDir;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Currently configured maximum sparsity.
+     */
+    @Override public float getMaximumSparsity() {
+        return maxSparsity;
+    }
+
+    /**
+     * Sets maximum sparsity. This property defines the maximum acceptable ratio of wasted file space to
+     * the whole file size. When this ratio becomes higher than the specified number, the compacting thread
+     * starts working.
+     * <p>
+     * The value is not checked here; {@link #spiStart(String)} requires {@code 0 <= maxSparsity < 1}.
+     *
+     * @param maxSparsity Maximum sparsity, from {@code 0} (inclusive) to {@code 1} (exclusive),
+     *      default is {@link #DFLT_MAX_SPARSITY}.
+     */
+    public void setMaximumSparsity(float maxSparsity) {
+        this.maxSparsity = maxSparsity;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Currently configured write buffer size in bytes.
+     */
+    @Override public int getWriteBufferSize() {
+        return writeBufSize;
+    }
+
+    /**
+     * Sets write buffer size in bytes. Data is written to disk only once this buffer is full. Default is
+     * {@link #DFLT_BUF_SIZE}.
+     *
+     * @param writeBufSize Write buffer size in bytes.
+     */
+    public void setWriteBufferSize(int writeBufSize) {
+        this.writeBufSize = writeBufSize;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Currently configured maximum write queue size in bytes.
+     */
+    @Override public int getMaxWriteQueueSize() {
+        return maxWriteQueSize;
+    }
+
+    /**
+     * Sets max write queue size in bytes. If more bytes than the specified size are waiting to be written
+     * to disk, the SPI will block on the
+     * {@link #store(String, org.gridgain.grid.spi.swapspace.SwapKey, byte[], org.gridgain.grid.spi.swapspace.SwapContext)}
+     * operation. Default is {@link #DFLT_QUE_SIZE}.
+     *
+     * @param maxWriteQueSize Max write queue size in bytes.
+     */
+    public void setMaxWriteQueueSize(int maxWriteQueSize) {
+        this.maxWriteQueSize = maxWriteQueSize;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Read stripes number; {@code -1} if it was neither configured nor resolved by
+     *      {@link #spiStart(String)} yet.
+     */
+    @Override public int getReadStripesNumber() {
+        return readStripesNum;
+    }
+
+    /**
+     * Sets read stripes number. Defines number of file channels to be used concurrently. Default is derived
+     * from the number of CPU cores available to this JVM (see {@link #spiStart(String)}).
+     *
+     * @param readStripesNum Read stripes number: either {@code -1} (use default) or a positive power of two.
+     */
+    public void setReadStripesNumber(int readStripesNum) {
+        // "x & (x - 1)" alone is also zero for 0 and Integer.MIN_VALUE, so positivity is checked explicitly.
+        A.ensure(readStripesNum == -1 || (readStripesNum > 0 && (readStripesNum & (readStripesNum - 1)) == 0),
+            "readStripesNum must be positive and power of two");
+
+        this.readStripesNum = readStripesNum;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Validates configuration parameters, resolves the read stripes number when it was left at {@code -1},
+     * starts the stopwatch, registers the MBean and resolves the swap directory for the path
+     * {@code baseDir/gridName/locNodeId} via {@code U.resolveWorkDirectory}.
+     */
+    @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+        assertParameter(!F.isEmpty(baseDir), "!F.isEmpty(baseDir)");
+        assertParameter(maxSparsity >= 0 && maxSparsity < 1, "maxSparsity >= 0 && maxSparsity < 1");
+        // "x & (x - 1)" alone is also zero for 0 and Integer.MIN_VALUE, so positivity is checked explicitly.
+        assertParameter(readStripesNum == -1 || (readStripesNum > 0 && (readStripesNum & (readStripesNum - 1)) == 0),
+            "readStripesNum must be positive and power of two.");
+
+        if (readStripesNum == -1) {
+            // User has not configured the number: take the largest power of two not exceeding CPU count.
+            int readStripesNum0 = Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
+
+            assert readStripesNum0 > 0 && (readStripesNum0 & readStripesNum0 - 1) == 0;
+
+            readStripesNum = readStripesNum0;
+        }
+
+        startStopwatch();
+
+        registerMBean(gridName, this, FileSwapSpaceSpiMBean.class);
+
+        String path = baseDir + File.separator + gridName + File.separator + locNodeId;
+
+        try {
+            dir = U.resolveWorkDirectory(path, true);
+        }
+        catch (GridException e) {
+            throw new IgniteSpiException(e);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug(startInfo());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Unregisters the MBean, stops every space currently held in the spaces map and then
+     * tries to delete the swap directory (a warning is logged if deletion fails).
+     */
+    @Override public void spiStop() throws IgniteSpiException {
+        unregisterMBean();
+
+        for (Space space : spaces.values()) {
+            // NOTE(review): initialize() is invoked right before stop(); presumably stop() relies on
+            // an initialized space - confirm against Space, which is outside this view.
+            space.initialize();
+
+            try {
+                space.stop();
+            }
+            catch (GridInterruptedException e) {
+                // Interruption is only logged; the loop proceeds with the remaining spaces.
+                U.error(log, "Interrupted.", e);
+            }
+        }
+
+        // 'dir' stays null if spiStart() did not get as far as resolving it.
+        if (dir != null && dir.exists() && !U.delete(dir))
+            U.warn(log, "Failed to delete swap directory: " + dir.getAbsolutePath());
+
+        if (log.isDebugEnabled())
+            log.debug(stopInfo());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * If the space does not exist nothing is cleared and no event is fired.
+     */
+    @Override public void clear(@Nullable String spaceName) throws IgniteSpiException {
+        Space target = space(spaceName, false);
+
+        if (target != null) {
+            target.clear();
+
+            notifyListener(EVT_SWAP_SPACE_CLEARED, spaceName);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A space that does not exist is reported as having size {@code 0}.
+     */
+    @Override public long size(@Nullable String spaceName) throws IgniteSpiException {
+        Space target = space(spaceName, false);
+
+        return target != null ? target.size() : 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A space that does not exist is reported as having {@code 0} entries.
+     */
+    @Override public long count(@Nullable String spaceName) throws IgniteSpiException {
+        Space target = space(spaceName, false);
+
+        return target != null ? target.count() : 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns {@code null} without firing an event if the space does not exist; otherwise the read
+     * event is fired regardless of whether a value was found.
+     */
+    @Nullable @Override public byte[] read(@Nullable String spaceName, SwapKey key, SwapContext ctx)
+        throws IgniteSpiException {
+        assert key != null;
+        assert ctx != null;
+
+        Space target = space(spaceName, false);
+
+        if (target != null) {
+            byte[] bytes = target.read(key);
+
+            notifyListener(EVT_SWAP_SPACE_DATA_READ, spaceName);
+
+            return bytes;
+        }
+
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * {@code null} keys are skipped. Keys without a stored value are left out of the result, although
+     * the read event is still fired for them.
+     */
+    @Override public Map<SwapKey, byte[]> readAll(@Nullable String spaceName, Iterable<SwapKey> keys,
+        SwapContext ctx) throws IgniteSpiException {
+        assert keys != null;
+        assert ctx != null;
+
+        Space target = space(spaceName, false);
+
+        if (target == null)
+            return Collections.emptyMap();
+
+        Map<SwapKey, byte[]> found = new HashMap<>();
+
+        for (SwapKey key : keys) {
+            if (key == null)
+                continue;
+
+            byte[] bytes = target.read(key);
+
+            if (bytes != null)
+                found.put(key, bytes);
+
+            notifyListener(EVT_SWAP_SPACE_DATA_READ, spaceName);
+        }
+
+        return found;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The closure, when given, receives whatever the space returned for the removed key.
+     * Nothing happens if the space does not exist.
+     */
+    @Override public void remove(@Nullable String spaceName, SwapKey key, @Nullable IgniteInClosure<byte[]> c,
+        SwapContext ctx) throws IgniteSpiException {
+        assert key != null;
+        assert ctx != null;
+
+        Space target = space(spaceName, false);
+
+        if (target != null) {
+            // The removed value is requested from the space only when a closure is given.
+            boolean needVal = c != null;
+
+            byte[] removed = target.remove(key, needVal);
+
+            if (needVal)
+                c.apply(removed);
+
+            notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * {@code null} keys are skipped; a remove event is fired for every non-null key.
+     */
+    @Override public void removeAll(@Nullable String spaceName, Collection<SwapKey> keys,
+        @Nullable IgniteBiInClosure<SwapKey, byte[]> c, SwapContext ctx) throws IgniteSpiException {
+        assert keys != null;
+        assert ctx != null;
+
+        Space target = space(spaceName, false);
+
+        if (target == null)
+            return;
+
+        // The removed value is requested from the space only when a closure is given.
+        boolean needVal = c != null;
+
+        for (SwapKey key : keys) {
+            if (key == null)
+                continue;
+
+            byte[] removed = target.remove(key, needVal);
+
+            if (needVal)
+                c.apply(key, removed);
+
+            notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The space is created if it does not exist yet.
+     */
+    @Override public void store(@Nullable String spaceName, SwapKey key, @Nullable byte[] val,
+        SwapContext ctx) throws IgniteSpiException {
+        assert key != null;
+        assert ctx != null;
+
+        Space target = space(spaceName, true);
+
+        assert target != null;
+
+        target.store(key, val);
+        notifyListener(EVT_SWAP_SPACE_DATA_STORED, spaceName);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The space is created if it does not exist yet. Entries with a {@code null} key are skipped;
+     * a store event is fired for every stored entry.
+     */
+    @Override public void storeAll(@Nullable String spaceName, Map<SwapKey, byte[]> pairs,
+        SwapContext ctx) throws IgniteSpiException {
+        assert pairs != null;
+        assert ctx != null;
+
+        Space target = space(spaceName, true);
+
+        assert target != null;
+
+        for (Map.Entry<SwapKey, byte[]> e : pairs.entrySet()) {
+            SwapKey key = e.getKey();
+
+            if (key == null)
+                continue;
+
+            target.store(key, e.getValue());
+
+            notifyListener(EVT_SWAP_SPACE_DATA_STORED, spaceName);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The listener is kept in a volatile field; passing {@code null} turns notifications off.
+     */
+    @Override public void setListener(@Nullable SwapSpaceSpiListener evictLsnr) {
+        this.evictLsnr = evictLsnr;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns {@code null} if the space does not exist.
+     */
+    @Nullable @Override public Collection<Integer> partitions(@Nullable String spaceName)
+        throws IgniteSpiException {
+        Space target = space(spaceName, false);
+
+        return target != null ? target.partitions() : null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The returned iterator is a view over the space entries iterator exposing only the user keys.
+     * Returns {@code null} if the space does not exist.
+     */
+    @Nullable @Override public <K> IgniteSpiCloseableIterator<K> keyIterator(@Nullable String spaceName,
+        SwapContext ctx) throws IgniteSpiException {
+        final Space target = space(spaceName, false);
+
+        if (target == null)
+            return null;
+
+        final Iterator<Map.Entry<SwapKey, byte[]>> entries = target.entriesIterator();
+
+        return new GridCloseableIteratorAdapter<K>() {
+            @Override protected K onNext() {
+                SwapKey swapKey = entries.next().getKey();
+
+                return (K)swapKey.key();
+            }
+
+            @Override protected boolean onHasNext() {
+                return entries.hasNext();
+            }
+
+            @Override protected void onRemove() {
+                entries.remove();
+            }
+        };
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns {@code null} if the space does not exist.
+     */
+    @Nullable @Override public IgniteSpiCloseableIterator<Map.Entry<byte[], byte[]>> rawIterator(
+        @Nullable String spaceName) throws IgniteSpiException {
+        Space target = space(spaceName, false);
+
+        return target != null ? rawIterator(target.entriesIterator()) : null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns {@code null} if the space does not exist.
+     */
+    @Nullable @Override public IgniteSpiCloseableIterator<Map.Entry<byte[], byte[]>> rawIterator(
+        @Nullable String spaceName, int part) throws IgniteSpiException {
+        Space target = space(spaceName, false);
+
+        return target != null ? rawIterator(target.entriesIterator(part)) : null;
+    }
+
+    /**
+     * Wraps an entries iterator into a closeable iterator that yields key bytes paired with value bytes.
+     * Key bytes are obtained through {@link #keyBytes(SwapKey)}.
+     *
+     * @param iter Entries iterator.
+     * @return Raw iterator.
+     */
+    private IgniteSpiCloseableIterator<Map.Entry<byte[], byte[]>> rawIterator(
+        final Iterator<Map.Entry<SwapKey, byte[]>> iter) {
+        return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
+            @Override protected boolean onHasNext() {
+                return iter.hasNext();
+            }
+
+            @Override protected Map.Entry<byte[], byte[]> onNext() throws GridException {
+                Map.Entry<SwapKey, byte[]> entry = iter.next();
+
+                byte[] rawKey = keyBytes(entry.getKey());
+
+                return new T2<>(rawKey, entry.getValue());
+            }
+
+            @Override protected void onRemove() {
+                iter.remove();
+            }
+        };
+    }
+
+    /**
+     * Returns the bytes of the given key. If the key carries no bytes yet, the user key is marshalled
+     * and the result is stored back into the swap key.
+     *
+     * @param key Swap key.
+     * @return Key bytes.
+     * @throws org.apache.ignite.spi.IgniteSpiException If marshalling of the key failed.
+     */
+    private byte[] keyBytes(SwapKey key) throws IgniteSpiException {
+        assert key != null;
+
+        byte[] bytes = key.keyBytes();
+
+        if (bytes != null)
+            return bytes;
+
+        try {
+            bytes = marsh.marshal(key.key());
+        }
+        catch (GridException e) {
+            throw new IgniteSpiException("Failed to marshal key: " + key.key(), e);
+        }
+
+        key.keyBytes(bytes);
+
+        return bytes;
+    }
+
+    /**
+     * Passes a swap event to the registered listener, if there is one.
+     *
+     * @param evtType Event type.
+     * @param spaceName Space name.
+     */
+    private void notifyListener(int evtType, @Nullable String spaceName) {
+        // Single volatile read, so the null check and the call see the same listener.
+        SwapSpaceSpiListener curLsnr = evictLsnr;
+
+        if (curLsnr == null)
+            return;
+
+        curLsnr.onSwapEvent(evtType, spaceName, null);
+    }
+
+    /**
+     * Looks up a space by name, optionally creating it. The {@code null} name is mapped to
+     * {@link #DFLT_SPACE_NAME}. A found or created space is initialized before it is returned.
+     *
+     * @param name Space name.
+     * @param create Whether to create space if it doesn't exist.
+     * @return Space, or {@code null} if it does not exist and {@code create} is {@code false}.
+     * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+     */
+    @Nullable private Space space(@Nullable String name, boolean create) throws IgniteSpiException {
+        String spaceKey = name == null ? DFLT_SPACE_NAME : name;
+
+        assert spaceKey != null;
+
+        Space res = spaces.get(spaceKey);
+
+        if (res == null) {
+            if (!create)
+                return null;
+
+            validateName(name);
+
+            Space created = new Space(spaceKey);
+
+            // Another thread may have registered the space concurrently - then use its instance.
+            Space prev = spaces.putIfAbsent(spaceKey, created);
+
+            res = prev != null ? prev : created;
+        }
+
+        res.initialize();
+
+        return res;
+    }
+
+    /**
+     * Checks that a space name is acceptable. The {@code null} name is always valid. Any other name
+     * must be non-empty, must not match {@link #DFLT_SPACE_NAME} (ignoring case) and must not contain
+     * {@code '/'} or {@code '\'}.
+     *
+     * @param name Space name.
+     * @throws org.apache.ignite.spi.IgniteSpiException If name is invalid.
+     */
+    private void validateName(@Nullable String name) throws IgniteSpiException {
+        if (name == null)
+            return;
+
+        if (name.isEmpty())
+            throw new IgniteSpiException("Space name cannot be empty: " + name);
+
+        if (DFLT_SPACE_NAME.equalsIgnoreCase(name))
+            throw new IgniteSpiException("Space name is reserved for default space: " + name);
+
+        boolean hasSeparator = name.indexOf('/') >= 0 || name.indexOf('\\') >= 0;
+
+        if (hasSeparator)
+            throw new IgniteSpiException("Space name contains invalid characters: " + name);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code S.toString} for this class.
+     */
+    @Override public String toString() {
+        return S.toString(FileSwapSpaceSpi.class, this);
+    }
+
+    /**
+     * Swap value. Holds the value bytes while they are in memory and, once written, the position of
+     * the value inside a swap file together with its index in that file's array.
+     */
+    static class SwapValue {
+        /** Initial {@link #idx} (field default); replaced by CAS when the value is added to a file's array. */
+        private static final int NEW = 0;
+
+        /**
+         * {@link #idx} marker checked throughout this file to skip a value.
+         * NOTE(review): presumably set when the value is removed - the writer is outside this view.
+         */
+        private static final int DELETED = Integer.MIN_VALUE;
+
+        /** Updater used for CAS operations on {@link #idx}. */
+        private static final AtomicIntegerFieldUpdater<SwapValue> idxUpdater = AtomicIntegerFieldUpdater.
+            newUpdater(SwapValue.class, "idx");
+
+        /** Bytes held in memory; {@code null} once {@link #set(long, byte[])} has dropped them. */
+        private byte[] val;
+
+        /** Length of the value in bytes, fixed at construction. */
+        private final int len;
+
+        /**
+         * Position used to read the value back: offset in the file channel, or offset inside {@link #val}
+         * when that array is longer than {@link #len}. {@code -1} until set.
+         */
+        @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+        private long pos = -1;
+
+        /** Index state: {@link #NEW}, {@link #DELETED}, or a signed index (negative selects the left file). */
+        @SuppressWarnings("UnusedDeclaration")
+        private volatile int idx;
+
+        /**
+         * @param val Value.
+         */
+        SwapValue(byte[] val) {
+            assert val != null;
+
+            this.val = val;
+            len = val.length;
+        }
+
+        /**
+         * Returns the value bytes, reading them from the swap file if they are no longer held in memory.
+         *
+         * @param space Space; dereferenced only when the bytes have to be read from file.
+         * @return Value, or {@code null} if the value is marked {@link #DELETED} or could not be read.
+         * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+         */
+        @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+        @Nullable public synchronized byte[] value(Space space) throws IgniteSpiException {
+            byte[] v = val;
+
+            if (v == null) { // Read value from file.
+                int i = idx;
+
+                assert i != NEW;
+
+                if (i != DELETED) {
+                    // Sign of the index selects the file the value lives in.
+                    StripedFileChannel ch = i < 0 ? space.left.readCh : space.right.readCh;
+
+                    if (idx != DELETED) // Double check works in pair with striped channel reopening.
+                        v = readValue(ch);
+                }
+            }
+            else if (v.length != len) {
+                // The held array is larger than this value: cut out this value's slice at 'pos'.
+                int p = (int)pos;
+
+                v = Arrays.copyOfRange(v, p, p + len); // In case of compaction.
+            }
+
+            return v;
+        }
+
+        /**
+         * Reads {@link #len} bytes at {@link #pos} from the given channel.
+         *
+         * @param ch File channel.
+         * @return Bytes, or {@code null} if fewer than {@link #len} bytes were read.
+         * @throws org.apache.ignite.spi.IgniteSpiException if failed.
+         */
+        @Nullable byte[] readValue(StripedFileChannel ch) throws IgniteSpiException {
+            byte[] v = new byte[len];
+
+            int res = 0;
+
+            try {
+                res = ch.read(ByteBuffer.wrap(v), pos);
+            }
+            catch (ClosedChannelException ignore) {
+                // A closed channel is only expected for a deleted value; 'res' stays 0.
+                assert idx == DELETED;
+            }
+            catch (IOException e) {
+                throw new IgniteSpiException("Failed to read value.", e);
+            }
+
+            if (res < len)
+                return null; // When concurrent compaction occurs this may happen.
+
+            return v;
+        }
+
+        /**
+         * Updates the position and the in-memory bytes.
+         *
+         * @param pos Position; {@code -1} leaves the current position unchanged.
+         * @param val Value; {@code null} drops the in-memory bytes.
+         */
+        public synchronized void set(long pos, byte[] val) {
+            if (pos != -1)
+                this.pos = pos;
+
+            this.val = val;
+        }
+
+        /**
+         * Atomically replaces the index if it currently has the expected value.
+         *
+         * @param exp Expected.
+         * @param idx New index.
+         * @return {@code true} if succeeded.
+         */
+        public boolean casIdx(int exp, int idx) {
+            return idxUpdater.compareAndSet(this, exp, idx);
+        }
+
+        /**
+         * @return Index in file array.
+         */
+        int idx() {
+            return idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return pos + " " + len;
+        }
+    }
+
+    /**
+     * Queue of swap values bounded by the total length of queued values in bytes.
+     * Producers block in {@link #add(SwapValue)} while the queue is full, the consumer blocks in
+     * {@link #take()} until at least {@code minTakeSize} bytes are queued.
+     */
+    private static class SwapValuesQueue {
+        /** Queued values in insertion order. Accessed under {@link #lock}. */
+        private final ArrayDeque<SwapValue> deq = new ArrayDeque<>();
+
+        /** Lock protecting {@link #deq} and {@link #size}. */
+        @SuppressWarnings("TypeMayBeWeakened")
+        private final ReentrantLock lock = new ReentrantLock();
+
+        /** Signalled by {@link #take()} when queued size drops below {@link #maxSize}. */
+        private final Condition mayAdd = lock.newCondition();
+
+        /** Signalled by {@link #add(SwapValue)} when queued size reaches {@link #minTakeSize}. */
+        private final Condition mayTake = lock.newCondition();
+
+        /** Total length in bytes of the queued values. */
+        private int size;
+
+        /** Minimum number of queued bytes required for {@link #take()} to proceed. */
+        private final int minTakeSize;
+
+        /** Maximum number of queued bytes. */
+        private final int maxSize;
+
+        /**
+         * @param minTakeSize Min size.
+         * @param maxSize Max size.
+         */
+        private SwapValuesQueue(int minTakeSize, int maxSize) {
+            this.minTakeSize = minTakeSize;
+            this.maxSize = maxSize;
+        }
+
+        /**
+         * Adds to queue, blocking while the value does not fit. A value is always accepted when no
+         * bytes are queued, even if it is longer than the maximum queue size.
+         *
+         * @param val Swap value.
+         * @throws org.apache.ignite.spi.IgniteSpiException If the thread was interrupted while waiting
+         *      (the interrupt status of the thread is restored).
+         */
+        public void add(SwapValue val) throws IgniteSpiException {
+            lock.lock();
+
+            try {
+                // Without the "size > 0" guard a value longer than maxSize could never be queued
+                // and the caller would wait here forever.
+                while (size > 0 && size + val.len > maxSize)
+                    mayAdd.await();
+
+                size += val.len;
+
+                deq.addLast(val);
+
+                if (size >= minTakeSize)
+                    mayTake.signalAll();
+            }
+            catch (InterruptedException e) {
+                // The exception type is translated, so keep the interrupt visible to callers.
+                Thread.currentThread().interrupt();
+
+                throw new IgniteSpiException(e);
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Takes swap values from queue. Waits until at least {@code minTakeSize} bytes are queued, then
+         * removes values from the head until their total length reaches {@code minTakeSize}.
+         *
+         * @return Swap values.
+         * @throws InterruptedException If interrupted.
+         */
+        public SwapValues take() throws InterruptedException {
+            lock.lock();
+
+            try {
+                while (size < minTakeSize)
+                    mayTake.await();
+
+                int takenSize = 0;
+                int cnt = 0;
+
+                // First pass: find out how many head values make up the batch.
+                for (SwapValue val : deq) {
+                    takenSize += val.len;
+                    cnt++;
+
+                    if (takenSize >= minTakeSize)
+                        break;
+                }
+
+                SwapValue[] vals = new SwapValue[cnt];
+
+                for (int i = 0; i < cnt; i++)
+                    vals[i] = deq.pollFirst();
+
+                size -= takenSize;
+
+                if (size < maxSize)
+                    mayAdd.signalAll();
+
+                return new SwapValues(vals, takenSize);
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Array of swap values and their size in bytes.
+     */
+    static class SwapValues {
+        /** Values; the file write path may replace elements with {@code null} for values it skips. */
+        private final SwapValue[] vals;
+
+        /** Size in bytes. */
+        private final int size;
+
+        /**
+         * @param vals Values.
+         * @param size Size in bytes.
+         */
+        SwapValues(SwapValue[] vals, int size) {
+            this.vals = vals;
+            this.size = size;
+        }
+    }
+
+    /**
+     * Readable striped file channel. Opens the same file several times for reading and spreads
+     * read requests over the opened channels in round-robin manner.
+     */
+    private static class StripedFileChannel {
+        /** Counter of read calls, used to pick a stripe. */
+        private final AtomicInteger enter = new AtomicInteger();
+
+        /** Files opened for reading, one per stripe. */
+        private final RandomAccessFile[] rafs;
+
+        /** Channels of {@link #rafs}, one per stripe. */
+        private final FileChannel[] chs;
+
+        /**
+         * @param f File.
+         * @param stripes Stripes, must be a positive power of two.
+         * @throws FileNotFoundException If failed. Files opened before the failure are closed.
+         */
+        StripedFileChannel(File f, int stripes) throws FileNotFoundException {
+            assert stripes > 0 && (stripes & (stripes - 1)) == 0 : "stripes must be positive and power of two.";
+
+            rafs = new RandomAccessFile[stripes];
+            chs = new FileChannel[stripes];
+
+            boolean ok = false;
+
+            try {
+                for (int i = 0; i < stripes; i++) {
+                    RandomAccessFile raf = new RandomAccessFile(f, "r");
+
+                    rafs[i] = raf;
+                    chs[i] = raf.getChannel();
+                }
+
+                ok = true;
+            }
+            finally {
+                if (!ok) {
+                    // The instance never escapes on failure, so release what was opened so far.
+                    for (RandomAccessFile raf : rafs) {
+                        if (raf != null)
+                            U.closeQuiet(raf);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Reads data from file channel to buffer.
+         *
+         * @param buf Buffer.
+         * @param pos Position.
+         * @return Read bytes count.
+         * @throws IOException If failed.
+         */
+        int read(ByteBuffer buf, long pos) throws IOException {
+            // Stripe count is a power of two, so masking yields a valid index even after counter overflow.
+            int i = enter.getAndIncrement() & (chs.length - 1);
+
+            return chs[i].read(buf, pos);
+        }
+
+        /**
+         * Closes channel.
+         */
+        void close() {
+            for (RandomAccessFile raf : rafs)
+                U.closeQuiet(raf);
+        }
+    }
+
+    /**
+     * Swap file.
+     */
+    static class SwapFile {
+        /** */
+        private static final long MIN_TRUNK_SIZE = 10 * 1024 * 1024;
+
+        /** */
+        private final File file;
+
+        /** */
+        private final RandomAccessFile raf;
+
+        /** */
+        private final FileChannel writeCh;
+
+        /** */
+        volatile StripedFileChannel readCh;
+
+        /** */
+        private volatile long len;
+
+        /** */
+        private final FileSwapArray<SwapValue> arr = new FileSwapArray<>();
+
+        /**
+         * Creates the swap file anew (an existing file is deleted first) and opens it for writing
+         * and for striped reading.
+         *
+         * @param file File.
+         * @param readerStripes Reader stripes number.
+         * @throws IOException In case of error. The write handle is closed if the read channel
+         *      could not be opened.
+         */
+        SwapFile(File file, int readerStripes) throws IOException {
+            assert file != null;
+
+            file.delete();
+
+            if (!file.createNewFile())
+                throw new IllegalStateException("Failed to create file: " + file.getAbsolutePath());
+
+            this.file = file;
+
+            raf = new RandomAccessFile(file, "rw");
+
+            writeCh = raf.getChannel();
+
+            boolean ok = false;
+
+            try {
+                readCh = new StripedFileChannel(file, readerStripes);
+
+                ok = true;
+            }
+            finally {
+                // If construction fails nobody else holds a reference that could close the write handle.
+                if (!ok)
+                    U.closeQuiet(raf);
+            }
+        }
+
+        /**
+         * Closes the current read channel and replaces it with a fresh one that has the same
+         * number of stripes.
+         *
+         * @throws FileNotFoundException If failed.
+         */
+        void reopenReadChannel() throws FileNotFoundException {
+            StripedFileChannel oldCh = readCh;
+
+            oldCh.close();
+
+            int stripes = oldCh.chs.length;
+
+            readCh = new StripedFileChannel(file, stripes);
+        }
+
+        /**
+         * Registers the given values in this file's array and appends the buffer content to the file.
+         * Every value not marked as deleted is added to the array and its index is switched by CAS to
+         * {@code sign * idx}. Afterwards the remaining bytes of the buffer are written at the current
+         * file length, each value gets its position assigned and its in-memory bytes dropped.
+         *
+         * @param vals Values.
+         * @param buf Buffer.
+         * @param sign Indicates where should we write value, to the left or to the right.
+         * @throws Exception If failed.
+         */
+        public void write(Iterable<SwapValue> vals, ByteBuffer buf, int sign) throws Exception {
+            for (SwapValue val : vals) {
+                int oldIdx = val.idx;
+
+                if (oldIdx == SwapValue.DELETED)
+                    continue;
+
+                int idx = arr.add(val);
+
+                if (!val.casIdx(oldIdx, sign * idx)) {
+                    // CAS can only lose against a concurrent switch to DELETED: undo the array registration.
+                    assert val.idx == SwapValue.DELETED;
+
+                    boolean res = tryRemove(idx, val);
+
+                    assert res;
+                }
+            }
+
+            final int size = buf.remaining();
+
+            if (size == 0)
+                return;
+
+            long pos = len;
+
+            // File length is advanced before the actual write.
+            len = pos + size;
+
+            long res = writeCh.write(buf, pos);
+
+            if (res != size)
+                throw new IllegalStateException(res + " != " + size);
+
+            // Nullify bytes in values and set pos.
+            // NOTE(review): this loop also visits values skipped above as DELETED, so it assumes the
+            // buffer holds the bytes of every element of 'vals' in iteration order - confirm with caller.
+            for (SwapValue val : vals) {
+                val.set(pos, null);
+
+                pos += val.len;
+            }
+        }
+
+        /**
+         * Registers the given batch of values in the slot array of this file, gathers their bytes
+         * into one direct buffer and writes it at the current end of the file (as tracked by {@code len}).
+         * <p>
+         * Entries of {@code vals.vals} that turn out to be deleted are replaced with {@code null}
+         * and are not written.
+         *
+         * @param vals Values.
+         * @param sign Sign: 1 or -1.
+         * @throws Exception If failed.
+         */
+        public void write(SwapValues vals, int sign) throws Exception {
+            ByteBuffer buf = ByteBuffer.allocateDirect(vals.size);
+
+            for (int i = 0, len = vals.vals.length; i < len; i++) {
+                SwapValue val = vals.vals[i];
+
+                // Value is already marked as deleted: drop it from the batch.
+                if (val.idx == SwapValue.DELETED) {
+                    vals.vals[i] = null;
+
+                    continue;
+                }
+
+                int idx = arr.add(val);
+
+                // Values in this batch are expected to still be in the NEW state.
+                if (!val.casIdx(SwapValue.NEW, sign * idx)) {
+                    // CAS failure is expected only because of a concurrent removal: free the slot just taken.
+                    assert val.idx == SwapValue.DELETED;
+
+                    tryRemove(idx, val);
+
+                    vals.vals[i] = null;
+                }
+                else
+                    buf.put(val.value(null));
+            }
+
+            buf.flip();
+
+            final int size = buf.remaining();
+
+            if (size == 0)
+                return;
+
+            long pos = len;
+
+            // Tracked length is advanced before the actual write.
+            len = pos + size;
+
+            long res = writeCh.write(buf, pos);
+
+            // A partial write is treated as an error.
+            if (res != size)
+                throw new IllegalStateException(res + " != " + size);
+
+            // Nullify bytes in values and set pos.
+            for (SwapValue val : vals.vals) {
+                // Skip entries dropped from the batch above.
+                if (val == null)
+                    continue;
+
+                val.set(pos, null);
+
+                pos += val.len;
+            }
+        }
+
+        /**
+         * Gets absolute path of the underlying file.
+         *
+         * @return File path.
+         */
+        public String path() {
+            return file.getAbsolutePath();
+        }
+
+        /**
+         * Gets file length as tracked by this object in the {@code len} field
+         * (the file system is not queried).
+         *
+         * @return File length.
+         */
+        public long length() {
+            return len;
+        }
+
+        /**
+         * Closes the random access file and the read channel, then deletes the file.
+         *
+         * @return Whether file was actually deleted.
+         */
+        public boolean delete() {
+            U.closeQuiet(raf);
+            readCh.close();
+
+            final boolean deleted = U.delete(file);
+
+            return deleted;
+        }
+
+        /**
+         * Clears the slot with the given index provided it still holds the expected value.
+         *
+         * @param idx Index.
+         * @param exp Expected value.
+         * @return {@code true} If succeeded.
+         */
+        public boolean tryRemove(int idx, SwapValue exp) {
+            assert idx > 0 : idx;
+
+            final FileSwapArray.Slot<SwapValue> slot = arr.slot(idx);
+
+            if (slot == null)
+                return false;
+
+            return slot.cas(exp, null);
+        }
+
+        /**
+         * Runs a single compaction pass over this file, filling the given (initially empty)
+         * deque with the collected values.
+         *
+         * @param vals Values.
+         * @param bufSize Buffer size.
+         * @return Buffer.
+         * @throws IOException If failed.
+         * @throws InterruptedException If interrupted.
+         */
+        public ByteBuffer compact(ArrayDeque<SwapValue> vals, final int bufSize) throws IOException,
+            InterruptedException {
+            assert vals.isEmpty();
+
+            final Compact op = new Compact(vals, bufSize);
+
+            op.doCompact();
+
+            return op.result();
+        }
+
+        /**
+         * Single compaction operation.
+         * <p>
+         * Walks the slot array of the enclosing file from the last slot towards the first one, collects
+         * live values into the supplied deque, reads their bytes from the file and packs them
+         * contiguously into one buffer.
+         */
+        private class Compact {
+            /** Collected values. */
+            private final ArrayDeque<SwapValue> vals;
+
+            /** Size of the buffer allocated by {@link #readAndCompact()} when no buffer exists yet. */
+            private final int bufSize;
+
+            /** Backing array of {@link #buf}. */
+            private byte[] bytes;
+
+            /** Buffer receiving the packed value bytes, {@code null} until first allocated. */
+            private ByteBuffer buf;
+
+            /** File position of the most recently collected value, {@code -1} until a value is collected. */
+            private long beg = -1;
+
+            /** File position right after the pending (not yet read) region, {@code -1} if no region is pending. */
+            private long end = -1;
+
+            /** Number of values in {@link #vals} already processed by {@link #readAndCompact()}. */
+            private int compacted;
+
+            /**
+             * @param vals Values.
+             * @param bufSize Buffer size.
+             */
+            private Compact(ArrayDeque<SwapValue> vals, final int bufSize) {
+                assert vals.isEmpty();
+
+                this.vals = vals;
+                this.bufSize = bufSize;
+            }
+
+            /**
+             * Reads buffer and compacts it.
+             * <p>
+             * Reads the file region {@code [beg, end)} into the buffer at its current position and then
+             * shifts the bytes of the values collected since the previous call so that they follow each
+             * other without gaps.
+             *
+             * @throws IOException if failed.
+             */
+            private void readAndCompact() throws IOException {
+                assert beg != -1;
+
+                // Lazily allocate the buffer of the default size.
+                if (buf == null) {
+                    bytes = new byte[bufSize];
+
+                    buf = ByteBuffer.wrap(bytes);
+                }
+
+                final int pos = buf.position();
+
+                // The region is appended after the data already packed into the buffer.
+                final int lim = (int)(end - beg + pos);
+
+                assert pos >= 0;
+                assert pos < lim : pos + " " + lim;
+                assert lim <= buf.capacity();
+
+                buf.limit(lim);
+
+                int res = writeCh.read(buf, beg);
+
+                assert res == lim - pos;
+
+                int prevEnd = pos;
+                long delta = beg - pos; // To translate from file based positions to buffer based.
+
+                // Only the values added to the head of the deque since the previous call are processed.
+                for (int j = vals.size(); j > compacted; j--) {
+                    SwapValue val = vals.pollFirst();
+
+                    int valPos = (int)(val.pos - delta);
+
+                    // There is a gap before this value: move its bytes down to close it.
+                    // NOTE(review): assumes U.arrayCopy has System.arraycopy argument order - confirm.
+                    if (prevEnd != valPos) {
+                        assert prevEnd < valPos : prevEnd + " " + valPos;
+
+                        U.arrayCopy(bytes, valPos, bytes, prevEnd, val.len);
+                    }
+
+                    prevEnd += val.len;
+
+                    vals.addLast(val); // To have values in the same order as in byte buffer.
+                }
+
+                assert prevEnd > 0 : prevEnd;
+
+                // Next region will be appended right after the packed data.
+                buf.position(prevEnd);
+
+                // No pending region anymore.
+                end = -1;
+
+                compacted = vals.size();
+            }
+
+            /**
+             * Compacts.
+             * <p>
+             * Collects live values starting from the last slot until the buffer is full or the slot array
+             * is exhausted. If nothing was collected the file and the slot array are truncated completely;
+             * otherwise the collected values are pointed to the shared byte array and the slot array
+             * (and, if the freed tail is large enough, the file) is truncated.
+             *
+             * @throws IOException If failed.
+             */
+            private void doCompact() throws IOException {
+                int idx = arr.size();
+
+                // Walk slots from the last one down to index 1.
+                while (--idx > 0) {
+                    FileSwapArray.Slot<SwapValue> s = arr.slot(idx);
+
+                    assert s != null;
+
+                    SwapValue v = s.get();
+
+                    // Empty slot or removed value: nothing to move.
+                    if (v == null || v.idx == SwapValue.DELETED)
+                        continue;
+
+                    // Start a new pending region ending right after this value.
+                    if (end == -1)
+                        end = v.pos + v.len;
+
+                    // Bytes from the start of this value to the end of the pending region.
+                    long size = end - v.pos;
+
+                    if ((buf == null ? bufSize : buf.remaining()) < size) {
+                        if (vals.isEmpty()) {  // Too big single value.
+                            assert bytes == null && buf == null;
+
+                            bytes = new byte[(int)size];
+
+                            buf = ByteBuffer.wrap(bytes);
+                        }
+                        else if (compacted == vals.size())
+                            break; // Finish current compaction, nothing new collected.
+                        else { // Read region and compact values in buffer.
+                            readAndCompact();
+
+                            // Retry the same value.
+                            idx++;
+
+                            continue;
+                        }
+                    }
+
+                    beg = v.pos;
+
+                    vals.addFirst(v);
+
+                    // Value is taken by compaction: clear its slot in this file.
+                    s.cas(v, null);
+                }
+
+                // No live values found: drop all the content of the file.
+                if (vals.isEmpty()) {
+                    arr.truncate(1);
+
+                    writeCh.truncate(0);
+
+                    len = 0;
+
+                    reopenReadChannel(); // Make sure that value can be read only from right file but not after switch.
+
+                    return;
+                }
+
+                // Some collected values were not read from the file yet.
+                if (compacted != vals.size())
+                    readAndCompact();
+
+                int pos = 0;
+
+                for (SwapValue val : vals) { // The values will share one byte array with different offsets while moving.
+                    val.set(pos, bytes);
+
+                    pos += val.len;
+                }
+
+                buf.flip();
+
+                assert buf.limit() == pos : buf.limit() + " " + pos;
+
+                // Slots above the last visited index are not needed anymore.
+                arr.truncate(idx + 1);
+
+                // Physically shrink the file only if the freed tail is bigger than MIN_TRUNK_SIZE.
+                if (len - beg > MIN_TRUNK_SIZE) {
+                    writeCh.truncate(beg);
+
+                    len = beg;
+                }
+            }
+
+            /**
+             * @return Buffer.
+             */
+            public ByteBuffer result() {
+                return buf;
+            }
+        }
+    }
+
+    /**
+     * Space.
+     */
+    private class Space {
+        /** Space name. */
+        private final String name;
+
+        /** */
+        private final GridAtomicInitializer<Void> initializer = new GridAtomicInitializer<>();
+
+        /** Swap file left. */
+        @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+        private SwapFile left;
+
+        /** Swap file right. */
+        @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+        private SwapFile right;
+
+        /** */
+        private final SwapValuesQueue que = new SwapValuesQueue(writeBufSize, maxWriteQueSize);
+
+        /** Partitions. */
+        private final ConcurrentMap<Integer, ConcurrentMap<SwapKey, SwapValue>> parts =
+            new ConcurrentHashMap8<>();
+
+        /** Total size. */
+        private final AtomicLong size = new AtomicLong();
+
+        /** Total count. */
+        private final AtomicLong cnt = new AtomicLong();
+
+        /** */
+        private int sign = 1;
+
+        /** Writer thread. */
+        private Thread writer;
+
+        /** */
+        private Thread compactor;
+
+        /**
+         * Creates space with the given name. Swap files and worker threads are not created here,
+         * see {@link #initialize()}.
+         *
+         * @param name Space name.
+         */
+        private Space(String name) {
+            assert name != null;
+
+            this.name = name;
+        }
+
+        /**
+         * Initializes space.
+         * <p>
+         * Creates the left and right swap files in the SPI directory and starts the writer and
+         * compactor threads. Does nothing if initialization has already succeeded.
+         *
+         * @throws org.apache.ignite.spi.IgniteSpiException If initialization failed.
+         */
+        public void initialize() throws IgniteSpiException {
+            if (initializer.succeeded())
+                return;
+
+            assert dir.exists();
+            assert dir.isDirectory();
+
+            try {
+                initializer.init(new Callable<Void>(){
+                    @Override public Void call() throws Exception {
+                        left = new SwapFile(new File(dir, name + ".left"), readStripesNum);
+
+                        right = new SwapFile(new File(dir, name + ".right"), readStripesNum);
+
+                        // Guards the sign and file writes shared by the writer and compactor threads.
+                        final Object mux = new Object();
+
+                        writer = new IgniteSpiThread(gridName,  "Swap writer: " + name, log) {
+                            @Override protected void body() throws InterruptedException {
+                                while (!isInterrupted()) {
+                                    SwapValues vals = que.take();
+
+                                    synchronized (mux) {
+                                        // Current sign selects the file new values are written to.
+                                        SwapFile f = sign == 1 ? right : left;
+
+                                        try {
+                                            f.write(vals, sign);
+                                        }
+                                        catch (Exception e) {
+                                            throw new GridRuntimeException(e);
+                                        }
+                                    }
+                                }
+                            }
+                        };
+
+                        compactor = new IgniteSpiThread(gridName, "Swap compactor: " + name, log) {
+                            @Override protected void body() throws InterruptedException {
+                                // File currently written to.
+                                SwapFile w = null;
+                                // File currently compacted.
+                                SwapFile c = null;
+
+                                ArrayDeque<SwapValue> vals = null;
+
+                                while (!isInterrupted()) {
+                                    // Sleep until compaction is needed; the condition is rechecked after each wakeup.
+                                    while(!needCompact()) {
+                                        LockSupport.park();
+
+                                        if (isInterrupted())
+                                            return;
+                                    }
+
+                                    ByteBuffer buf = null;
+
+                                    // On the very first pass only the deque is created, so it is empty and
+                                    // the branch below just selects the files to write to and to compact.
+                                    if (vals == null)
+                                        vals = new ArrayDeque<>();
+                                    else {
+                                        vals.clear();
+
+                                        try {
+                                            buf = c.compact(vals, writeBufSize);
+                                        }
+                                        catch (IOException e) {
+                                            throw new GridRuntimeException(e);
+                                        }
+                                    }
+
+                                    if (vals.isEmpty()) {
+                                        // Nothing was collected: flip the sign, i.e. swap the written
+                                        // and the compacted files.
+                                        synchronized (mux) {
+                                            sign = -sign;
+
+                                            if (sign == 1) {
+                                                w = right;
+                                                c = left;
+                                            }
+                                            else {
+                                                w = left;
+                                                c = right;
+                                            }
+                                        }
+                                    }
+                                    else {
+                                        assert buf != null && buf.remaining() != 0;
+
+                                        // Move the collected values to the file currently written to.
+                                        synchronized (mux) {
+                                            try {
+                                                w.write(vals, buf, sign);
+                                            }
+                                            catch (Exception e) {
+                                                throw new GridRuntimeException(e);
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        };
+
+                        writer.start();
+                        compactor.start();
+
+                        return null;
+                    }
+                });
+            }
+            catch (GridException e) {
+                throw new IgniteSpiException(e);
+            }
+        }
+
+        /**
+         * Gets total space size in bytes, computed as the sum of the tracked lengths
+         * of the left and the right swap files.
+         *
+         * @return Total size.
+         */
+        public long size() {
+            final long leftLen = left.length();
+            final long rightLen = right.length();
+
+            return leftLen + rightLen;
+        }
+
+        /**
+         * Gets total space count, i.e. the current value of the entries counter
+         * maintained by store and remove operations.
+         *
+         * @return Total count.
+         */
+        public long count() {
+            return cnt.get();
+        }
+
+        /**
+         * Clears space by removing every entry returned by the entries iterator.
+         *
+         * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+         */
+        public void clear() throws IgniteSpiException {
+            for (Iterator<Map.Entry<SwapKey, byte[]>> it = entriesIterator(); it.hasNext(); ) {
+                SwapKey key = it.next().getKey();
+
+                remove(key, false);
+            }
+        }
+
+        /**
+         * Stops space.
+         *
+         * @throws GridInterruptedException If interrupted.
+         */
+        public void stop() throws GridInterruptedException {
+            U.interrupt(writer);
+            U.interrupt(compactor);
+
+            U.join(writer);
+            U.join(compactor);
+
+            left.delete();
+            right.delete();
+        }
+
+        /**
+         * Stores value in space.
+         * <p>
+         * Passing {@code null} as the value removes the key; this is delegated to
+         * {@link #remove(SwapKey, boolean)} so that no empty partition is created for a mere removal
+         * and counters are updated before the compaction check is done.
+         *
+         * @param key Key.
+         * @param val Value, {@code null} to remove the key.
+         * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+         */
+        public void store(final SwapKey key, @Nullable final byte[] val) throws IgniteSpiException {
+            assert key != null;
+
+            if (val == null) {
+                remove(key, false);
+
+                return;
+            }
+
+            final ConcurrentMap<SwapKey, SwapValue> part = partition(key.partition(), true);
+
+            assert part != null;
+
+            final SwapValue swapVal = new SwapValue(val);
+
+            SwapValue old = part.put(key, swapVal);
+
+            if (old != null) {
+                // Value replaced: adjust size by the difference and release the old value.
+                size.addAndGet(val.length - old.len);
+
+                removeFromFile(old);
+            }
+            else {
+                size.addAndGet(val.length);
+                cnt.incrementAndGet();
+            }
+
+            // Hand the new value over to the write queue.
+            que.add(swapVal);
+        }
+
+        /**
+         * Reads value from space.
+         *
+         * @param key Key.
+         * @return Value, or {@code null} if the partition or the key is not present.
+         * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+         */
+        @Nullable public byte[] read(SwapKey key) throws IgniteSpiException {
+            assert key != null;
+
+            final Map<SwapKey, SwapValue> part = partition(key.partition(), false);
+
+            final SwapValue found = part == null ? null : part.get(key);
+
+            if (found != null)
+                return found.value(this);
+
+            return null;
+        }
+
+        /**
+         * Removes value from space.
+         *
+         * @param key Key.
+         * @param read If value has to be read.
+         * @return Value bytes if {@code read} is {@code true} and the key was present, otherwise {@code null}.
+         * @throws org.apache.ignite.spi.IgniteSpiException In case of error.
+         */
+        @Nullable public byte[] remove(SwapKey key, boolean read) throws IgniteSpiException {
+            assert key != null;
+
+            final Map<SwapKey, SwapValue> part = partition(key.partition(), false);
+
+            final SwapValue removed = part != null ? part.remove(key) : null;
+
+            if (removed == null)
+                return null;
+
+            size.addAndGet(-removed.len);
+            cnt.decrementAndGet();
+
+            // Value bytes are read before removal from file, so compaction can't happen.
+            final byte[] res = read ? removed.value(this) : null;
+
+            assert !read || res != null;
+
+            removeFromFile(removed);
+
+            return res;
+        }
+
+        /**
+         * Marks the value as deleted, clears its slot in the file it was registered in (if any)
+         * and wakes up the compactor when compaction is needed.
+         *
+         * @param val Value.
+         */
+        private void removeFromFile(SwapValue val) {
+            int idx;
+
+            // Spin until the current index is atomically replaced with the DELETED marker.
+            do {
+                idx = val.idx;
+
+                assert idx != SwapValue.DELETED;
+            }
+            while (!val.casIdx(idx, SwapValue.DELETED));
+
+            if (idx != SwapValue.NEW) {
+                // Positive index means the right file, negative means the left one.
+                SwapFile owner = idx > 0 ? right : left;
+
+                owner.tryRemove(Math.abs(idx), val);
+            }
+
+            if (needCompact())
+                LockSupport.unpark(compactor);
+        }
+
+        /**
+         * @return {@code true} If compaction needed.
+         */
+        private boolean needCompact() {
+            long fileLen = size();
+
+            return fileLen > writeBufSize && (fileLen - size.get()) / (float)fileLen > maxSparsity;
+        }
+
+        /**
+         * Gets numbers of partitions stored in this space.
+         * <p>
+         * The key set of the internal partition map is returned as is, no copy is made.
+         *
+         * @return Partition numbers.
+         */
+        public Collection<Integer> partitions() {
+            return parts.keySet();
+        }
+
+        /**
+         * Gets partition map by its number, optionally creating it.
+         *
+         * @param part Partition number.
+         * @param create Whether to create partition if it doesn't exist.
+         * @return Partition map, or {@code null} if it doesn't exist and {@code create} is {@code false}.
+         */
+        @Nullable private ConcurrentMap<SwapKey, SwapValue> partition(int part, boolean create) {
+            final ConcurrentMap<SwapKey, SwapValue> existing = parts.get(part);
+
+            if (existing != null || !create)
+                return existing;
+
+            final ConcurrentMap<SwapKey, SwapValue> fresh = new ConcurrentHashMap<>();
+
+            // Another thread could have created the partition concurrently: prefer its map.
+            final ConcurrentMap<SwapKey, SwapValue> raced = parts.putIfAbsent(part, fresh);
+
+            return raced != null ? raced : fresh;
+        }
+
+        /**
+         * Gets iterator over entries of a single partition.
+         *
+         * @param part Partition.
+         * @return Iterator over partition, empty if the partition doesn't exist.
+         */
+        public Iterator<Map.Entry<SwapKey, byte[]>> entriesIterator(int part) {
+            final Map<SwapKey, SwapValue> partMap = partition(part, false);
+
+            if (partMap != null)
+                return transform(partMap.entrySet().iterator());
+
+            return Collections.<Map.Entry<SwapKey, byte[]>>emptySet().iterator();
+        }
+
+        /**
+         * Gets iterator over entries of all partitions, one partition after another.
+         *
+         * @return Iterator over all entries.
+         */
+        public Iterator<Map.Entry<SwapKey, byte[]>> entriesIterator() {
+            final Iterator<ConcurrentMap<SwapKey, SwapValue>> partIter = parts.values().iterator();
+
+            return transform(F.concat(new Iterator<Iterator<Map.Entry<SwapKey, SwapValue>>>() {
+                @Override public boolean hasNext() {
+                    return partIter.hasNext();
+                }
+
+                @Override public Iterator<Map.Entry<SwapKey, SwapValue>> next() {
+                    final ConcurrentMap<SwapKey, SwapValue> partMap = partIter.next();
+
+                    return partMap.entrySet().iterator();
+                }
+
+                @Override public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            }));
+        }
+
+        /**
+         * Wraps an iterator over {@link SwapValue} entries into an iterator over entries with value bytes.
+         * Entries whose bytes resolve to {@code null} are skipped; removal is delegated to the space.
+         *
+         * @param iter Iterator with {@link SwapValue} to transform.
+         * @return Entries iterator.
+         */
+        private Iterator<Map.Entry<SwapKey, byte[]>> transform(final Iterator<Map.Entry<SwapKey,
+            SwapValue>> iter) {
+            return new Iterator<Map.Entry<SwapKey, byte[]>>() {
+                /** Entry to be returned by the following {@code next()} call. */
+                private Map.Entry<SwapKey, byte[]> nextEntry;
+
+                /** Entry returned by the latest {@code next()} call and not removed yet. */
+                private Map.Entry<SwapKey, byte[]> lastReturned;
+
+                {
+                    advance();
+                }
+
+                @Override public boolean hasNext() {
+                    return nextEntry != null;
+                }
+
+                /**
+                 * Moves to the next entry which has non-null bytes, if any.
+                 */
+                private void advance() {
+                    while (iter.hasNext()) {
+                        final Map.Entry<SwapKey, SwapValue> e = iter.next();
+
+                        final byte[] bytes;
+
+                        try {
+                            bytes = e.getValue().value(Space.this);
+                        }
+                        catch (IgniteSpiException ex) {
+                            throw new GridRuntimeException(ex);
+                        }
+
+                        if (bytes == null)
+                            continue;
+
+                        nextEntry = new T2<>(e.getKey(), bytes);
+
+                        return;
+                    }
+                }
+
+                @Override public Map.Entry<SwapKey, byte[]> next() {
+                    if (nextEntry == null)
+                        throw new NoSuchElementException();
+
+                    final Map.Entry<SwapKey, byte[]> res = nextEntry;
+
+                    nextEntry = null;
+
+                    advance();
+
+                    lastReturned = res;
+
+                    return res;
+                }
+
+                @Override public void remove() {
+                    if (lastReturned == null)
+                        throw new IllegalStateException();
+
+                    final SwapKey key = lastReturned.getKey();
+
+                    try {
+                        Space.this.remove(key, false);
+                    }
+                    catch (IgniteSpiException ex) {
+                        throw new GridRuntimeException(ex);
+                    }
+                    finally {
+                        lastReturned = null;
+                    }
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c31cec7c/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/FileSwapSpaceSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/FileSwapSpaceSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/FileSwapSpaceSpiMBean.java
new file mode 100644
index 0000000..12b6a67
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/FileSwapSpaceSpiMBean.java
@@ -0,0 +1,59 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.spi.swapspace.file;
+
+import org.apache.ignite.mbean.*;
+import org.apache.ignite.spi.*;
+
+/**
+ * Management bean exposing configuration of {@link FileSwapSpaceSpi}.
+ */
+@IgniteMBeanDescription("MBean that provides configuration information on file-based swapspace SPI.")
+public interface FileSwapSpaceSpiMBean extends IgniteSpiManagementMBean {
+    /**
+     * Returns the base directory.
+     *
+     * @return Base directory.
+     */
+    @IgniteMBeanDescription("Base directory.")
+    String getBaseDirectory();
+
+    /**
+     * Returns the maximum sparsity.
+     *
+     * @return Maximum sparsity.
+     */
+    @IgniteMBeanDescription("Maximum sparsity.")
+    float getMaximumSparsity();
+
+    /**
+     * Returns the write buffer size in bytes.
+     *
+     * @return Write buffer size in bytes.
+     */
+    @IgniteMBeanDescription("Write buffer size in bytes.")
+    int getWriteBufferSize();
+
+    /**
+     * Returns the max write queue size in bytes.
+     *
+     * @return Max write queue size in bytes.
+     */
+    @IgniteMBeanDescription("Max write queue size in bytes.")
+    int getMaxWriteQueueSize();
+
+    /**
+     * Returns the read pool size.
+     *
+     * @return Read pool size.
+     */
+    @IgniteMBeanDescription("Read pool size.")
+    int getReadStripesNumber();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c31cec7c/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/GridFileSwapArray.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/GridFileSwapArray.java b/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/GridFileSwapArray.java
deleted file mode 100644
index 8796baf..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/spi/swapspace/file/GridFileSwapArray.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.swapspace.file;
-
-import org.jetbrains.annotations.*;
-
-import java.util.concurrent.atomic.*;
-
-/**
- * Growing array.
- */
-class GridFileSwapArray<X> {
-    /** First partition size must be power of two. */
-    private static final int FIRST_ARRAY_SIZE = 4096;
-
-    /** */
-    private static final int LADDER_SIZE = Integer.numberOfLeadingZeros(FIRST_ARRAY_SIZE) + 1;
-
-    /** */
-    @SuppressWarnings("unchecked")
-    private final AtomicReferenceArray<X>[] ladder = new AtomicReferenceArray[LADDER_SIZE];
-
-    /** */
-    private int idx = 1;
-
-    /**
-     *
-     */
-    GridFileSwapArray() {
-        synchronized (ladder) {
-            ladder[0] = new AtomicReferenceArray<>(FIRST_ARRAY_SIZE);
-        }
-    }
-
-    /**
-     * @return Size.
-     */
-    public int size() {
-        return idx;
-    }
-
-    /**
-     * Adds value to the end.
-     *
-     * @param x Value.
-     * @return Index where it was added.
-     */
-    int add(X x) {
-        int i = idx++;
-
-        assert i >= 0 && i != Integer.MAX_VALUE : "Integer overflow";
-
-        Slot<X> s = slot(i);
-
-        assert s != null; // We should add always in one thread.
-
-        s.set(x);
-
-        int len = s.arr.length();
-
-        if (s.idx + 1 == len) {
-            synchronized (ladder) {
-                ladder[s.arrIdx + 1] = new AtomicReferenceArray<>(s.arrIdx == 0 ? len : len << 1);
-            }
-        }
-
-        return i;
-    }
-
-    /**
-     * @param size New size.
-     */
-    void truncate(int size) {
-        assert size > 0;
-
-        idx = size;
-
-        int arrIdx = arrayIndex(idx) + 1;
-
-        if (arrIdx < ladder.length && ladder[arrIdx] != null) {
-            synchronized (ladder) {
-                do {
-                    ladder[arrIdx++] = null;
-                }
-                while (arrIdx < ladder.length && ladder[arrIdx] != null);
-            }
-        }
-    }
-
-    /**
-     * @param idx Absolute slot index.
-     * @return Array index in {@link #ladder}.
-     */
-    static int arrayIndex(int idx) {
-        if (idx < FIRST_ARRAY_SIZE)
-            return 0;
-
-        return LADDER_SIZE - Integer.numberOfLeadingZeros(idx);
-    }
-
-    /**
-     * Slot for given absolute index.
-     *
-     * @param idx Absolute index.
-     * @return Slot.
-     */
-    @Nullable Slot<X> slot(int idx) {
-        assert idx > 0 : idx;
-
-        int arrIdx = arrayIndex(idx);
-
-        AtomicReferenceArray<X> arr = ladder[arrIdx];
-
-        if (arr == null) {
-            synchronized (ladder) { // Ensure visibility.
-                arr = ladder[arrIdx];
-            }
-
-            if (arr == null)
-                return null;
-        }
-
-        return new Slot<>(arrIdx, arr, arrIdx == 0 ? idx : idx - arr.length());
-    }
-
-    /**
-     * Slot in array.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    static final class Slot<X> {
-        /** */
-        private final int arrIdx;
-
-        /** */
-        private final AtomicReferenceArray<X> arr;
-
-        /** */
-        private final int idx;
-
-        /**
-         * @param arrIdx Index of array.
-         * @param arr Array.
-         * @param idx Index within the array.
-         */
-        private Slot(int arrIdx, AtomicReferenceArray<X> arr, int idx) {
-            this.arrIdx = arrIdx;
-            this.arr = arr;
-            this.idx = idx;
-        }
-
-        /**
-         * @return Value.
-         */
-        public X get() {
-            return arr.get(idx);
-        }
-
-        /**
-         * @param exp Expected.
-         * @param x New value.
-         * @return {@code true} If succeeded.
-         */
-        public boolean cas(@Nullable X exp, @Nullable X x) {
-            return exp == x || arr.compareAndSet(idx, exp, x);
-        }
-
-        /**
-         * @param x value.
-         */
-        private void set(X x) {
-            arr.lazySet(idx, x);
-        }
-    }
-}


Mime
View raw message