ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [06/16] ignite git commit: Finalization.
Date Tue, 19 Jul 2016 09:43:52 GMT
Finalization.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc58697b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc58697b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc58697b

Branch: refs/heads/ignite-3414
Commit: fc58697b5d28d9167b9069890bf5e36e246e6f9b
Parents: 65d76a4
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Jul 19 10:04:30 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Jul 19 10:04:30 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 39 ++++++++++----------
 1 file changed, 20 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fc58697b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 8f2a86c..c184e46 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -119,11 +119,10 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
         Mappers res = new Mappers();
 
         for (HadoopInputSplit split : splits) {
-            // Try getting IGFS affinity. Note that an empty collection may be returned
-            // if this is an HDFS split that does not belong to any Ignite node host.
+            // Try getting affinity node IDs.
             Collection<UUID> nodeIds = affinityNodesForSplit(split, top);
 
-            // Get best node for this split:
+            // Get best node.
             UUID node = bestMapperNode(nodeIds, top);
 
             assert node != null;
@@ -136,8 +135,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
 
     /**
      * Get affinity nodes for the given input split.
-     *
-     * <p/>In general, order in the returned collection *is* significant, meaning that nodes containing more data
+     * <p>
+     * Order in the returned collection *is* significant, meaning that nodes containing more data
      * go first. This way, the 1st nodes in the collection considered to be preferable for scheduling.
      *
      * @param split Split.
@@ -147,31 +146,34 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
      */
     private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top)
         throws IgniteCheckedException {
-        // IGFS part:
         Collection<UUID> nodeIds = igfsAffinityNodesForSplit(split);
 
-        // HDFS part:
-        if (nodeIds == null) {
-            nodeIds = new HashSet<>();
+        if (nodeIds != null)
+            return nodeIds;
+
+        Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+        for (String host : split.hosts()) {
+            long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0L;
 
-            // TODO: also sort hosts there in non-ascending data order, as we do in case of IGFS affinity splits.
-            for (String host : split.hosts()) {
-                HadoopMapReducePlanGroup grp = top.groupForHost(host);
+            HadoopMapReducePlanGroup grp = top.groupForHost(host);
 
-                if (grp != null) {
-                    for (int i = 0; i < grp.nodeCount(); i++)
-                        nodeIds.add(grp.nodeId(i));
+            if (grp != null) {
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    UUID nodeId = grp.nodeId(i);
+
+                    res.put(new NodeIdAndLength(nodeId, len), nodeId);
                 }
             }
         }
 
-        return nodeIds;
+        return new LinkedHashSet<>(res.values());
     }
 
     /**
      * Get IGFS affinity nodes for split if possible.
-     *
-     * <p/>Order in the returned collection *is* significant, meaning that nodes containing more data
+     * <p>
+     * Order in the returned collection *is* significant, meaning that nodes containing more data
      * go first. This way, the 1st nodes in the collection considered to be preferable for scheduling.
      *
      * @param split Input split.
@@ -221,7 +223,6 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
                             }
 
                             // Sort the nodes in non-ascending order by contained data lengths.
-                            // NodeIdAndLength objects are used as keys to handle comparison when the lengths are equal.
                             Map<NodeIdAndLength, UUID> res = new TreeMap<>();
 
                             for (Map.Entry<UUID, Long> idToLenEntry : idToLen.entrySet()) {


Mime
View raw message