ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [18/76] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 18:03:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
new file mode 100644
index 0000000..c260f1f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange.*;
+
+/**
+ * GGFS fragmentizer manager.
+ */
+public class IgfsFragmentizerManager extends IgfsManager {
+    /** Message offer wait interval. */
+    private static final int MSG_OFFER_TIMEOUT = 1000;
+
+    /** Fragmentizer files check interval. */
+    private static final int FRAGMENTIZER_CHECK_INTERVAL = 3000;
+
+    /** Message send retry interval. */
+    private static final int MESSAGE_SEND_RETRY_INTERVAL = 1000;
+
+    /** How many times retry message send. */
+    private static final int MESSAGE_SEND_RETRY_COUNT = 3;
+
+    /** Manager stopping flag. */
+    private volatile boolean stopping;
+
+    /** Coordinator worker. */
+    private volatile FragmentizerCoordinator fragmentizerCrd;
+
+    /** This variable is used in tests only. */
+    @SuppressWarnings("FieldCanBeLocal")
+    private volatile boolean fragmentizerEnabled = true;
+
+    /** Fragmentizer worker. */
+    private FragmentizerWorker fragmentizerWorker;
+
+    /** Shutdown lock. */
+    private GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+
+    /** Message topic. */
+    private Object topic;
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        if (!igfsCtx.configuration().isFragmentizerEnabled())
+            return;
+
+        // We care only about node leave and fail events.
+        igfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                checkLaunchCoordinator(discoEvt);
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        fragmentizerWorker = new FragmentizerWorker();
+
+        String ggfsName = igfsCtx.configuration().getName();
+
+        topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName);
+
+        igfsCtx.kernalContext().io().addMessageListener(topic, fragmentizerWorker);
+
+        new IgniteThread(fragmentizerWorker).start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStart0() throws IgniteCheckedException {
+        if (igfsCtx.configuration().isFragmentizerEnabled()) {
+            // Check at startup if this node is a fragmentizer coordinator.
+            DiscoveryEvent locJoinEvt = igfsCtx.kernalContext().discovery().localJoinEvent();
+
+            checkLaunchCoordinator(locJoinEvt);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("BusyWait")
+    @Override protected void onKernalStop0(boolean cancel) {
+        boolean interrupted = false;
+
+        // Busy wait is intentional.
+        while (true) {
+            try {
+                if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
+                    break;
+                else
+                    Thread.sleep(200);
+            }
+            catch (InterruptedException ignore) {
+                // Preserve interrupt status & ignore.
+                // Note that interrupted flag is cleared.
+                interrupted = true;
+            }
+        }
+
+        try {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+
+            stopping = true;
+        }
+        finally {
+            rw.writeUnlock();
+        }
+
+        synchronized (this) {
+            if (fragmentizerCrd != null)
+                fragmentizerCrd.cancel();
+        }
+
+        if (fragmentizerWorker != null)
+            fragmentizerWorker.cancel();
+
+        U.join(fragmentizerCrd, log);
+        U.join(fragmentizerWorker, log);
+    }
+
+    /**
+     * @param nodeId Node ID to send message to.
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If send failed.
+     */
+    private void sendWithRetries(UUID nodeId, IgfsCommunicationMessage msg) throws IgniteCheckedException {
+        for (int i = 0; i < MESSAGE_SEND_RETRY_COUNT; i++) {
+            try {
+                igfsCtx.send(nodeId, topic, msg, SYSTEM_POOL);
+
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                if (!igfsCtx.kernalContext().discovery().alive(nodeId))
+                    throw new ClusterTopologyCheckedException("Failed to send message (node left the grid) " +
+                        "[nodeId=" + nodeId + ", msg=" + msg + ']');
+
+                if (i == MESSAGE_SEND_RETRY_COUNT - 1)
+                    throw e;
+
+                U.sleep(MESSAGE_SEND_RETRY_INTERVAL);
+            }
+        }
+    }
+
+    /**
+     * Checks if current node is the oldest node in topology and starts coordinator thread if so.
+     * Note that once node is the oldest one, it will be the oldest until it leaves grid.
+     *
+     * @param discoEvt Discovery event.
+     */
+    private void checkLaunchCoordinator(DiscoveryEvent discoEvt) {
+        rw.readLock();
+
+        try {
+            if (stopping)
+                return;
+
+            if (fragmentizerCrd == null) {
+                long minNodeOrder = Long.MAX_VALUE;
+
+                Collection<ClusterNode> nodes = discoEvt.topologyNodes();
+
+                for (ClusterNode node : nodes) {
+                    if (node.order() < minNodeOrder && igfsCtx.ggfsNode(node))
+                        minNodeOrder = node.order();
+                }
+
+                ClusterNode locNode = igfsCtx.kernalContext().grid().localNode();
+
+                if (locNode.order() == minNodeOrder) {
+                    if (log.isDebugEnabled())
+                        log.debug("Detected local node to be the eldest GGFS node in topology, starting fragmentizer " +
+                            "coordinator thread [discoEvt=" + discoEvt + ", locNode=" + locNode + ']');
+
+                    synchronized (this) {
+                        if (fragmentizerCrd == null && !stopping) {
+                            fragmentizerCrd = new FragmentizerCoordinator();
+
+                            new IgniteThread(fragmentizerCrd).start();
+                        }
+                    }
+                }
+            }
+        }
+        finally {
+            rw.readUnlock();
+        }
+    }
+
+    /**
+     * Processes fragmentizer request. For each range assigned to this node:
+     * <ul>
+     *     <li>Mark range as moving indicating that block copying started.</li>
+     *     <li>Copy blocks to non-colocated keys.</li>
+     *     <li>Update map to indicate that blocks were copied and old blocks should be deleted.</li>
+     *     <li>Delete old blocks.</li>
+     *     <li>Remove range from file map.</li>
+     * </ul>
+     *
+     * @param req Request.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @SuppressWarnings("fallthrough")
+    private void processFragmentizerRequest(IgfsFragmentizerRequest req) throws IgniteCheckedException {
+        req.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
+
+        Collection<IgfsFileAffinityRange> ranges = req.fragmentRanges();
+        IgniteUuid fileId = req.fileId();
+
+        IgfsFileInfo fileInfo = igfsCtx.meta().info(fileId);
+
+        if (fileInfo == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to find file info for fragmentizer request: " + req);
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Moving file ranges for fragmentizer request [req=" + req + ", fileInfo=" + fileInfo + ']');
+
+        for (IgfsFileAffinityRange range : ranges) {
+            try {
+                IgfsFileInfo updated;
+
+                switch (range.status()) {
+                    case RANGE_STATUS_INITIAL: {
+                        // Mark range as moving.
+                        updated = igfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVING));
+
+                        if (updated == null) {
+                            igfsCtx.data().cleanBlocks(fileInfo, range, true);
+
+                            continue;
+                        }
+
+                        // Fall-through.
+                    }
+
+                    case RANGE_STATUS_MOVING: {
+                        // Move colocated blocks.
+                        igfsCtx.data().spreadBlocks(fileInfo, range);
+
+                        // Mark range as moved.
+                        updated = igfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVED));
+
+                        if (updated == null) {
+                            igfsCtx.data().cleanBlocks(fileInfo, range, true);
+
+                            continue;
+                        }
+
+                        // Fall-through.
+                    }
+
+                    case RANGE_STATUS_MOVED: {
+                        // Remove old blocks.
+                        igfsCtx.data().cleanBlocks(fileInfo, range, false);
+
+                        // Remove range from map.
+                        updated = igfsCtx.meta().updateInfo(fileId, deleteRange(range));
+
+                        if (updated == null)
+                            igfsCtx.data().cleanBlocks(fileInfo, range, true);
+                    }
+                }
+            }
+            catch (IgfsInvalidRangeException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to update file range " +
+                        "[range=" + range + "fileId=" + fileId + ", err=" + e.getMessage() + ']');
+            }
+        }
+    }
+
+    /**
+     * Creates update info closure that will mark given range as moving.
+     *
+     * @param range Range to mark as moving.
+     * @param status Status.
+     * @return Update closure.
+     */
+    private IgniteClosure<IgfsFileInfo, IgfsFileInfo> updateRange(final IgfsFileAffinityRange range,
+        final int status) {
+        return new CX1<IgfsFileInfo, IgfsFileInfo>() {
+            @Override public IgfsFileInfo applyx(IgfsFileInfo info) throws IgniteCheckedException {
+                IgfsFileMap map = new IgfsFileMap(info.fileMap());
+
+                map.updateRangeStatus(range, status);
+
+                if (log.isDebugEnabled())
+                    log.debug("Updated file map for range [fileId=" + info.id() + ", range=" + range +
+                        ", status=" + status + ", oldMap=" + info.fileMap() + ", newMap=" + map + ']');
+
+                IgfsFileInfo updated = new IgfsFileInfo(info, info.length());
+
+                updated.fileMap(map);
+
+                return updated;
+            }
+        };
+    }
+
+    /**
+     * Creates update info closure that will mark given range as moving.
+     *
+     * @param range Range to mark as moving.
+     * @return Update closure.
+     */
+    private IgniteClosure<IgfsFileInfo, IgfsFileInfo> deleteRange(final IgfsFileAffinityRange range) {
+        return new CX1<IgfsFileInfo, IgfsFileInfo>() {
+            @Override public IgfsFileInfo applyx(IgfsFileInfo info) throws IgniteCheckedException {
+                IgfsFileMap map = new IgfsFileMap(info.fileMap());
+
+                map.deleteRange(range);
+
+                if (log.isDebugEnabled())
+                    log.debug("Deleted range from file map [fileId=" + info.id() + ", range=" + range +
+                        ", oldMap=" + info.fileMap() + ", newMap=" + map + ']');
+
+                IgfsFileInfo updated = new IgfsFileInfo(info, info.length());
+
+                updated.fileMap(map);
+
+                return updated;
+            }
+        };
+    }
+
+    /**
+     * Fragmentizer coordinator thread.
+     */
+    private class FragmentizerCoordinator extends GridWorker implements GridLocalEventListener, GridMessageListener {
+        /** Files being fragmented. */
+        private ConcurrentMap<IgniteUuid, Collection<UUID>> fragmentingFiles = new ConcurrentHashMap<>();
+
+        /** Node IDs captured on start. */
+        private volatile Collection<UUID> startSync;
+
+        /** Wait lock. */
+        private Lock lock = new ReentrantLock();
+
+        /** Wait condition. */
+        private Condition cond = lock.newCondition();
+
+        /**
+         * Constructor.
+         */
+        protected FragmentizerCoordinator() {
+            super(igfsCtx.kernalContext().gridName(), "fragmentizer-coordinator", igfsCtx.kernalContext().log());
+
+            igfsCtx.kernalContext().event().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
+            igfsCtx.kernalContext().io().addMessageListener(topic, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            // Wait for all previous fragmentizer tasks to complete.
+            syncStart();
+
+            while (!isCancelled()) {
+                // If we have room for files, add them to fragmentizer.
+                try {
+                    while (fragmentingFiles.size() < igfsCtx.configuration().getFragmentizerConcurrentFiles()) {
+                        IgfsFileInfo fileInfo = fileForFragmentizer(fragmentingFiles.keySet());
+
+                        // If no colocated files found, exit loop.
+                        if (fileInfo == null)
+                            break;
+
+                        requestFragmenting(fileInfo);
+                    }
+                }
+                catch (IgniteCheckedException | IgniteException e) {
+                    if (!X.hasCause(e, InterruptedException.class) && !X.hasCause(e, IgniteInterruptedCheckedException.class))
+                        LT.warn(log, e, "Failed to get fragmentizer file info (will retry).");
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Got interrupted exception in fragmentizer coordinator (grid is stopping).");
+
+                        break; // While.
+                    }
+                }
+
+                lock.lock();
+
+                try {
+                    cond.await(FRAGMENTIZER_CHECK_INTERVAL, MILLISECONDS);
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEvent(Event evt) {
+            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+            if (log.isDebugEnabled())
+                log.debug("Processing node leave event: " + discoEvt);
+
+            boolean signal = false;
+
+            Collection<UUID> startSync0 = startSync;
+
+            if (startSync0 != null && !startSync0.isEmpty()) {
+                startSync0.remove(discoEvt.eventNode().id());
+
+                if (startSync0.isEmpty()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Completed fragmentizer coordinator sync start.");
+
+                    signal = true;
+                }
+            }
+
+            if (!signal) {
+                Iterator<Map.Entry<IgniteUuid, Collection<UUID>>> it = fragmentingFiles.entrySet().iterator();
+
+                while (it.hasNext()) {
+                    Map.Entry<IgniteUuid, Collection<UUID>> entry = it.next();
+
+                    Collection<UUID> nodeIds = entry.getValue();
+
+                    if (nodeIds.remove(discoEvt.eventNode().id())) {
+                        if (nodeIds.isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received all responses for fragmentizer task [fileId=" + entry.getKey() +
+                                    ']');
+
+                            it.remove();
+
+                            signal = true;
+                        }
+                    }
+                }
+            }
+
+            if (signal)
+                wakeUp();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            if (msg instanceof IgfsFragmentizerResponse) {
+                IgfsFragmentizerResponse res = (IgfsFragmentizerResponse)msg;
+
+                IgniteUuid fileId = res.fileId();
+
+                Collection<UUID> nodeIds = fragmentingFiles.get(fileId);
+
+                if (nodeIds != null) {
+                    if (nodeIds.remove(nodeId)) {
+                        if (nodeIds.isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received all responses for fragmentizer task [fileId=" + fileId + ']');
+
+                            fragmentingFiles.remove(fileId, nodeIds);
+
+                            wakeUp();
+                        }
+                    }
+                }
+                else
+                    log.warning("Received fragmentizer response for file ID which was not requested (will ignore) " +
+                        "[nodeId=" + nodeId + ", fileId=" + res.fileId() + ']');
+            }
+            else if (msg instanceof IgfsSyncMessage) {
+                IgfsSyncMessage sync = (IgfsSyncMessage)msg;
+
+                if (sync.response() && sync.order() == igfsCtx.kernalContext().grid().localNode().order()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received fragmentizer sync response from remote node: " + nodeId);
+
+                    Collection<UUID> startSync0 = startSync;
+
+                    if (startSync0 != null) {
+                        startSync0.remove(nodeId);
+
+                        if (startSync0.isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Completed fragmentizer coordinator sync start: " + startSync0);
+
+                            wakeUp();
+                        }
+                    }
+                }
+            }
+        }
+
+        /**
+         * Signals condition.
+         */
+        private void wakeUp() {
+            lock.lock();
+
+            try {
+                cond.signalAll();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Sends sync message to remote nodes and awaits for response from all nodes.
+         *
+         * @throws InterruptedException If waiting was interrupted.
+         */
+        private void syncStart() throws InterruptedException {
+            Collection<UUID> startSync0 = startSync = new GridConcurrentHashSet<>(
+                F.viewReadOnly(
+                    igfsCtx.kernalContext().discovery().allNodes(),
+                    F.node2id(),
+                    new P1<ClusterNode>() {
+                        @Override public boolean apply(ClusterNode n) {
+                            return igfsCtx.ggfsNode(n);
+                        }
+                    }));
+
+            ClusterNode locNode = igfsCtx.kernalContext().grid().localNode();
+
+            while (!startSync0.isEmpty()) {
+                for (UUID nodeId : startSync0) {
+                    IgfsSyncMessage syncReq = new IgfsSyncMessage(locNode.order(), false);
+
+                    try {
+                        if (log.isDebugEnabled())
+                            log.debug("Sending fragmentizer sync start request to remote node [nodeId=" + nodeId +
+                                ", syncReq=" + syncReq + ']');
+
+                        sendWithRetries(nodeId, syncReq);
+
+                        // Close window between message sending and discovery event.
+                        if (!igfsCtx.kernalContext().discovery().alive(nodeId))
+                            startSync0.remove(nodeId);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (e.hasCause(ClusterTopologyCheckedException.class)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to send sync message to remote node (node has left the grid): " +
+                                    nodeId);
+                        }
+                        else
+                            U.error(log, "Failed to send synchronize message to remote node (will not wait for reply): " +
+                                nodeId, e);
+
+                        startSync0.remove(nodeId);
+                    }
+                }
+
+                lock.lock();
+
+                try {
+                    if (!startSync0.isEmpty())
+                        cond.await(10000, MILLISECONDS);
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        /**
+         * Starts file fragmenting. Will group file affinity ranges by nodes and send requests to each node.
+         * File will be considered processed when each node replied with success (or error) or left the grid.
+         *
+         * @param fileInfo File info to process.
+         */
+        private void requestFragmenting(IgfsFileInfo fileInfo) {
+            IgfsFileMap map = fileInfo.fileMap();
+
+            assert map != null && !map.ranges().isEmpty();
+
+            Map<UUID, Collection<IgfsFileAffinityRange>> grpMap = U.newHashMap(map.ranges().size());
+
+            for (IgfsFileAffinityRange range : map.ranges()) {
+                UUID nodeId = igfsCtx.data().affinityNode(range.affinityKey()).id();
+
+                Collection<IgfsFileAffinityRange> nodeRanges = grpMap.get(nodeId);
+
+                if (nodeRanges == null) {
+                    nodeRanges = new LinkedList<>();
+
+                    grpMap.put(nodeId, nodeRanges);
+                }
+
+                nodeRanges.addAll(range.split(igfsCtx.data().groupBlockSize()));
+            }
+
+            Collection<UUID> nodeIds = new IdentityHashSet(grpMap.keySet());
+
+            if (log.isDebugEnabled())
+                log.debug("Calculating fragmentizer groups for file [fileInfo=" + fileInfo +
+                    ", nodeIds=" + nodeIds + ']');
+
+            // Put assignment to map first.
+            Object old = fragmentingFiles.putIfAbsent(fileInfo.id(), nodeIds);
+
+            assert old == null;
+
+            for (Map.Entry<UUID, Collection<IgfsFileAffinityRange>> entry : grpMap.entrySet()) {
+                UUID nodeId = entry.getKey();
+
+                IgfsFragmentizerRequest msg = new IgfsFragmentizerRequest(fileInfo.id(), entry.getValue());
+
+                try {
+                    if (log.isDebugEnabled())
+                        log.debug("Sending fragmentizer request to remote node [nodeId=" + nodeId +
+                            ", fileId=" + fileInfo.id() + ", msg=" + msg + ']');
+
+                    sendWithRetries(nodeId, msg);
+                }
+                catch (IgniteCheckedException e) {
+                    if (e.hasCause(ClusterTopologyCheckedException.class)) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send fragmentizer request to remote node (node left grid): " +
+                                nodeId);
+                    }
+                    else
+                        U.error(log, "Failed to send fragmentizer request to remote node [nodeId=" + nodeId +
+                            ", msg=" + msg + ']', e);
+
+                    nodeIds.remove(nodeId);
+                }
+            }
+
+            if (nodeIds.isEmpty()) {
+                if (log.isDebugEnabled())
+                    log.debug("Got empty wait set for fragmentized file: " + fileInfo);
+
+                fragmentingFiles.remove(fileInfo.id(), nodeIds);
+            }
+        }
+    }
+
+    /**
+     * Gets next file for fragmentizer to be processed.
+     *
+     * @param exclude File IDs to exclude (the ones that are currently being processed).
+     * @return File ID to process or {@code null} if there are no such files.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @Nullable private IgfsFileInfo fileForFragmentizer(Collection<IgniteUuid> exclude) throws IgniteCheckedException {
+        return fragmentizerEnabled ? igfsCtx.meta().fileForFragmentizer(exclude) : null;
+    }
+
+    /**
+     * Fragmentizer worker thread.
+     */
+    private class FragmentizerWorker extends GridWorker implements GridMessageListener {
+        /** Requests for this worker. */
+        private BlockingQueue<IgniteBiTuple<UUID, IgfsCommunicationMessage>> msgs = new LinkedBlockingDeque<>();
+
+        /**
+         * Constructor.
+         */
+        protected FragmentizerWorker() {
+            super(igfsCtx.kernalContext().gridName(), "fragmentizer-worker", igfsCtx.kernalContext().log());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            if (msg instanceof IgfsFragmentizerRequest ||
+                msg instanceof IgfsSyncMessage) {
+                if (log.isDebugEnabled())
+                    log.debug("Received fragmentizer request from remote node [nodeId=" + nodeId +
+                        ", msg=" + msg + ']');
+
+                IgniteBiTuple<UUID, IgfsCommunicationMessage> tup = F.t(nodeId, (IgfsCommunicationMessage)msg);
+
+                try {
+                    if (!msgs.offer(tup, MSG_OFFER_TIMEOUT, TimeUnit.MILLISECONDS)) {
+                        U.error(log, "Failed to process fragmentizer communication message (will discard) " +
+                            "[nodeId=" + nodeId + ", msg=" + msg + ']');
+                    }
+                }
+                catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+
+                    U.warn(log, "Failed to process fragmentizer communication message (thread was interrupted) "+
+                        "[nodeId=" + nodeId + ", msg=" + msg + ']');
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            while (!isCancelled()) {
+                IgniteBiTuple<UUID, IgfsCommunicationMessage> req = msgs.take();
+
+                UUID nodeId = req.get1();
+
+                if (req.get2() instanceof IgfsFragmentizerRequest) {
+                    IgfsFragmentizerRequest fragmentizerReq = (IgfsFragmentizerRequest)req.get2();
+
+                    if (!rw.tryReadLock()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Received fragmentizing request while stopping grid (will ignore) " +
+                                "[nodeId=" + nodeId + ", req=" + req.get2() + ']');
+
+                        continue; // while.
+                    }
+
+                    try {
+                        try {
+                            processFragmentizerRequest(fragmentizerReq);
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (e.hasCause(ClusterTopologyCheckedException.class)) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to process fragmentizer request (remote node left the grid) " +
+                                        "[req=" + req + ", err=" + e.getMessage() + ']');
+                            }
+                            else
+                                U.error(log, "Failed to process fragmentizer request [nodeId=" + nodeId +
+                                    ", req=" + req + ']', e);
+                        }
+                        finally {
+                            sendResponse(nodeId, new IgfsFragmentizerResponse(fragmentizerReq.fileId()));
+                        }
+                    }
+                    finally {
+                        rw.readUnlock();
+                    }
+                }
+                else {
+                    assert req.get2() instanceof IgfsSyncMessage;
+
+                    IgfsSyncMessage syncMsg = (IgfsSyncMessage)req.get2();
+
+                    if (!syncMsg.response()) {
+                        IgfsSyncMessage res = new IgfsSyncMessage(syncMsg.order(), true);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Sending fragmentizer sync response to remote node [nodeId=" + nodeId +
+                                ", res=" + res + ']');
+
+                        sendResponse(nodeId, res);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Sends response to remote node.
+         *
+         * @param nodeId Node ID to send response to.
+         * @param msg Message to send.
+         */
+        private void sendResponse(UUID nodeId, IgfsCommunicationMessage msg) {
+            try {
+                sendWithRetries(nodeId, msg);
+            }
+            catch (IgniteCheckedException e) {
+                if (e.hasCause(ClusterTopologyCheckedException.class)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send sync response to GGFS fragmentizer coordinator " +
+                            "(originating node left the grid): " + nodeId);
+                }
+                else
+                    U.error(log, "Failed to send sync response to GGFS fragmentizer coordinator: " + nodeId, e);
+            }
+        }
+    }
+
+    /**
+     * Hash set that overrides equals to use identity comparison.
+     */
+    private static class IdentityHashSet extends GridConcurrentHashSet<UUID> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Constructor.
+         *
+         * @param c Collection to add.
+         */
+        private IdentityHashSet(Collection<UUID> c) {
+            super(c);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            // Identity comparison.
+            return this == o;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java
new file mode 100644
index 0000000..6c7f93b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Fragmentizer request. Sent from coordinator to other GGFS nodes when colocated part of file
+ * should be fragmented.
+ */
+public class IgfsFragmentizerRequest extends IgfsCommunicationMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** File id. */
+    private IgniteUuid fileId;
+
+    /** Ranges to fragment. */
+    @GridToStringInclude
+    @GridDirectCollection(IgfsFileAffinityRange.class)
+    private Collection<IgfsFileAffinityRange> fragmentRanges;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsFragmentizerRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param fileId File id to fragment.
+     * @param fragmentRanges Ranges to fragment.
+     */
+    public IgfsFragmentizerRequest(IgniteUuid fileId, Collection<IgfsFileAffinityRange> fragmentRanges) {
+        this.fileId = fileId;
+        this.fragmentRanges = fragmentRanges;
+    }
+
+    /**
+     * @return File ID.
+     */
+    public IgniteUuid fileId() {
+        return fileId;
+    }
+
+    /**
+     * @return Fragment ranges.
+     */
+    public Collection<IgfsFileAffinityRange> fragmentRanges() {
+        return fragmentRanges;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsFragmentizerRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsFragmentizerRequest _clone = new IgfsFragmentizerRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        super.clone0(_msg);
+
+        IgfsFragmentizerRequest _clone = (IgfsFragmentizerRequest)_msg;
+
+        _clone.fileId = fileId;
+        _clone.fragmentRanges = fragmentRanges;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        switch (state) {
+            case 0:
+                if (!writer.writeIgniteUuid("fileId", fileId))
+                    return false;
+
+                state++;
+
+            case 1:
+                if (!writer.writeCollection("fragmentRanges", fragmentRanges, IgfsFileAffinityRange.class))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (state) {
+            case 0:
+                fileId = reader.readIgniteUuid("fileId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 1:
+                fragmentRanges = reader.readCollection("fragmentRanges", IgfsFileAffinityRange.class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 69;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java
new file mode 100644
index 0000000..d9dd02b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Fragmentizer response.
+ */
+public class IgfsFragmentizerResponse extends IgfsCommunicationMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** File ID. */
+    private IgniteUuid fileId;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public IgfsFragmentizerResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param fileId File ID.
+     */
+    public IgfsFragmentizerResponse(IgniteUuid fileId) {
+        this.fileId = fileId;
+    }
+
+    /**
+     * @return File ID.
+     */
+    public IgniteUuid fileId() {
+        return fileId;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsFragmentizerResponse _clone = new IgfsFragmentizerResponse();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        super.clone0(_msg);
+
+        IgfsFragmentizerResponse _clone = (IgfsFragmentizerResponse)_msg;
+
+        _clone.fileId = fileId;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        switch (state) {
+            case 0:
+                if (!writer.writeIgniteUuid("fileId", fileId))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (state) {
+            case 0:
+                fileId = reader.readIgniteUuid("fileId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 70;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java
new file mode 100644
index 0000000..890cd6b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHandshakeResponse.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Handshake message.
+ */
+public class IgfsHandshakeResponse implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** GGFS name. */
+    private String ggfsName;
+
+    /** SECONDARY paths. */
+    private IgfsPaths paths;
+
+    /** Server block size. */
+    private long blockSize;
+
+    /** Whether to force sampling on client's side. */
+    private Boolean sampling;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public IgfsHandshakeResponse() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param paths Secondary paths.
+     * @param blockSize Server default block size.
+     */
+    public IgfsHandshakeResponse(String ggfsName, IgfsPaths paths, long blockSize, Boolean sampling) {
+        assert paths != null;
+
+        this.ggfsName = ggfsName;
+        this.paths = paths;
+        this.blockSize = blockSize;
+        this.sampling = sampling;
+    }
+
+    /**
+     * @return GGFS name.
+     */
+    public String ggfsName() {
+        return ggfsName;
+    }
+
+    /**
+     * @return SECONDARY paths configured on server.
+     */
+    public IgfsPaths secondaryPaths() {
+        return paths;
+    }
+
+    /**
+     * @return Server default block size.
+     */
+    public long blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * @return Sampling flag.
+     */
+    public Boolean sampling() {
+        return sampling;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, ggfsName);
+
+        paths.writeExternal(out);
+
+        out.writeLong(blockSize);
+
+        if (sampling != null) {
+            out.writeBoolean(true);
+            out.writeBoolean(sampling);
+        }
+        else
+            out.writeBoolean(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ggfsName = U.readString(in);
+
+        paths = new IgfsPaths();
+
+        paths.readExternal(in);
+
+        blockSize = in.readLong();
+
+        if (in.readBoolean())
+            sampling = in.readBoolean();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHelper.java
new file mode 100644
index 0000000..d9be110
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ * GGFS utility processor adapter.
+ */
+public interface IgfsHelper {
+    /**
+     * Pre-process cache configuration.
+     *
+     * @param cfg Cache configuration.
+     */
+    public abstract void preProcessCacheConfiguration(CacheConfiguration cfg);
+
+    /**
+     * Validate cache configuration for GGFS.
+     *
+     * @param cfg Cache configuration.
+     * @throws IgniteCheckedException If validation failed.
+     */
+    public abstract void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException;
+
+    /**
+     * Check whether object is of type {@code GridGgfsBlockKey}
+     *
+     * @param key Key.
+     * @return {@code True} if GGFS block key.
+     */
+    public abstract boolean isGgfsBlockKey(Object key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHelperImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHelperImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHelperImpl.java
new file mode 100644
index 0000000..532dca5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsHelperImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.eviction.*;
+import org.apache.ignite.cache.eviction.ignitefs.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ * GGFS utils processor.
+ */
+public class IgfsHelperImpl implements IgfsHelper {
+    /** {@inheritDoc} */
+    @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) {
+        CacheEvictionPolicy evictPlc = cfg.getEvictionPolicy();
+
+        if (evictPlc instanceof CacheIgfsPerBlockLruEvictionPolicy && cfg.getEvictionFilter() == null)
+            cfg.setEvictionFilter(new CacheIgfsEvictionFilter());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException {
+        CacheEvictionPolicy evictPlc =  cfg.getEvictionPolicy();
+
+        if (evictPlc != null && evictPlc instanceof CacheIgfsPerBlockLruEvictionPolicy) {
+            CacheEvictionFilter evictFilter = cfg.getEvictionFilter();
+
+            if (evictFilter != null && !(evictFilter instanceof CacheIgfsEvictionFilter))
+                throw new IgniteCheckedException("Eviction filter cannot be set explicitly when using " +
+                    "CacheGgfsPerBlockLruEvictionPolicy:" + cfg.getName());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isGgfsBlockKey(Object key) {
+        return key instanceof IgfsBlockKey;
+    }
+}


Mime
View raw message