ignite-commits mailing list archives

From voze...@apache.org
Subject [2/5] ignite git commit: IGNITE-3414: Implemented mapper.
Date Tue, 05 Jul 2016 09:31:21 GMT
IGNITE-3414: Implemented mapper.
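
A minimal usage sketch (illustrative only; it assumes the usual pluggable-planner wiring via
HadoopConfiguration#setMapReducePlanner and IgniteConfiguration#setHadoopConfiguration):

    import org.apache.ignite.configuration.HadoopConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.hadoop.mapreduce.IgniteHadoopAffinityMapReducePlanner;

    IgniteHadoopAffinityMapReducePlanner planner = new IgniteHadoopAffinityMapReducePlanner();

    // Bias mapper placement towards nodes holding the split's data (2.0 is the default).
    planner.setAffinityNodePreferenceFactor(2.0f);

    HadoopConfiguration hadoopCfg = new HadoopConfiguration();
    hadoopCfg.setMapReducePlanner(planner);

    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setHadoopConfiguration(hadoopCfg);

Reducer assignment (reducers()) is still a TODO in this change; only mapper placement is implemented.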


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0be3f6bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0be3f6bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0be3f6bf

Branch: refs/heads/ignite-3414
Commit: 0be3f6bf98301f90c1e44b78e34b07f6d7c485fc
Parents: 6da0ca5
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Jul 5 00:03:04 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Jul 5 00:03:04 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopAffinityMapReducePlanner.java   | 470 +++++++++++++++++++
 .../IgniteHadoopMapReducePlanner2.java          | 426 -----------------
 .../planner/HadoopAbstractMapReducePlanner.java |   4 +-
 .../planner/HadoopMapReducePlanGroup.java       |  57 ++-
 .../planner/HadoopMapReducePlanTopology.java    |  38 +-
 5 files changed, 563 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0be3f6bf/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopAffinityMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopAffinityMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopAffinityMapReducePlanner.java
new file mode 100644
index 0000000..4583e25
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopAffinityMapReducePlanner.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
+
+/**
+ * Map-reduce planner which tries to assign map jobs to affinity nodes.
+ */
+public class IgniteHadoopAffinityMapReducePlanner extends HadoopAbstractMapReducePlanner {
+    /** Default value of affinity node preference factor. */
+    private static final float DFLT_AFF_NODE_PREFERENCE_FACTOR = 2.0f;
+
+    /** Affinity node preference factor. */
+    private float affNodePreferenceFactor = DFLT_AFF_NODE_PREFERENCE_FACTOR;
+
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        Collection<HadoopInputSplit> inputSplits = job.input();
+        int reducerCnt = job.info().reducers();
+
+        if (reducerCnt < 0)
+            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + reducerCnt);
+
+        HadoopMapReducePlanTopology top = topology(nodes);
+
+        Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(inputSplits, top);
+
+        Map<UUID, int[]> reducers = reducers(top, mappers, reducerCnt);
+
+        return new HadoopDefaultMapReducePlan(mappers, reducers);
+    }
+
+    /**
+     * Generate mappers.
+     *
+     * @param inputSplits Input splits.
+     * @param top Topology.
+     * @return Mappers.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<HadoopInputSplit> inputSplits,
+        HadoopMapReducePlanTopology top) throws IgniteCheckedException {
+        Map<UUID, Collection<HadoopInputSplit>> res = new HashMap<>();
+
+        // Sort input splits by length, the longest goes first. This way the longest splits
+        // are assigned first and get the most efficient placement.
+        for (HadoopInputSplit inputSplit : sortInputSplits(inputSplits)) {
+            // Try getting IGFS affinity.
+            Collection<UUID> nodeIds = igfsAffinity(inputSplit);
+
+            // Fall back to host-based affinity if IGFS affinity is not available.
+            if (nodeIds == null)
+                nodeIds = affinity(inputSplit, top);
+
+            // Get best node.
+            UUID node = bestMapperNode(nodeIds, top);
+
+            // Add to result.
+            Collection<HadoopInputSplit> nodeSplits = res.get(node);
+
+            if (nodeSplits == null) {
+                nodeSplits = new HashSet<>();
+
+                res.put(node, nodeSplits);
+            }
+
+            nodeSplits.add(inputSplit);
+        }
+
+        return res;
+    }
+
+    /**
+     * Sort input splits by length. The longest split goes first.
+     *
+     * @param inputSplits Original input splits.
+     * @return Sorted input splits.
+     */
+    private List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> inputSplits) {
+        int id = 0;
+
+        TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>();
+
+        for (HadoopInputSplit inputSplit : inputSplits) {
+            long len = inputSplit instanceof HadoopFileBlock ? ((HadoopFileBlock)inputSplit).length() : 0;
+
+            sortedSplits.add(new SplitSortWrapper(id++, inputSplit, len));
+        }
+
+        ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size());
+
+        for (SplitSortWrapper sortedSplit : sortedSplits)
+            res.add(sortedSplit.split);
+
+        return res;
+    }
+
+    /**
+     * Get IGFS affinity.
+     *
+     * @param split Input split.
+     * @return IGFS affinity or {@code null} if IGFS is not available.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private Collection<UUID> igfsAffinity(HadoopInputSplit split) throws IgniteCheckedException {
+        if (split instanceof HadoopFileBlock) {
+            HadoopFileBlock split0 = (HadoopFileBlock)split;
+
+            if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
+                HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
+
+                IgfsEx igfs = null;
+
+                if (F.eq(ignite.name(), endpoint.grid()))
+                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+                if (igfs != null && !igfs.isProxy(split0.file())) {
+                    IgfsPath path = new IgfsPath(split0.file());
+
+                    if (igfs.exists(path)) {
+                        Collection<IgfsBlockLocation> blocks;
+
+                        try {
+                            blocks = igfs.affinity(path, split0.start(), split0.length());
+                        }
+                        catch (IgniteException e) {
+                            throw new IgniteCheckedException("Failed to get IGFS file block affinity [path=" + path +
+                                ", start=" + split0.start() + ", len=" + split0.length() + ']', e);
+                        }
+
+                        assert blocks != null;
+
+                        if (blocks.size() == 1)
+                            return blocks.iterator().next().nodeIds();
+                        else {
+                            // The most "local" nodes go first.
+                            Map<UUID, Long> idToLen = new HashMap<>();
+
+                            for (IgfsBlockLocation block : blocks) {
+                                for (UUID id : block.nodeIds()) {
+                                    Long len = idToLen.get(id);
+
+                                    idToLen.put(id, len == null ? block.length() : block.length() + len);
+                                }
+                            }
+
+                            Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+                            for (Map.Entry<UUID, Long> idToLenEntry : idToLen.entrySet()) {
+                                UUID id = idToLenEntry.getKey();
+
+                                res.put(new NodeIdAndLength(id, idToLenEntry.getValue()), id);
+                            }
+
+                            return new HashSet<>(res.values());
+                        }
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Get affinity.
+     *
+     * @param split Input split.
+     * @param top Topology.
+     * @return Affinity.
+     */
+    private Collection<UUID> affinity(HadoopInputSplit split, HadoopMapReducePlanTopology top) {
+        Collection<UUID> res = new HashSet<>();
+
+        for (String host : split.hosts()) {
+            HadoopMapReducePlanGroup grp = top.groupForHost(host);
+
+            if (grp != null) {
+                for (int i = 0; i < grp.nodeCount(); i++)
+                    res.add(grp.node(i).id());
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Get best mapper node.
+     *
+     * @param affIds Affinity node IDs.
+     * @param top Topology.
+     * @return ID of the node the mapper should be assigned to.
+     */
+    private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) {
+        int affWeight = 100;
+        int nonAffWeight = Math.round(affWeight * affNodePreferenceFactor);
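+        // With the default affNodePreferenceFactor of 2.0 this yields affWeight = 100 and nonAffWeight = 200:
+        // assigning a split to a group that holds none of its data is "charged" twice as much, so
+        // data-local groups stay preferred until they have accumulated noticeably more work.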
+
+        // Priority node.
+        UUID priorityAffId = F.isEmpty(affIds) ? null : affIds.iterator().next();
+
+        // Find group with the least weight.
+        HadoopMapReducePlanGroup leastGrp = null;
+        int leastPriority = 0;
+        int leastWeight = Integer.MAX_VALUE;
+
+        for (HadoopMapReducePlanGroup grp : top.groups()) {
+            int priority = groupPriority(grp, affIds, priorityAffId);
+            int weight = grp.mappersWeight() + (priority == 0 ? nonAffWeight : affWeight);
+
+            if (leastGrp == null) {
+                leastGrp = grp;
+                leastPriority = priority;
+                leastWeight = weight;
+            }
+            else if (weight < leastWeight || weight == leastWeight && priority > leastPriority) {
+                leastGrp = grp;
+                leastPriority = priority;
+                leastWeight = weight;
+            }
+        }
+
+        assert leastGrp != null;
+
+        // Update group weight for further runs.
+        leastGrp.mappersWeight(leastWeight);
+
+        // Return the best node from the group.
+        int idx = 0;
+
+        // This is a rare situation where several nodes are started on the same host.
+        if (!leastGrp.single()) {
+            if (leastPriority == 0)
+                // Pick any node.
+                idx = ThreadLocalRandom.current().nextInt(leastGrp.nodeCount());
+            else if (leastPriority == 1) {
+                // Pick any affinity node.
+                assert affIds != null;
+
+                List<Integer> cands = new ArrayList<>();
+
+                for (int i = 0; i < leastGrp.nodeCount(); i++) {
+                    UUID id = leastGrp.node(i).id();
+
+                    if (affIds.contains(id))
+                        cands.add(i);
+                }
+
+                idx = cands.get(ThreadLocalRandom.current().nextInt(cands.size()));
+            }
+            else {
+                // Find primary node.
+                assert priorityAffId != null;
+
+                for (int i = 0; i < leastGrp.nodeCount(); i++) {
+                    UUID id = leastGrp.node(i).id();
+
+                    if (F.eq(id, priorityAffId)) {
+                        idx = i;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        return leastGrp.node(idx).id();
+    }
+
+    /**
+     * Generate reducers.
+     *
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Reducers.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Map<UUID, int[]> reducers(HadoopMapReducePlanTopology top, Map<UUID, Collection<HadoopInputSplit>> mappers,
+        int reducerCnt) throws IgniteCheckedException {
+        // TODO
+        return null;
+    }
+
+    /**
+     * Calculate group priority.
+     *
+     * @param grp Group.
+     * @param affIds Affinity IDs.
+     * @param priorityAffId Priority affinity ID.
+     * @return Group priority.
+     */
+    private static int groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds,
+        @Nullable UUID priorityAffId) {
+        if (F.isEmpty(affIds)) {
+            assert priorityAffId == null;
+
+            return 0;
+        }
+
+        int priority = 0;
+
+        for (int i = 0; i < grp.nodeCount(); i++) {
+            UUID id = grp.node(i).id();
+
+            if (affIds.contains(id)) {
+                priority = 1;
+
+                if (F.eq(priorityAffId, id)) {
+                    priority = 2;
+
+                    break;
+                }
+            }
+        }
+
+        return priority;
+    }
+
+    /**
+     * @return Affinity node preference factor ({@code 2.0} by default).
+     */
+    public float getAffinityNodePreferenceFactor() {
+        return affNodePreferenceFactor;
+    }
+
+    /**
+     * @param affNodePreferenceFactor Affinity node preference factor.
+     */
+    public void setAffinityNodePreferenceFactor(float affNodePreferenceFactor) {
+        this.affNodePreferenceFactor = affNodePreferenceFactor;
+    }
+
+    /**
+     * Node ID and length.
+     */
+    private static class NodeIdAndLength implements Comparable<NodeIdAndLength> {
+        /** Node ID. */
+        private final UUID id;
+
+        /** Length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Node ID.
+         * @param len Length.
+         */
+        public NodeIdAndLength(UUID id, long len) {
+            this.id = id;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(NodeIdAndLength obj) {
+            long res = len - obj.len;
+
+            if (res > 0)
+                return 1;
+            else if (res < 0)
+                return -1;
+            else
+                return id.compareTo(obj.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof NodeIdAndLength && F.eq(id, ((NodeIdAndLength)obj).id);
+        }
+    }
+
+    /**
+     * Split wrapper for sorting.
+     */
+    private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
+        /** Unique ID. */
+        private final int id;
+
+        /** Split. */
+        private final HadoopInputSplit split;
+
+        /** Split length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Unique ID.
+         * @param split Split.
+         * @param len Split length.
+         */
+        public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
+            this.id = id;
+            this.split = split;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(SplitSortWrapper other) {
+            assert other != null;
+
+            // Longer splits go first.
+            long res = other.len - len;
+
+            if (res > 0)
+                return 1;
+            else if (res < 0)
+                return -1;
+            else
+                return id - other.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0be3f6bf/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner2.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner2.java
deleted file mode 100644
index ddd84de..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner2.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.mapreduce;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
-
-/**
- * Default map-reduce planner implementation.
- */
-public class IgniteHadoopMapReducePlanner2 extends HadoopAbstractMapReducePlanner {
-    /** {@inheritDoc} */
-    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
-        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
-        Collection<HadoopInputSplit> inputSplits = job.input();
-        int reducerCnt = job.info().reducers();
-
-        if (reducerCnt < 0)
-            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + reducerCnt);
-
-        HadoopMapReducePlanTopology top = topology(nodes);
-
-
-        // Convert collection of topology nodes to collection of topology node IDs.
-        Collection<UUID> topIds = new HashSet<>(nodes.size(), 1.0f);
-
-        for (ClusterNode topNode : nodes)
-            topIds.add(topNode.id());
-
-        Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(nodes, topIds, job.input());
-
-        int rdcCnt = job.info().reducers();
-
-        if (rdcCnt < 0)
-            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
-
-        Map<UUID, int[]> reducers = reducers(nodes, mappers, rdcCnt);
-
-        return new HadoopDefaultMapReducePlan(mappers, reducers);
-    }
-
-    /**
-     * Create plan for mappers.
-     *
-     * @param top Topology nodes.
-     * @param topIds Topology node IDs.
-     * @param splits Splits.
-     * @return Mappers map.
-     * @throws IgniteCheckedException If failed.
-     */
-    private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
-        Iterable<HadoopInputSplit> splits) throws IgniteCheckedException {
-        Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
-
-        Map<String, Collection<UUID>> nodes = groupByHost(top);
-
-        Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
-
-        for (UUID nodeId : topIds)
-            nodeLoads.put(nodeId, 0);
-
-        for (HadoopInputSplit split : splits) {
-            UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
-
-            if (log.isDebugEnabled())
-                log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']');
-
-            Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId);
-
-            if (nodeSplits == null) {
-                nodeSplits = new ArrayList<>();
-
-                mappers.put(nodeId, nodeSplits);
-            }
-
-            nodeSplits.add(split);
-
-            // Updated node load.
-            nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
-        }
-
-        return mappers;
-    }
-
-    /**
-     * Determine the best node for this split.
-     *
-     * @param split Split.
-     * @param topIds Topology node IDs.
-     * @param nodes Nodes.
-     * @param nodeLoads Node load tracker.
-     * @return Node ID.
-     */
-    @SuppressWarnings("unchecked")
-    private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
-        Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
-        if (split instanceof HadoopFileBlock) {
-            HadoopFileBlock split0 = (HadoopFileBlock)split;
-
-            if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
-                HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
-
-                IgfsEx igfs = null;
-
-                if (F.eq(ignite.name(), endpoint.grid()))
-                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
-
-                if (igfs != null && !igfs.isProxy(split0.file())) {
-                    IgfsPath path = new IgfsPath(split0.file());
-
-                    if (igfs.exists(path)) {
-                        Collection<IgfsBlockLocation> blocks;
-
-                        try {
-                            blocks = igfs.affinity(path, split0.start(), split0.length());
-                        }
-                        catch (IgniteException e) {
-                            throw new IgniteCheckedException(e);
-                        }
-
-                        assert blocks != null;
-
-                        if (blocks.size() == 1)
-                            // Fast-path, split consists of one IGFS block (as in most cases).
-                            return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
-                        else {
-                            // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes.
-                            Map<UUID, Long> nodeMap = new HashMap<>();
-
-                            List<UUID> bestNodeIds = null;
-                            long bestLen = -1L;
-
-                            for (IgfsBlockLocation block : blocks) {
-                                for (UUID blockNodeId : block.nodeIds()) {
-                                    if (topIds.contains(blockNodeId)) {
-                                        Long oldLen = nodeMap.get(blockNodeId);
-                                        long newLen = oldLen == null ? block.length() : oldLen + block.length();
-
-                                        nodeMap.put(blockNodeId, newLen);
-
-                                        if (bestNodeIds == null || bestLen < newLen) {
-                                            bestNodeIds = new ArrayList<>(1);
-
-                                            bestNodeIds.add(blockNodeId);
-
-                                            bestLen = newLen;
-                                        }
-                                        else if (bestLen == newLen) {
-                                            assert !F.isEmpty(bestNodeIds);
-
-                                            bestNodeIds.add(blockNodeId);
-                                        }
-                                    }
-                                }
-                            }
-
-                            if (bestNodeIds != null) {
-                                return bestNodeIds.size() == 1 ? bestNodeIds.get(0) :
-                                    bestNode(bestNodeIds, topIds, nodeLoads, true);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        // Cannot use local IGFS for some reason, try selecting the node by host.
-        Collection<UUID> blockNodes = null;
-
-        for (String host : split.hosts()) {
-            Collection<UUID> hostNodes = nodes.get(host);
-
-            if (!F.isEmpty(hostNodes)) {
-                if (blockNodes == null)
-                    blockNodes = new ArrayList<>(hostNodes);
-                else
-                    blockNodes.addAll(hostNodes);
-            }
-        }
-
-        return bestNode(blockNodes, topIds, nodeLoads, false);
-    }
-
-    /**
-     * Finds the best (the least loaded) node among the candidates.
-     *
-     * @param candidates Candidates.
-     * @param topIds Topology node IDs.
-     * @param nodeLoads Known node loads.
-     * @param skipTopCheck Whether to skip topology check.
-     * @return The best node.
-     */
-    private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads,
-        boolean skipTopCheck) {
-        UUID bestNode = null;
-        int bestLoad = Integer.MAX_VALUE;
-
-        if (candidates != null) {
-            for (UUID candidate : candidates) {
-                if (skipTopCheck || topIds.contains(candidate)) {
-                    int load = nodeLoads.get(candidate);
-
-                    if (bestNode == null || bestLoad > load) {
-                        bestNode = candidate;
-                        bestLoad = load;
-
-                        if (bestLoad == 0)
-                            break; // Minimum load possible, no need for further iterations.
-                    }
-                }
-            }
-        }
-
-        if (bestNode == null) {
-            // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one.
-            bestLoad = Integer.MAX_VALUE;
-
-            for (UUID nodeId : topIds) {
-                int load = nodeLoads.get(nodeId);
-
-                if (bestNode == null || bestLoad > load) {
-                    bestNode = nodeId;
-                    bestLoad = load;
-
-                    if (bestLoad == 0)
-                        break; // Minimum load possible, no need for further iterations.
-                }
-            }
-        }
-
-        assert bestNode != null;
-
-        return bestNode;
-    }
-
-    /**
-     * Create plan for reducers.
-     *
-     * @param top Topology.
-     * @param mappers Mappers map.
-     * @param reducerCnt Reducers count.
-     * @return Reducers map.
-     */
-    private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
-        Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) {
-        // Determine initial node weights.
-        int totalWeight = 0;
-
-        List<WeightedNode> nodes = new ArrayList<>(top.size());
-
-        for (ClusterNode node : top) {
-            Collection<HadoopInputSplit> split = mappers.get(node.id());
-
-            int weight = reducerNodeWeight(node, split != null ? split.size() : 0);
-
-            nodes.add(new WeightedNode(node.id(), weight, weight));
-
-            totalWeight += weight;
-        }
-
-        // Adjust weights.
-        int totalAdjustedWeight = 0;
-
-        for (WeightedNode node : nodes) {
-            node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight;
-
-            node.weight = Math.round(node.floatWeight);
-
-            totalAdjustedWeight += node.weight;
-        }
-
-        // Apply redundant/lost reducers.
-        Collections.sort(nodes);
-
-        if (totalAdjustedWeight > reducerCnt) {
-            // Too much reducers set.
-            ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1);
-
-            while (totalAdjustedWeight != reducerCnt) {
-                if (!iter.hasPrevious())
-                    iter = nodes.listIterator(nodes.size() - 1);
-
-                WeightedNode node = iter.previous();
-
-                if (node.weight > 0) {
-                    node.weight -= 1;
-
-                    totalAdjustedWeight--;
-                }
-            }
-        }
-        else if (totalAdjustedWeight < reducerCnt) {
-            // Not enough reducers set.
-            ListIterator<WeightedNode> iter = nodes.listIterator(0);
-
-            while (totalAdjustedWeight != reducerCnt) {
-                if (!iter.hasNext())
-                    iter = nodes.listIterator(0);
-
-                WeightedNode node = iter.next();
-
-                if (node.floatWeight > 0.0f) {
-                    node.weight += 1;
-
-                    totalAdjustedWeight++;
-                }
-            }
-        }
-
-        int idx = 0;
-
-        Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f);
-
-        for (WeightedNode node : nodes) {
-            if (node.weight > 0) {
-                int[] arr = new int[node.weight];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = idx++;
-
-                reducers.put(node.nodeId, arr);
-            }
-        }
-
-        return reducers;
-    }
-
-    /**
-     * Calculate node weight based on node metrics and data co-location.
-     *
-     * @param node Node.
-     * @param splitCnt Splits mapped to this node.
-     * @return Node weight.
-     */
-    @SuppressWarnings("UnusedParameters")
-    protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
-        return splitCnt;
-    }
-
-    /**
-     * Weighted node.
-     */
-    private static class WeightedNode implements Comparable<WeightedNode> {
-        /** Node ID. */
-        private final UUID nodeId;
-
-        /** Weight. */
-        private int weight;
-
-        /** Floating point weight. */
-        private float floatWeight;
-
-        /**
-         * Constructor.
-         *
-         * @param nodeId Node ID.
-         * @param weight Weight.
-         * @param floatWeight Floating point weight.
-         */
-        private WeightedNode(UUID nodeId, int weight, float floatWeight) {
-            this.nodeId = nodeId;
-            this.weight = weight;
-            this.floatWeight = floatWeight;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return nodeId.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int compareTo(@NotNull WeightedNode other) {
-            float res = other.floatWeight - floatWeight;
-
-            return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0be3f6bf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
index cf15256..7cb54ae 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
@@ -66,7 +66,7 @@ public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReduceP
             HadoopMapReducePlanGroup grp = macsMap.get(macs);
 
             if (grp == null) {
-                grp = new HadoopMapReducePlanGroup(node);
+                grp = new HadoopMapReducePlanGroup(node, macs);
 
                 macsMap.put(macs, grp);
             }
@@ -85,7 +85,7 @@ public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReduceP
             }
         }
 
-        return new HadoopMapReducePlanTopology(idToGrp, hostToGrp);
+        return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
     }
 
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0be3f6bf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
index fc0ffa9..0fc9373 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
@@ -1,6 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.hadoop.planner;
 
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 import java.util.ArrayList;
@@ -15,16 +33,24 @@ public class HadoopMapReducePlanGroup {
     /** Nodes. */
     private ArrayList<ClusterNode> nodes;
 
+    /** MAC addresses. */
+    private final String macs;
+
     /** CPUs. */
     private final int cpus;
 
+    /** Mappers weight. */
+    private int mappersWeight;
+
     /**
      * Constructor.
      *
      * @param node First node in the group.
+     * @param macs MAC addresses.
      */
-    public HadoopMapReducePlanGroup(ClusterNode node) {
+    public HadoopMapReducePlanGroup(ClusterNode node, String macs) {
         this.node = node;
+        this.macs = macs;
 
         cpus = node.metrics().getTotalCpus();
     }
@@ -89,6 +115,35 @@ public class HadoopMapReducePlanGroup {
         return cpus;
     }
 
+    /**
+     * Get mappers weight.
+     *
+     * @return Mappers weight.
+     */
+    public int mappersWeight() {
+        return mappersWeight;
+    }
+
+    /**
+     * Set mappers weight.
+     *
+     * @param mappersWeight Mappers weight.
+     */
+    public void mappersWeight(int mappersWeight) {
+        this.mappersWeight = mappersWeight;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return macs.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return obj instanceof HadoopMapReducePlanGroup && F.eq(macs, ((HadoopMapReducePlanGroup)obj).macs);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(HadoopMapReducePlanGroup.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0be3f6bf/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
index c8e4587..fa5c469 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
@@ -1,7 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.hadoop.planner;
 
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -9,6 +28,9 @@ import java.util.UUID;
  * Map-reduce plan topology.
  */
 public class HadoopMapReducePlanTopology {
+    /** All groups. */
+    private final List<HadoopMapReducePlanGroup> grps;
+
     /** Node ID to group map. */
     private final Map<UUID, HadoopMapReducePlanGroup> idToGrp;
 
@@ -18,19 +40,29 @@ public class HadoopMapReducePlanTopology {
     /**
      * Constructor.
      *
+     * @param grps All groups.
      * @param idToGrp ID to group map.
      * @param hostToGrp Host to group map.
      */
-    public HadoopMapReducePlanTopology(Map<UUID, HadoopMapReducePlanGroup> idToGrp,
-        Map<String, HadoopMapReducePlanGroup> hostToGrp) {
+    public HadoopMapReducePlanTopology(List<HadoopMapReducePlanGroup> grps,
+        Map<UUID, HadoopMapReducePlanGroup> idToGrp, Map<String, HadoopMapReducePlanGroup> hostToGrp) {
+        assert grps != null;
         assert idToGrp != null;
         assert hostToGrp != null;
 
+        this.grps = grps;
         this.idToGrp = idToGrp;
         this.hostToGrp = hostToGrp;
     }
 
     /**
+     * @return All groups.
+     */
+    public List<HadoopMapReducePlanGroup> groups() {
+        return grps;
+    }
+
+    /**
      * Get group for node ID.
      *
      * @param id Node ID.
@@ -46,7 +78,7 @@ public class HadoopMapReducePlanTopology {
      * @param host Host.
      * @return Group.
      */
-    public HadoopMapReducePlanGroup groupForHost(String host) {
+    @Nullable public HadoopMapReducePlanGroup groupForHost(String host) {
         return hostToGrp.get(host);
     }
 

