ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [5/5] ignite git commit: IGNITE-3414: WIP on reducers.
Date Tue, 05 Jul 2016 09:31:24 GMT
IGNITE-3414: WIP on reducers.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c683e510
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c683e510
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c683e510

Branch: refs/heads/ignite-3414
Commit: c683e5100525a1b09fbca3d9f6c83f9f8a71f2ae
Parents: 5e6a711
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Jul 5 12:31:13 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Jul 5 12:31:13 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 87 ++++++++++++++++----
 1 file changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c683e510/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 87b4161..fa896d9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -96,11 +97,11 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
 
         HadoopMapReducePlanTopology top = topology(nodes);
 
-        Map<UUID, Collection<HadoopInputSplit>> mappers = assignMappers(splits, top);
+        Mappers mappers = assignMappers(splits, top);
 
         Map<UUID, int[]> reducers = assignReducers(top, mappers, reducerCnt);
 
-        return new HadoopDefaultMapReducePlan(mappers, reducers);
+        return new HadoopDefaultMapReducePlan(mappers.nodeToSplits, reducers);
     }
 
     /**
@@ -111,9 +112,9 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
      * @return Mappers.
      * @throws IgniteCheckedException If failed.
      */
-    private Map<UUID, Collection<HadoopInputSplit>> assignMappers(Collection<HadoopInputSplit> splits,
+    private Mappers assignMappers(Collection<HadoopInputSplit> splits,
         HadoopMapReducePlanTopology top) throws IgniteCheckedException {
-        Map<UUID, Collection<HadoopInputSplit>> res = new HashMap<>();
+        Mappers res = new Mappers();
 
         for (HadoopInputSplit split : splits) {
             // Try getting IGFS affinity.
@@ -122,16 +123,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
             // Get best node.
             UUID node = bestMapperNode(nodeIds, top);
 
-            // Add to result.
-            Collection<HadoopInputSplit> nodeSplits = res.get(node);
-
-            if (nodeSplits == null) {
-                nodeSplits = new HashSet<>();
-
-                res.put(node, nodeSplits);
-            }
-
-            nodeSplits.add(split);
+            res.add(split, node);
         }
 
         return res;
@@ -341,11 +333,43 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
      * @return Reducers.
      * @throws IgniteCheckedException If failed.
      */
-    private Map<UUID, int[]> assignReducers(HadoopMapReducePlanTopology top, Map<UUID, Collection<HadoopInputSplit>> mappers,
-        int reducerCnt) throws IgniteCheckedException {
+    private Map<UUID, int[]> assignReducers(HadoopMapReducePlanTopology top, Mappers mappers, int reducerCnt)
+        throws IgniteCheckedException {
+        Map<UUID, Integer> reducers = assignReducers0(top, mappers, reducerCnt);
+
+        int cnt = 0;
+
+        Map<UUID, int[]> res = new HashMap<>(reducers.size());
+
+        for (Map.Entry<UUID, Integer> reducerEntry : reducers.entrySet()) {
+            int[] arr = new int[reducerEntry.getValue()];
+
+            for (int i = 0; i < arr.length; i++)
+                arr[i] = cnt++;
+
+            res.put(reducerEntry.getKey(), arr);
+        }
+
+        assert reducerCnt == cnt;
+
+        return res;
+    }
+
+    /**
+     * Generate reducers.
+     *
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Reducers.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Mappers mappers, int reducerCnt)
+        throws IgniteCheckedException {
 
 
         // TODO
+
         return null;
     }
 
@@ -526,6 +550,37 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
     }
 
     /**
+     * Mappers.
+     */
+    private static class Mappers {
+        /** Node-to-splits map. */
+        private final Map<UUID, Collection<HadoopInputSplit>> nodeToSplits = new HashMap<>();
+
+        /** Split-to-node map. */
+        private final Map<HadoopInputSplit, UUID> splitToNode = new IdentityHashMap<>();
+
+        /**
+         * Add mapping.
+         *
+         * @param split Split.
+         * @param node Node.
+         */
+        public void add(HadoopInputSplit split, UUID node) {
+            Collection<HadoopInputSplit> nodeSplits = nodeToSplits.get(node);
+
+            if (nodeSplits == null) {
+                nodeSplits = new HashSet<>();
+
+                nodeToSplits.put(node, nodeSplits);
+            }
+
+            nodeSplits.add(split);
+
+            splitToNode.put(split, node);
+        }
+    }
+
+    /**
      * Mapper priority enumeration.
      */
     private enum MapperPriority {


Mime
View raw message