From: sboikov@apache.org
To: commits@ignite.apache.org
Reply-To: dev@ignite.apache.org
Date: Tue, 17 Jan 2017 14:04:37 -0000
Subject: [01/50] [abbrv] ignite git commit: IGNITE-4459: Hadoop: weighted planner is the default one from now on. This closes #1391.

Repository: ignite
Updated Branches:
  refs/heads/ignite-gg-11810-1 fcfd62ace -> 8f005c308


IGNITE-4459: Hadoop: weighted planner is the default one from now on. This closes #1391.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9b1fc2b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9b1fc2b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9b1fc2b

Branch: refs/heads/ignite-gg-11810-1
Commit: a9b1fc2b3840d47d7c978d9296e8ae6bdeb10be5
Parents: 2e691d8
Author: tledkov-gridgain
Authored: Thu Dec 29 11:07:22 2016 +0300
Committer: devozerov
Committed: Thu Dec 29 11:07:22 2016 +0300

----------------------------------------------------------------------
 .../mapreduce/IgniteHadoopMapReducePlanner.java |  414 ---------------
 .../processors/hadoop/HadoopProcessor.java      |    4 +-
 .../HadoopDefaultMapReducePlannerSelfTest.java  |  619 ----------------
 .../testsuites/IgniteHadoopTestSuite.java       |    2 -
 4 files changed, 2 insertions(+), 1037 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a9b1fc2b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
deleted file mode 100644
index ac42381..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.mapreduce; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.UUID; - -import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME; - -/** - * Default map-reduce planner implementation. - */ -public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner { - /** {@inheritDoc} */ - @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection top, - @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { - // Convert collection of topology nodes to collection of topology node IDs. - Collection topIds = new HashSet<>(top.size(), 1.0f); - - for (ClusterNode topNode : top) - topIds.add(topNode.id()); - - Map> mappers = mappers(top, topIds, job.input()); - - int rdcCnt = job.info().reducers(); - - if (rdcCnt < 0) - throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt); - - Map reducers = reducers(top, mappers, rdcCnt); - - return new HadoopDefaultMapReducePlan(mappers, reducers); - } - - /** - * Create plan for mappers. - * - * @param top Topology nodes. - * @param topIds Topology node IDs. - * @param splits Splits. - * @return Mappers map. - * @throws IgniteCheckedException If failed. - */ - private Map> mappers(Collection top, Collection topIds, - Iterable splits) throws IgniteCheckedException { - Map> mappers = new HashMap<>(); - - Map> nodes = groupByHost(top); - - Map nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load. 
- - for (UUID nodeId : topIds) - nodeLoads.put(nodeId, 0); - - for (HadoopInputSplit split : splits) { - UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads); - - if (log.isDebugEnabled()) - log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']'); - - Collection nodeSplits = mappers.get(nodeId); - - if (nodeSplits == null) { - nodeSplits = new ArrayList<>(); - - mappers.put(nodeId, nodeSplits); - } - - nodeSplits.add(split); - - // Updated node load. - nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1); - } - - return mappers; - } - - /** - * Determine the best node for this split. - * - * @param split Split. - * @param topIds Topology node IDs. - * @param nodes Nodes. - * @param nodeLoads Node load tracker. - * @return Node ID. - * @throws IgniteCheckedException On error. - */ - @SuppressWarnings("unchecked") - private UUID nodeForSplit(HadoopInputSplit split, Collection topIds, Map> nodes, - Map nodeLoads) throws IgniteCheckedException { - if (split instanceof HadoopFileBlock) { - HadoopFileBlock split0 = (HadoopFileBlock)split; - - if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { - HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority()); - - IgfsEx igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); - - if (igfs != null && !igfs.isProxy(split0.file())) { - IgfsPath path = new IgfsPath(split0.file()); - - if (igfs.exists(path)) { - Collection blocks; - - try { - blocks = igfs.affinity(path, split0.start(), split0.length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - - assert blocks != null; - - if (blocks.size() == 1) - // Fast-path, split consists of one IGFS block (as in most cases). - return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); - else { - // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. - Map nodeMap = new HashMap<>(); - - List bestNodeIds = null; - long bestLen = -1L; - - for (IgfsBlockLocation block : blocks) { - for (UUID blockNodeId : block.nodeIds()) { - if (topIds.contains(blockNodeId)) { - Long oldLen = nodeMap.get(blockNodeId); - long newLen = oldLen == null ? block.length() : oldLen + block.length(); - - nodeMap.put(blockNodeId, newLen); - - if (bestNodeIds == null || bestLen < newLen) { - bestNodeIds = new ArrayList<>(1); - - bestNodeIds.add(blockNodeId); - - bestLen = newLen; - } - else if (bestLen == newLen) { - assert !F.isEmpty(bestNodeIds); - - bestNodeIds.add(blockNodeId); - } - } - } - } - - if (bestNodeIds != null) { - return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : - bestNode(bestNodeIds, topIds, nodeLoads, true); - } - } - } - } - } - } - - // Cannot use local IGFS for some reason, try selecting the node by host. - Collection blockNodes = null; - - for (String host : split.hosts()) { - Collection hostNodes = nodes.get(host); - - if (!F.isEmpty(hostNodes)) { - if (blockNodes == null) - blockNodes = new ArrayList<>(hostNodes); - else - blockNodes.addAll(hostNodes); - } - } - - return bestNode(blockNodes, topIds, nodeLoads, false); - } - - /** - * Finds the best (the least loaded) node among the candidates. - * - * @param candidates Candidates. - * @param topIds Topology node IDs. - * @param nodeLoads Known node loads. - * @param skipTopCheck Whether to skip topology check. - * @return The best node. 
- */ - private UUID bestNode(@Nullable Collection candidates, Collection topIds, Map nodeLoads, - boolean skipTopCheck) { - UUID bestNode = null; - int bestLoad = Integer.MAX_VALUE; - - if (candidates != null) { - for (UUID candidate : candidates) { - if (skipTopCheck || topIds.contains(candidate)) { - int load = nodeLoads.get(candidate); - - if (bestNode == null || bestLoad > load) { - bestNode = candidate; - bestLoad = load; - - if (bestLoad == 0) - break; // Minimum load possible, no need for further iterations. - } - } - } - } - - if (bestNode == null) { - // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one. - bestLoad = Integer.MAX_VALUE; - - for (UUID nodeId : topIds) { - int load = nodeLoads.get(nodeId); - - if (bestNode == null || bestLoad > load) { - bestNode = nodeId; - bestLoad = load; - - if (bestLoad == 0) - break; // Minimum load possible, no need for further iterations. - } - } - } - - assert bestNode != null; - - return bestNode; - } - - /** - * Create plan for reducers. - * - * @param top Topology. - * @param mappers Mappers map. - * @param reducerCnt Reducers count. - * @return Reducers map. - */ - private Map reducers(Collection top, - Map> mappers, int reducerCnt) { - // Determine initial node weights. - int totalWeight = 0; - - List nodes = new ArrayList<>(top.size()); - - for (ClusterNode node : top) { - Collection split = mappers.get(node.id()); - - int weight = reducerNodeWeight(node, split != null ? split.size() : 0); - - nodes.add(new WeightedNode(node.id(), weight, weight)); - - totalWeight += weight; - } - - // Adjust weights. - int totalAdjustedWeight = 0; - - for (WeightedNode node : nodes) { - node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight; - - node.weight = Math.round(node.floatWeight); - - totalAdjustedWeight += node.weight; - } - - // Apply redundant/lost reducers. - Collections.sort(nodes); - - if (totalAdjustedWeight > reducerCnt) { - // Too much reducers set. - ListIterator iter = nodes.listIterator(nodes.size() - 1); - - while (totalAdjustedWeight != reducerCnt) { - if (!iter.hasPrevious()) - iter = nodes.listIterator(nodes.size() - 1); - - WeightedNode node = iter.previous(); - - if (node.weight > 0) { - node.weight -= 1; - - totalAdjustedWeight--; - } - } - } - else if (totalAdjustedWeight < reducerCnt) { - // Not enough reducers set. - ListIterator iter = nodes.listIterator(0); - - while (totalAdjustedWeight != reducerCnt) { - if (!iter.hasNext()) - iter = nodes.listIterator(0); - - WeightedNode node = iter.next(); - - if (node.floatWeight > 0.0f) { - node.weight += 1; - - totalAdjustedWeight++; - } - } - } - - int idx = 0; - - Map reducers = new HashMap<>(nodes.size(), 1.0f); - - for (WeightedNode node : nodes) { - if (node.weight > 0) { - int[] arr = new int[node.weight]; - - for (int i = 0; i < arr.length; i++) - arr[i] = idx++; - - reducers.put(node.nodeId, arr); - } - } - - return reducers; - } - - /** - * Calculate node weight based on node metrics and data co-location. - * - * @param node Node. - * @param splitCnt Splits mapped to this node. - * @return Node weight. - */ - @SuppressWarnings("UnusedParameters") - protected int reducerNodeWeight(ClusterNode node, int splitCnt) { - return splitCnt; - } - - /** - * Weighted node. - */ - private static class WeightedNode implements Comparable { - /** Node ID. */ - private final UUID nodeId; - - /** Weight. */ - private int weight; - - /** Floating point weight. */ - private float floatWeight; - - /** - * Constructor. 
- * - * @param nodeId Node ID. - * @param weight Weight. - * @param floatWeight Floating point weight. - */ - private WeightedNode(UUID nodeId, int weight, float floatWeight) { - this.nodeId = nodeId; - this.weight = weight; - this.floatWeight = floatWeight; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return nodeId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public int compareTo(@NotNull WeightedNode other) { - float res = other.floatWeight - floatWeight; - - return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a9b1fc2b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java index f0df1e9..329d67f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner; +import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; @@ -243,7 +243,7 @@ public class HadoopProcessor extends HadoopProcessorAdapter { */ private void initializeDefaults(HadoopConfiguration cfg) { if (cfg.getMapReducePlanner() == null) - cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner()); + cfg.setMapReducePlanner(new IgniteHadoopWeightedMapReducePlanner()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a9b1fc2b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java deleted file mode 100644 index ee1c88f..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java +++ /dev/null @@ -1,619 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; -import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl; -import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock; -import org.apache.ignite.internal.processors.igfs.IgfsMock; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.testframework.GridTestNode; -import org.apache.ignite.testframework.GridTestUtils; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -/** - * - */ -public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest { - /** */ - private static final UUID ID_1 = new UUID(0, 1); - - /** */ - private static final UUID ID_2 = new UUID(0, 2); - - /** */ - private static final UUID ID_3 = new UUID(0, 3); - - /** */ - private static final String HOST_1 = "host1"; - - /** */ - private static final String HOST_2 = "host2"; - - /** */ - private static final String HOST_3 = "host3"; - - /** */ - private static final String INVALID_HOST_1 = "invalid_host1"; - - /** */ - private static final String INVALID_HOST_2 = "invalid_host2"; - - /** */ - private static final String INVALID_HOST_3 = "invalid_host3"; - - /** Mocked IGFS. */ - private static final IgniteFileSystem IGFS = new MockIgfs(); - - /** Mocked Grid. */ - private static final IgfsIgniteMock GRID = new IgfsIgniteMock(null, IGFS); - - /** Planner. */ - private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner(); - - /** Block locations. */ - private static final Map> BLOCK_MAP = new HashMap<>(); - - /** Proxy map. */ - private static final Map PROXY_MAP = new HashMap<>(); - - /** Last created plan. */ - private static final ThreadLocal PLAN = new ThreadLocal<>(); - - /** - * Static initializer. - */ - static { - GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "ignite", GRID); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "log", log()); - - BLOCK_MAP.clear(); - PROXY_MAP.clear(); - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - public void testIgfsOneBlockPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1); - HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2); - HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2)); - mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 2); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1); - HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2); - HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 2); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2); - HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2); - HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2)); - mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2); - HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2); - HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3); - HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split2); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_1); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - public void testNonIgfsOrphans() throws IgniteCheckedException { - HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2); - HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3); - HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2); - - plan(1, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) || - ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) || - ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * Create plan. - * - * @param reducers Reducers count. - * @param splits Splits. 
- * @return Plan. - * @throws IgniteCheckedException If failed. - */ - private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException { - assert reducers > 0; - assert splits != null && splits.length > 0; - - Collection splitList = new ArrayList<>(splits.length); - - Collections.addAll(splitList, splits); - - Collection top = new ArrayList<>(); - - GridTestNode node1 = new GridTestNode(ID_1); - GridTestNode node2 = new GridTestNode(ID_2); - GridTestNode node3 = new GridTestNode(ID_3); - - node1.setHostName(HOST_1); - node2.setHostName(HOST_2); - node3.setHostName(HOST_3); - - top.add(node1); - top.add(node2); - top.add(node3); - - HadoopMapReducePlan plan = PLANNER.preparePlan(new HadoopPlannerMockJob(splitList, reducers), top, null); - - PLAN.set(plan); - - return plan; - } - - /** - * Ensure that node contains the given mappers. - * - * @param nodeId Node ID. - * @param expSplits Expected splits. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) { - Collection expSplitsCol = new ArrayList<>(); - - Collections.addAll(expSplitsCol, expSplits); - - Collection splits = PLAN.get().mappers(nodeId); - - return F.eq(expSplitsCol, splits); - } - - /** - * Ensure that node contains the given amount of reducers. - * - * @param nodeId Node ID. - * @param reducers Reducers. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureReducers(UUID nodeId, int reducers) { - int[] reducersArr = PLAN.get().reducers(nodeId); - - return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers); - } - - /** - * Ensure that no mappers and reducers is located on this node. - * - * @param nodeId Node ID. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureEmpty(UUID nodeId) { - return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId)); - } - - /** - * Create split. - * - * @param igfs IGFS flag. - * @param file File. - * @param start Start. - * @param len Length. - * @param hosts Hosts. - * @return Split. - */ - private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) { - URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file); - - return new HadoopFileBlock(hosts, uri, start, len); - } - - /** - * Create block location. - * - * @param start Start. - * @param len Length. - * @param nodeIds Node IDs. - * @return Block location. - */ - private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) { - assert nodeIds != null && nodeIds.length > 0; - - Collection nodes = new ArrayList<>(nodeIds.length); - - for (UUID id : nodeIds) - nodes.add(new GridTestNode(id)); - - return new IgfsBlockLocationImpl(start, len, nodes); - } - - /** - * Map IGFS block to nodes. - * - * @param file File. - * @param start Start. - * @param len Length. - * @param locations Locations. - */ - private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) { - assert locations != null && locations.length > 0; - - IgfsPath path = new IgfsPath(file); - - Block block = new Block(path, start, len); - - Collection locationsList = new ArrayList<>(); - - Collections.addAll(locationsList, locations); - - BLOCK_MAP.put(block, locationsList); - } - - /** - * Block. 
- */ - private static class Block { - /** */ - private final IgfsPath path; - - /** */ - private final long start; - - /** */ - private final long len; - - /** - * Constructor. - * - * @param path Path. - * @param start Start. - * @param len Length. - */ - private Block(IgfsPath path, long start, long len) { - this.path = path; - this.start = start; - this.len = len; - } - - /** {@inheritDoc} */ - @SuppressWarnings("RedundantIfStatement") - @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Block)) return false; - - Block block = (Block) o; - - if (len != block.len) - return false; - - if (start != block.start) - return false; - - if (!path.equals(block.path)) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = path.hashCode(); - - res = 31 * res + (int) (start ^ (start >>> 32)); - res = 31 * res + (int) (len ^ (len >>> 32)); - - return res; - } - } - - /** - * Mocked IGFS. - */ - private static class MockIgfs extends IgfsMock { - /** - * Constructor. - */ - public MockIgfs() { - super("igfs"); - } - - /** {@inheritDoc} */ - @Override public boolean isProxy(URI path) { - return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path); - } - - /** {@inheritDoc} */ - @Override public Collection affinity(IgfsPath path, long start, long len) { - return BLOCK_MAP.get(new Block(path, start, len)); - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgfsPath path) { - return true; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a9b1fc2b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 01893fb..6c2d5c4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -55,7 +55,6 @@ import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSy import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest; import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopCommandLineTest; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopDefaultMapReducePlannerSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopFileSystemsTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopGroupingTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopJobTrackerSelfTest; @@ -116,7 +115,6 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopUserLibsSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedMapReducePlannerTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(BasicUserNameMapperSelfTest.class.getName())));
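
----------------------------------------------------------------------

A note for readers of the deleted planner above: the densest part of
IgniteHadoopMapReducePlanner is the reducers() method, which scales each
node's weight to the requested reducer count, rounds, and then walks off
the rounding drift one reducer at a time. The standalone sketch below
distills that balancing step. The names and the array-based layout are
mine (the deleted code walks sorted WeightedNode objects with list
iterators), so treat it as an illustration of the idea, not a drop-in
copy of the removed class.

import java.util.Arrays;

public class ReducerBalanceSketch {
    /**
     * Distributes rdcCnt reducers across nodes proportionally to their
     * weights, mirroring the rounding-and-correction loop of the deleted
     * IgniteHadoopMapReducePlanner.reducers() method.
     */
    static int[] distribute(int[] weights, int rdcCnt) {
        int totalWeight = Arrays.stream(weights).sum();

        if (totalWeight <= 0)
            throw new IllegalArgumentException("Total weight must be positive.");

        int[] counts = new int[weights.length];
        float[] floatCounts = new float[weights.length];

        int total = 0;

        // Scale each weight to the requested reducer count and round.
        for (int i = 0; i < weights.length; i++) {
            floatCounts[i] = (float)weights[i] * rdcCnt / totalWeight;
            counts[i] = Math.round(floatCounts[i]);
            total += counts[i];
        }

        int i = 0;

        // Too many reducers after rounding: take one back from any node
        // that still has some.
        while (total > rdcCnt) {
            int j = i++ % counts.length;

            if (counts[j] > 0) {
                counts[j]--;
                total--;
            }
        }

        // Too few: add one to any node with a positive fractional weight.
        while (total < rdcCnt) {
            int j = i++ % counts.length;

            if (floatCounts[j] > 0.0f) {
                counts[j]++;
                total++;
            }
        }

        return counts;
    }

    public static void main(String[] args) {
        // Three nodes with mapper counts 3, 1 and 1 sharing 5 reducers.
        System.out.println(Arrays.toString(distribute(new int[] {3, 1, 1}, 5))); // [3, 1, 1]
    }
}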
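
----------------------------------------------------------------------

On the headline change itself: HadoopProcessor.initializeDefaults() now
falls back to IgniteHadoopWeightedMapReducePlanner, and the old
IgniteHadoopMapReducePlanner is deleted rather than merely demoted, so
configurations referencing the old class will no longer compile. Below is
a minimal sketch of pinning the planner explicitly; the
setMapReducePlanner(...) call is the one visible in the HadoopProcessor
diff above, while attaching the HadoopConfiguration through
IgniteConfiguration.setHadoopConfiguration(...) is my assumption about
the usual surrounding setup, not something this commit shows.

import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;

public class WeightedPlannerConfig {
    /**
     * Builds an Ignite configuration with the map-reduce planner set
     * explicitly. As of this commit the same weighted planner is also
     * the implicit default, so this only makes the choice visible and
     * guards against future default changes.
     */
    public static IgniteConfiguration configure() {
        HadoopConfiguration hadoopCfg = new HadoopConfiguration();

        // Same class HadoopProcessor.initializeDefaults() now installs
        // when no planner is configured.
        hadoopCfg.setMapReducePlanner(new IgniteHadoopWeightedMapReducePlanner());

        IgniteConfiguration cfg = new IgniteConfiguration();

        cfg.setHadoopConfiguration(hadoopCfg); // Assumed attachment point.

        return cfg;
    }
}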