From: sboikov@apache.org
To: commits@ignite.apache.org
Reply-To: dev@ignite.apache.org
Date: Mon, 26 Sep 2016 11:24:56 -0000
Subject: [15/47] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
new file mode 100644
index 0000000..430c675
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Tests for the weighted map-reduce planner.
+ */
+public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest {
+    /** ID 1. */
+    private static final UUID ID_1 = new UUID(0, 1);
+
+    /** ID 2. */
+    private static final UUID ID_2 = new UUID(0, 2);
+
+    /** ID 3. */
+    private static final UUID ID_3 = new UUID(0, 3);
+
+    /** MAC 1. */
+    private static final String MAC_1 = "mac1";
+
+    /** MAC 2. */
+    private static final String MAC_2 = "mac2";
+
+    /** MAC 3. */
+    private static final String MAC_3 = "mac3";
+
+    /** Host 1. */
+    private static final String HOST_1 = "host1";
+
+    /** Host 2. */
+    private static final String HOST_2 = "host2";
+
+    /** Host 3. */
+    private static final String HOST_3 = "host3";
+
+    /** Host 4. */
+    private static final String HOST_4 = "host4";
+
+    /** Host 5. */
+    private static final String HOST_5 = "host5";
+
+    /** Standard node 1. */
+    private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1);
+
+    /** Standard node 2. */
+    private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2);
+
+    /** Standard node 3. */
+    private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3);
+
+    /** Standard nodes. */
+    private static final Collection<MockNode> NODES;
+
+    /**
+     * Static initializer.
+     */
+    static {
+        NODES = new ArrayList<>();
+
+        NODES.add(NODE_1);
+        NODES.add(NODE_2);
+        NODES.add(NODE_3);
+    }
+
+    /**
+     * Test one IGFS split being assigned to an affinity node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOneIgfsSplitAffinity() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+        List<HadoopInputSplit> splits = new ArrayList<>();
+
+        splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50));
+
+        final int expReducers = 4;
+
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+        HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+        assert plan.mappers() == 1;
+        assert plan.mapperNodeIds().size() == 1;
+        assert plan.mapperNodeIds().contains(ID_1);
+
+        checkPlanMappers(plan, splits, NODES, false/*only 1 split*/);
+        checkPlanReducers(plan, NODES, expReducers, false/* because of threshold behavior.*/);
+    }
+
+    /**
+     * Test HDFS splits being assigned to affinity nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHdfsSplitsAffinity() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+        final List<HadoopInputSplit> splits = new ArrayList<>();
+
+        splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hdfs://" + HOST_1 + "/x"), 0, 50));
+        splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hdfs://" + HOST_2 + "/x"), 50, 100));
+        splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hdfs://" + HOST_3 + "/x"), 100, 37));
+
+        // The following splits belong to hosts that are not in the Ignite topology at all.
+        // This means that these splits should be assigned to the least loaded nodes:
+        splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hdfs://" + HOST_4 + "/x"), 138, 2));
+        splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hdfs://" + HOST_5 + "/x"), 140, 3));
+
+        final int expReducers = 7;
+
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+        final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+        checkPlanMappers(plan, splits, NODES, true);
+
+        checkPlanReducers(plan, NODES, expReducers, true);
+    }
+
+    /**
+     * Test HDFS splits with replication factor 3.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHdfsSplitsReplication() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+        final List<HadoopInputSplit> splits = new ArrayList<>();
+
+        splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hdfs://" + HOST_1 + "/x"), 0, 50));
+        splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hdfs://" + HOST_2 + "/x"), 50, 100));
+        splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hdfs://" + HOST_3 + "/x"), 100, 37));
+
+        // The following splits belong to hosts that are not in the Ignite topology at all.
+        // This means that these splits should be assigned to the least loaded nodes:
+        splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hdfs://" + HOST_4 + "/x"), 138, 2));
+        splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hdfs://" + HOST_5 + "/x"), 140, 3));
+
+        final int expReducers = 8;
+
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+        final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+        checkPlanMappers(plan, splits, NODES, true);
+
+        checkPlanReducers(plan, NODES, expReducers, true);
+    }
+
+    /**
+     * Get all IDs.
+     *
+     * @param nodes Nodes.
+     * @return IDs.
+     */
+    private static Set<UUID> allIds(Collection<? extends ClusterNode> nodes) {
+        Set<UUID> allIds = new HashSet<>();
+
+        for (ClusterNode n : nodes)
+            allIds.add(n.id());
+
+        return allIds;
+    }
+
+    /**
+     * Check mappers for the plan.
+     *
+     * @param plan Plan.
+     * @param splits Splits.
+     * @param nodes Nodes.
+     * @param expectUniformity Whether uniformity is expected.
+     */
+    private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits,
+        Collection<? extends ClusterNode> nodes, boolean expectUniformity) {
+        // Number of mappers should correspond to the number of input splits:
+        assertEquals(splits.size(), plan.mappers());
+
+        if (expectUniformity) {
+            // Mappers are assigned to all available nodes:
+            assertEquals(nodes.size(), plan.mapperNodeIds().size());
+
+            assertEquals(allIds(nodes), plan.mapperNodeIds());
+        }
+
+        // Check all splits are covered by mappers:
+        Set<HadoopInputSplit> set = new HashSet<>();
+
+        for (UUID id: plan.mapperNodeIds()) {
+            Collection<HadoopInputSplit> sp = plan.mappers(id);
+
+            assert sp != null;
+
+            for (HadoopInputSplit s: sp)
+                assertTrue(set.add(s));
+        }
+
+        // Must be of the same size and contain the same elements:
+        assertEquals(set, new HashSet<>(splits));
+    }
+
+    /**
+     * Check plan reducers.
+     *
+     * @param plan Plan.
+     * @param nodes Nodes.
+     * @param expReducers Expected reducers.
+     * @param expectUniformity Whether uniformity is expected.
+     */
+    private static void checkPlanReducers(HadoopMapReducePlan plan,
+        Collection<? extends ClusterNode> nodes, int expReducers, boolean expectUniformity) {
+        assertEquals(expReducers, plan.reducers());
+
+        if (expectUniformity)
+            assertEquals(allIds(nodes), plan.reducerNodeIds());
+
+        int sum = 0;
+        int lenSum = 0;
+
+        for (UUID uuid: plan.reducerNodeIds()) {
+            int[] rr = plan.reducers(uuid);
+
+            assert rr != null;
+
+            lenSum += rr.length;
+
+            for (int i: rr)
+                sum += i;
+        }
+
+        assertEquals(expReducers, lenSum);
+
+        // Reducer indexes across all arrays must be the consecutive integers starting from 0.
+        // Check that by comparing their total sum with 0 + 1 + ... + (lenSum - 1) = lenSum * (lenSum - 1) / 2:
+        assertEquals((lenSum * (lenSum - 1) / 2), sum);
+    }
+
+    /**
+     * Create planner for IGFS.
+     *
+     * @param igfs IGFS.
+     * @return Planner.
+     */
+    private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock igfs) {
+        IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner();
+
+        IgfsIgniteMock ignite = new IgfsIgniteMock(null, igfs);
+
+        GridTestUtils.setFieldValue(planner, HadoopAbstractMapReducePlanner.class, "ignite", ignite);
+
+        return planner;
+    }
+
+    /**
+     * Throw {@link UnsupportedOperationException}.
+     */
+    private static void throwUnsupported() {
+        throw new UnsupportedOperationException("Should not be called!");
+    }
+
+    /**
+     * Mocked node.
+     */
+    private static class MockNode implements ClusterNode {
+        /** ID. */
+        private final UUID id;
+
+        /** MAC addresses. */
+        private final String macs;
+
+        /** Addresses. */
+        private final List<String> addrs;
+
+        /**
+         * Constructor.
+         *
+         * @param id Node ID.
+         * @param macs MAC addresses.
+         * @param addrs Addresses.
+         */
+        public MockNode(UUID id, String macs, String... addrs) {
+            assert addrs != null;
+
+            this.id = id;
+            this.macs = macs;
+
+            this.addrs = Arrays.asList(addrs);
+        }
+
+        /** {@inheritDoc} */
+        @Override public UUID id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Nullable @Override public <T> T attribute(String name) {
+            if (F.eq(name, IgniteNodeAttributes.ATTR_MACS))
+                return (T)macs;
+
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> addresses() {
+            return addrs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object consistentId() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterMetrics metrics() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, Object> attributes() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> hostNames() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long order() {
+            throwUnsupported();
+
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteProductVersion version() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isLocal() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isDaemon() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isClient() {
+            throwUnsupported();
+
+            return false;
+        }
+    }
+
+    /**
+     * Locations builder.
+     */
+    private static class LocationsBuilder {
+        /** Locations. */
+        private final TreeMap<Long, Collection<MockNode>> locs = new TreeMap<>();
+
+        /**
+         * Create new locations builder.
+         *
+         * @return Locations builder.
+         */
+        public static LocationsBuilder create() {
+            return new LocationsBuilder();
+        }
+
+        /**
+         * Add locations.
+         *
+         * @param start Start.
+         * @param nodes Nodes.
+         * @return This builder for chaining.
+         */
+        public LocationsBuilder add(long start, MockNode... nodes) {
+            locs.put(start, Arrays.asList(nodes));
+
+            return this;
+        }
+
+        /**
+         * Build locations.
+         *
+         * @return Locations.
+         */
+        public TreeMap<Long, Collection<MockNode>> build() {
+            return locs;
+        }
+
+        /**
+         * Build IGFS.
+         *
+         * @return IGFS.
+         */
+        public MockIgfs buildIgfs() {
+            return new MockIgfs(build());
+        }
+    }
+
+    /**
+     * Mocked IGFS.
+     */
+    private static class MockIgfs extends IgfsMock {
+        /** Block locations. */
+        private final TreeMap<Long, Collection<MockNode>> locs;
+
+        /**
+         * Constructor.
+         *
+         * @param locs Block locations.
+         */
+        public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) {
+            super("igfs");
+
+            this.locs = locs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+            Collection<IgfsBlockLocation> res = new ArrayList<>();
+
+            long cur = start;
+            long remaining = len;
+
+            long prevLocStart = -1;
+            Collection<MockNode> prevLocNodes = null;
+
+            for (Map.Entry<Long, Collection<MockNode>> locEntry : locs.entrySet()) {
+                long locStart = locEntry.getKey();
+                Collection<MockNode> locNodes = locEntry.getValue();
+
+                if (prevLocNodes != null) {
+                    if (cur < locStart) {
+                        // Add part from previous block.
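+                        // The previous entry's nodes own everything up to locStart, so emit
+                        // that range as its own block location and move the cursor to locStart.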
+                        long prevLen = locStart - prevLocStart;
+
+                        res.add(new IgfsBlockLocationMock(cur, prevLen, prevLocNodes));
+
+                        cur = locStart;
+                        remaining -= prevLen;
+                    }
+                }
+
+                prevLocStart = locStart;
+                prevLocNodes = locNodes;
+
+                if (remaining == 0)
+                    break;
+            }
+
+            // Add remainder.
+            if (remaining != 0)
+                res.add(new IgfsBlockLocationMock(cur, remaining, prevLocNodes));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean exists(IgfsPath path) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isProxy(URI path) {
+            return false;
+        }
+    }
+
+    /**
+     * Mocked block location.
+     */
+    private static class IgfsBlockLocationMock implements IgfsBlockLocation {
+        /** Start. */
+        private final long start;
+
+        /** Length. */
+        private final long len;
+
+        /** Node IDs. */
+        private final List<UUID> nodeIds;
+
+        /**
+         * Constructor.
+         *
+         * @param start Start.
+         * @param len Length.
+         * @param nodes Nodes.
+         */
+        public IgfsBlockLocationMock(long start, long len, Collection<MockNode> nodes) {
+            this.start = start;
+            this.len = len;
+
+            this.nodeIds = new ArrayList<>(nodes.size());
+
+            for (MockNode node : nodes)
+                nodeIds.add(node.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long start() {
+            return start;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long length() {
+            return len;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<UUID> nodeIds() {
+            return nodeIds;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> names() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<String> hosts() {
+            throwUnsupported();
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
new file mode 100644
index 0000000..13f00bd
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+
+/**
+ * Tests whole map-reduce execution with the weighted planner.
+ */
+public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected HadoopConfiguration createHadoopConfiguration() {
+        HadoopConfiguration hadoopCfg = new HadoopConfiguration();
+
+        // Use weighted planner with default settings:
+        IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner();
+
+        hadoopCfg.setMapReducePlanner(planner);
+
+        return hadoopCfg;
+    }
+}
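For context, a minimal sketch of how the weighted planner exercised by these tests is enabled on a node. The setMapReducePlanner() call mirrors createHadoopConfiguration() above; the rest of the wiring is illustrative (the class name is made up, and it assumes ignite-core plus the ignite-hadoop module on the classpath):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.configuration.HadoopConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;

    public class WeightedPlannerSetup {
        public static void main(String[] args) {
            HadoopConfiguration hadoopCfg = new HadoopConfiguration();

            // Same wiring as in HadoopWeightedPlannerMapReduceTest above.
            hadoopCfg.setMapReducePlanner(new IgniteHadoopWeightedMapReducePlanner());

            IgniteConfiguration cfg = new IgniteConfiguration();

            cfg.setHadoopConfiguration(hadoopCfg);

            // Map-reduce jobs submitted to this node are now planned by the weighted planner.
            try (Ignite ignite = Ignition.start(cfg)) {
                // Submit jobs here.
            }
        }
    }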