Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A24FC200B88 for ; Thu, 22 Sep 2016 09:00:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A1060160AE3; Thu, 22 Sep 2016 07:00:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B57B7160AFA for ; Thu, 22 Sep 2016 09:00:44 +0200 (CEST) Received: (qmail 18004 invoked by uid 500); 22 Sep 2016 07:00:43 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 16954 invoked by uid 99); 22 Sep 2016 07:00:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Sep 2016 07:00:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A90A2E04BA; Thu, 22 Sep 2016 07:00:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 22 Sep 2016 07:01:07 -0000 Message-Id: In-Reply-To: <0e93c15c13614e1ea0d914616ab0d0bf@git.apache.org> References: <0e93c15c13614e1ea0d914616ab0d0bf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [27/51] [partial] ignite git commit: IGNITE-3949: Applied new HadoopClassLoader architecture. archived-at: Thu, 22 Sep 2016 07:00:48 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java deleted file mode 100644 index a69b72a..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ /dev/null @@ -1,615 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; -import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl; -import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock; -import org.apache.ignite.internal.processors.igfs.IgfsMock; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.testframework.GridTestNode; -import org.apache.ignite.testframework.GridTestUtils; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -/** - * - */ -public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest { - /** */ - private static final UUID ID_1 = new UUID(0, 1); - - /** */ - private static final UUID ID_2 = new UUID(0, 2); - - /** */ - private static final UUID ID_3 = new UUID(0, 3); - - /** */ - private static final String HOST_1 = "host1"; - - /** */ - private static final String HOST_2 = "host2"; - - /** */ - private static final String HOST_3 = "host3"; - - /** */ - private static final String INVALID_HOST_1 = "invalid_host1"; - - /** */ - private static final String INVALID_HOST_2 = "invalid_host2"; - - /** */ - private static final String INVALID_HOST_3 = "invalid_host3"; - - /** Mocked IGFS. */ - private static final IgniteFileSystem IGFS = new MockIgfs(); - - /** Mocked Grid. */ - private static final IgfsIgniteMock GRID = new IgfsIgniteMock(null, IGFS); - - /** Planner. */ - private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner(); - - /** Block locations. */ - private static final Map> BLOCK_MAP = new HashMap<>(); - - /** Proxy map. */ - private static final Map PROXY_MAP = new HashMap<>(); - - /** Last created plan. */ - private static final ThreadLocal PLAN = new ThreadLocal<>(); - - /** - * Static initializer. - */ - static { - GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "ignite", GRID); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "log", log()); - - BLOCK_MAP.clear(); - PROXY_MAP.clear(); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testIgfsOneBlockPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1); - HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2); - HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2)); - mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 2); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1); - HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2); - HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 2); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2); - HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2); - HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2)); - mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2); - HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2); - HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException { - HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3); - HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split2); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_1); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testNonIgfsOrphans() throws IgniteCheckedException { - HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2); - HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3); - HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2); - - plan(1, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) || - ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) || - ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * Create plan. - * - * @param reducers Reducers count. - * @param splits Splits. - * @return Plan. - * @throws IgniteCheckedException If failed. - */ - private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException { - assert reducers > 0; - assert splits != null && splits.length > 0; - - Collection splitList = new ArrayList<>(splits.length); - - Collections.addAll(splitList, splits); - - Collection top = new ArrayList<>(); - - GridTestNode node1 = new GridTestNode(ID_1); - GridTestNode node2 = new GridTestNode(ID_2); - GridTestNode node3 = new GridTestNode(ID_3); - - node1.setHostName(HOST_1); - node2.setHostName(HOST_2); - node3.setHostName(HOST_3); - - top.add(node1); - top.add(node2); - top.add(node3); - - HadoopMapReducePlan plan = PLANNER.preparePlan(new HadoopPlannerMockJob(splitList, reducers), top, null); - - PLAN.set(plan); - - return plan; - } - - /** - * Ensure that node contains the given mappers. - * - * @param nodeId Node ID. - * @param expSplits Expected splits. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) { - Collection expSplitsCol = new ArrayList<>(); - - Collections.addAll(expSplitsCol, expSplits); - - Collection splits = PLAN.get().mappers(nodeId); - - return F.eq(expSplitsCol, splits); - } - - /** - * Ensure that node contains the given amount of reducers. - * - * @param nodeId Node ID. - * @param reducers Reducers. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureReducers(UUID nodeId, int reducers) { - int[] reducersArr = PLAN.get().reducers(nodeId); - - return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers); - } - - /** - * Ensure that no mappers and reducers is located on this node. - * - * @param nodeId Node ID. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureEmpty(UUID nodeId) { - return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId)); - } - - /** - * Create split. - * - * @param igfs IGFS flag. - * @param file File. - * @param start Start. - * @param len Length. - * @param hosts Hosts. - * @return Split. - */ - private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) { - URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file); - - return new HadoopFileBlock(hosts, uri, start, len); - } - - /** - * Create block location. - * - * @param start Start. - * @param len Length. - * @param nodeIds Node IDs. - * @return Block location. - */ - private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) { - assert nodeIds != null && nodeIds.length > 0; - - Collection nodes = new ArrayList<>(nodeIds.length); - - for (UUID id : nodeIds) - nodes.add(new GridTestNode(id)); - - return new IgfsBlockLocationImpl(start, len, nodes); - } - - /** - * Map IGFS block to nodes. - * - * @param file File. - * @param start Start. - * @param len Length. - * @param locations Locations. - */ - private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) { - assert locations != null && locations.length > 0; - - IgfsPath path = new IgfsPath(file); - - Block block = new Block(path, start, len); - - Collection locationsList = new ArrayList<>(); - - Collections.addAll(locationsList, locations); - - BLOCK_MAP.put(block, locationsList); - } - - /** - * Block. - */ - private static class Block { - /** */ - private final IgfsPath path; - - /** */ - private final long start; - - /** */ - private final long len; - - /** - * Constructor. - * - * @param path Path. - * @param start Start. - * @param len Length. - */ - private Block(IgfsPath path, long start, long len) { - this.path = path; - this.start = start; - this.len = len; - } - - /** {@inheritDoc} */ - @SuppressWarnings("RedundantIfStatement") - @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Block)) return false; - - Block block = (Block) o; - - if (len != block.len) - return false; - - if (start != block.start) - return false; - - if (!path.equals(block.path)) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = path.hashCode(); - - res = 31 * res + (int) (start ^ (start >>> 32)); - res = 31 * res + (int) (len ^ (len >>> 32)); - - return res; - } - } - - /** - * Mocked IGFS. - */ - private static class MockIgfs extends IgfsMock { - /** - * Constructor. - */ - public MockIgfs() { - super("igfs"); - } - - /** {@inheritDoc} */ - @Override public boolean isProxy(URI path) { - return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path); - } - - /** {@inheritDoc} */ - @Override public Collection affinity(IgfsPath path, long start, long len) { - return BLOCK_MAP.get(new Block(path, start, len)); - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgfsPath path) { - return true; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java deleted file mode 100644 index 843b42b..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Error simulator. - */ -public class HadoopErrorSimulator { - /** No-op singleton instance. */ - public static final HadoopErrorSimulator noopInstance = new HadoopErrorSimulator(); - - /** Instance ref. */ - private static final AtomicReference ref = new AtomicReference<>(noopInstance); - - /** - * Creates simulator of given kind with given stage bits. - * - * @param kind The kind. - * @param bits The stage bits. - * @return The simulator. - */ - public static HadoopErrorSimulator create(Kind kind, int bits) { - switch (kind) { - case Noop: - return noopInstance; - case Runtime: - return new RuntimeExceptionBitHadoopErrorSimulator(bits); - case IOException: - return new IOExceptionBitHadoopErrorSimulator(bits); - case Error: - return new ErrorBitHadoopErrorSimulator(bits); - default: - throw new IllegalStateException("Unknown kind: " + kind); - } - } - - /** - * Gets the error simulator instance. - */ - public static HadoopErrorSimulator instance() { - return ref.get(); - } - - /** - * Sets instance. - */ - public static boolean setInstance(HadoopErrorSimulator expect, HadoopErrorSimulator update) { - return ref.compareAndSet(expect, update); - } - - /** - * Constructor. - */ - private HadoopErrorSimulator() { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onMapConfigure() { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onMapSetup() throws IOException, InterruptedException { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onMap() throws IOException { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onMapCleanup() throws IOException, InterruptedException { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onMapClose() throws IOException { - // no-op - } - - /** - * setConf() does not declare IOException to be thrown. - */ - public void onCombineConfigure() { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onCombineSetup() throws IOException, InterruptedException { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onCombine() throws IOException { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onCombineCleanup() throws IOException, InterruptedException { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onReduceConfigure() { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onReduceSetup() throws IOException, InterruptedException { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onReduce() throws IOException { - // no-op - } - - /** - * Invoked on the named stage. - */ - public void onReduceCleanup() throws IOException, InterruptedException { - // no-op - } - - /** - * Error kind. - */ - public enum Kind { - /** No error. */ - Noop, - - /** Runtime. */ - Runtime, - - /** IOException. */ - IOException, - - /** java.lang.Error. */ - Error - } - - /** - * Runtime error simulator. - */ - public static class RuntimeExceptionBitHadoopErrorSimulator extends HadoopErrorSimulator { - /** Stage bits: defines what map-reduce stages will cause errors. */ - private final int bits; - - /** - * Constructor. - */ - protected RuntimeExceptionBitHadoopErrorSimulator(int b) { - bits = b; - } - - /** - * Simulates an error. - */ - protected void simulateError() throws IOException { - throw new RuntimeException("An error simulated by " + getClass().getSimpleName()); - } - - /** {@inheritDoc} */ - @Override public final void onMapConfigure() { - try { - if ((bits & 1) != 0) - simulateError(); - } - catch (IOException e) { - // ignore - } - } - - /** {@inheritDoc} */ - @Override public final void onMapSetup() throws IOException, InterruptedException { - if ((bits & 2) != 0) - simulateError(); - } - - /** {@inheritDoc} */ - @Override public final void onMap() throws IOException { - if ((bits & 4) != 0) - simulateError(); - } - - /** {@inheritDoc} */ - @Override public final void onMapCleanup() throws IOException, InterruptedException { - if ((bits & 8) != 0) - simulateError(); - } - - /** {@inheritDoc} */ - @Override public final void onCombineConfigure() { - try { - if ((bits & 16) != 0) - simulateError(); - } - catch (IOException e) { - // ignore - } - } - - /** {@inheritDoc} */ - @Override public final void onCombineSetup() throws IOException, InterruptedException { - if ((bits & 32) != 0) - simulateError(); - } - - /** {@inheritDoc} */ - @Override public final void onCombine() throws IOException { - if ((bits & 64) != 0) - simulateError(); - } - - /** {@inheritDoc} */ - @Override public final void onCombineCleanup() throws IOException, InterruptedException { - if ((bits & 128) != 0) - simulateError(); - } - - /** {@inheritDoc} */ - @Override public final void onReduceConfigure() { - try { - if ((bits & 256) != 0) - simulateError(); - } - catch (IOException e) { - // ignore - } - } - - /** {@inheritDoc} */ - @Override public final void onReduceSetup() throws IOException, InterruptedException { - if ((bits & 512) != 0) - simulateError(); - } - - /** {@inheritDoc} */ - @Override public final void onReduce() throws IOException { - if ((bits & 1024) != 0) - simulateError(); - } - - /** {@inheritDoc} */ - @Override public final void onReduceCleanup() throws IOException, InterruptedException { - if ((bits & 2048) != 0) - simulateError(); - } - } - - /** - * java.lang.Error simulator. - */ - public static class ErrorBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator { - /** - * Constructor. - */ - public ErrorBitHadoopErrorSimulator(int bits) { - super(bits); - } - - /** {@inheritDoc} */ - @Override protected void simulateError() { - throw new Error("An error simulated by " + getClass().getSimpleName()); - } - } - - /** - * IOException simulator. - */ - public static class IOExceptionBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator { - /** - * Constructor. - */ - public IOExceptionBitHadoopErrorSimulator(int bits) { - super(bits); - } - - /** {@inheritDoc} */ - @Override protected void simulateError() throws IOException { - throw new IOException("An IOException simulated by " + getClass().getSimpleName()); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java deleted file mode 100644 index 946ba77..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; -import org.apache.ignite.testframework.GridTestUtils; - -/** - * Test file systems for the working directory multi-threading support. - */ -public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { - /** the number of threads */ - private static final int THREAD_COUNT = 3; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } - - - /** - * Test the file system with specified URI for the multi-thread working directory support. - * - * @param uri Base URI of the file system (scheme and authority). - * @throws Exception If fails. - */ - private void testFileSystem(final URI uri) throws Exception { - final Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, - new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString()); - - final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT); - - final Path[] newUserInitWorkDir = new Path[THREAD_COUNT]; - final Path[] newWorkDir = new Path[THREAD_COUNT]; - final Path[] newAbsWorkDir = new Path[THREAD_COUNT]; - final Path[] newInstanceWorkDir = new Path[THREAD_COUNT]; - - final AtomicInteger threadNum = new AtomicInteger(0); - - GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - try { - int curThreadNum = threadNum.getAndIncrement(); - - if ("file".equals(uri.getScheme())) - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); - - changeUserPhase.countDown(); - changeUserPhase.await(); - - newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum)); - - changeDirPhase.countDown(); - changeDirPhase.await(); - - newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum)); - - changeAbsDirPhase.countDown(); - changeAbsDirPhase.await(); - - newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory(); - - finishPhase.countDown(); - } - catch (InterruptedException | IOException e) { - error("Failed to execute test thread.", e); - - fail(); - } - } - }, THREAD_COUNT, "filesystems-test"); - - finishPhase.await(); - - for (int i = 0; i < THREAD_COUNT; i ++) { - cfg.set(MRJobConfig.USER_NAME, "user" + i); - - Path workDir = new Path(new Path(uri), "user/user" + i); - - cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString()); - - assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory()); - - assertEquals(workDir, newUserInitWorkDir[i]); - - assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]); - - assertEquals(new Path("/folder" + i), newAbsWorkDir[i]); - - assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]); - } - - System.out.println(System.getProperty("user.dir")); - } - - /** - * Test LocalFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testLocal() throws Exception { - testFileSystem(URI.create("file:///")); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java deleted file mode 100644 index db87e33..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.GridRandom; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.S; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; - -/** - * Grouping test. - */ -public class HadoopGroupingTest extends HadoopAbstractSelfTest { - /** */ - private static final String PATH_OUTPUT = "/test-out"; - - /** */ - private static final GridConcurrentHashSet vals = HadoopSharedMap.map(HadoopGroupingTest.class) - .put("vals", new GridConcurrentHashSet()); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** {@inheritDoc} */ - protected boolean igfsEnabled() { - return false; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override public HadoopConfiguration hadoopConfiguration(String gridName) { - HadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - // TODO: IGNITE-404: Uncomment when fixed. - //cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testGroupingReducer() throws Exception { - doTestGrouping(false); - } - - /** - * @throws Exception If failed. - */ - public void testGroupingCombiner() throws Exception { - doTestGrouping(true); - } - - /** - * @param combiner With combiner. - * @throws Exception If failed. - */ - public void doTestGrouping(boolean combiner) throws Exception { - vals.clear(); - - Job job = Job.getInstance(); - - job.setInputFormatClass(InFormat.class); - job.setOutputFormatClass(OutFormat.class); - - job.setOutputKeyClass(YearTemperature.class); - job.setOutputValueClass(Text.class); - - job.setMapperClass(Mapper.class); - - if (combiner) { - job.setCombinerClass(MyReducer.class); - job.setNumReduceTasks(0); - job.setCombinerKeyGroupingComparatorClass(YearComparator.class); - } - else { - job.setReducerClass(MyReducer.class); - job.setNumReduceTasks(4); - job.setGroupingComparatorClass(YearComparator.class); - } - - grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2), - createJobInfo(job.getConfiguration())).get(30000); - - assertTrue(vals.isEmpty()); - } - - public static class MyReducer extends Reducer { - /** */ - int lastYear; - - @Override protected void reduce(YearTemperature key, Iterable vals0, Context context) - throws IOException, InterruptedException { - X.println("___ : " + context.getTaskAttemptID() + " --> " + key); - - Set ids = new HashSet<>(); - - for (Text val : vals0) - assertTrue(ids.add(UUID.fromString(val.toString()))); - - for (Text val : vals0) - assertTrue(ids.remove(UUID.fromString(val.toString()))); - - assertTrue(ids.isEmpty()); - - assertTrue(key.year > lastYear); - - lastYear = key.year; - - for (Text val : vals0) - assertTrue(vals.remove(UUID.fromString(val.toString()))); - } - } - - public static class YearComparator implements RawComparator { // Grouping comparator. - /** {@inheritDoc} */ - @Override public int compare(YearTemperature o1, YearTemperature o2) { - return Integer.compare(o1.year, o2.year); - } - - /** {@inheritDoc} */ - @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - throw new IllegalStateException(); - } - } - - public static class YearTemperature implements WritableComparable, Cloneable { - /** */ - private int year; - - /** */ - private int temperature; - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - out.writeInt(year); - out.writeInt(temperature); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - year = in.readInt(); - temperature = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { // To be partitioned by year. - return year; - } - - /** {@inheritDoc} */ - @Override public int compareTo(YearTemperature o) { - int res = Integer.compare(year, o.year); - - if (res != 0) - return res; - - // Sort comparator by year and temperature, to find max for year. - return Integer.compare(o.temperature, temperature); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(YearTemperature.class, this); - } - } - - public static class InFormat extends InputFormat { - /** {@inheritDoc} */ - @Override public List getSplits(JobContext context) throws IOException, InterruptedException { - ArrayList list = new ArrayList<>(); - - for (int i = 0; i < 10; i++) - list.add(new HadoopSortingTest.FakeSplit(20)); - - return list; - } - - /** {@inheritDoc} */ - @Override public RecordReader createRecordReader(final InputSplit split, - TaskAttemptContext context) throws IOException, InterruptedException { - return new RecordReader() { - /** */ - int cnt; - - /** */ - Random rnd = new GridRandom(); - - /** */ - YearTemperature key = new YearTemperature(); - - /** */ - Text val = new Text(); - - @Override public void initialize(InputSplit split, TaskAttemptContext context) { - // No-op. - } - - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - return cnt++ < split.getLength(); - } - - @Override public YearTemperature getCurrentKey() { - key.year = 1990 + rnd.nextInt(10); - key.temperature = 10 + rnd.nextInt(20); - - return key; - } - - @Override public Text getCurrentValue() { - UUID id = UUID.randomUUID(); - - assertTrue(vals.add(id)); - - val.set(id.toString()); - - return val; - } - - @Override public float getProgress() { - return 0; - } - - @Override public void close() { - // No-op. - } - }; - } - } - - /** - * - */ - public static class OutFormat extends OutputFormat { - /** {@inheritDoc} */ - @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - return null; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java deleted file mode 100644 index 9e268b7..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; - -/** - * Job tracker self test. - */ -public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { - /** */ - private static final String PATH_OUTPUT = "/test-out"; - - /** Test block count parameter name. */ - private static final int BLOCK_CNT = 10; - - /** */ - private static HadoopSharedMap m = HadoopSharedMap.map(HadoopJobTrackerSelfTest.class); - - /** Map task execution count. */ - private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger()); - - /** Reduce task execution count. */ - private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger()); - - /** Reduce task execution count. */ - private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger()); - - /** */ - private static final Map latch = m.put("latch", new HashMap()); - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - latch.put("mapAwaitLatch", new CountDownLatch(1)); - latch.put("reduceAwaitLatch", new CountDownLatch(1)); - latch.put("combineAwaitLatch", new CountDownLatch(1)); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - mapExecCnt.set(0); - combineExecCnt.set(0); - reduceExecCnt.set(0); - } - - /** {@inheritDoc} */ - @Override public HadoopConfiguration hadoopConfiguration(String gridName) { - HadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner()); - - // TODO: IGNITE-404: Uncomment when fixed. - //cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testSimpleTaskSubmit() throws Exception { - try { - UUID globalId = UUID.randomUUID(); - - Job job = Job.getInstance(); - setupFileSystems(job.getConfiguration()); - - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); - job.setInputFormatClass(InFormat.class); - - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1")); - - HadoopJobId jobId = new HadoopJobId(globalId, 1); - - grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - checkStatus(jobId, false); - - info("Releasing map latch."); - - latch.get("mapAwaitLatch").countDown(); - - checkStatus(jobId, false); - - info("Releasing reduce latch."); - - latch.get("reduceAwaitLatch").countDown(); - - checkStatus(jobId, true); - - assertEquals(10, mapExecCnt.get()); - assertEquals(0, combineExecCnt.get()); - assertEquals(1, reduceExecCnt.get()); - } - finally { - // Safety. - latch.get("mapAwaitLatch").countDown(); - latch.get("combineAwaitLatch").countDown(); - latch.get("reduceAwaitLatch").countDown(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTaskWithCombinerPerMap() throws Exception { - try { - UUID globalId = UUID.randomUUID(); - - Job job = Job.getInstance(); - setupFileSystems(job.getConfiguration()); - - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); - job.setCombinerClass(TestCombiner.class); - job.setInputFormatClass(InFormat.class); - - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2")); - - HadoopJobId jobId = new HadoopJobId(globalId, 1); - - grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - checkStatus(jobId, false); - - info("Releasing map latch."); - - latch.get("mapAwaitLatch").countDown(); - - checkStatus(jobId, false); - - // All maps are completed. We have a combiner, so no reducers should be executed - // before combiner latch is released. - - U.sleep(50); - - assertEquals(0, reduceExecCnt.get()); - - info("Releasing combiner latch."); - - latch.get("combineAwaitLatch").countDown(); - - checkStatus(jobId, false); - - info("Releasing reduce latch."); - - latch.get("reduceAwaitLatch").countDown(); - - checkStatus(jobId, true); - - assertEquals(10, mapExecCnt.get()); - assertEquals(10, combineExecCnt.get()); - assertEquals(1, reduceExecCnt.get()); - } - finally { - // Safety. - latch.get("mapAwaitLatch").countDown(); - latch.get("combineAwaitLatch").countDown(); - latch.get("reduceAwaitLatch").countDown(); - } - } - - /** - * Checks job execution status. - * - * @param jobId Job ID. - * @param complete Completion status. - * @throws Exception If failed. - */ - private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception { - for (int i = 0; i < gridCount(); i++) { - IgniteKernal kernal = (IgniteKernal)grid(i); - - Hadoop hadoop = kernal.hadoop(); - - HadoopJobStatus stat = hadoop.status(jobId); - - assert stat != null; - - IgniteInternalFuture fut = hadoop.finishFuture(jobId); - - if (!complete) - assertFalse(fut.isDone()); - else { - info("Waiting for status future completion on node [idx=" + i + ", nodeId=" + - kernal.getLocalNodeId() + ']'); - - fut.get(); - } - } - } - - /** - * Test input format - */ - public static class InFormat extends InputFormat { - - @Override public List getSplits(JobContext ctx) throws IOException, InterruptedException { - List res = new ArrayList<>(BLOCK_CNT); - - for (int i = 0; i < BLOCK_CNT; i++) - try { - res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"})); - } - catch (URISyntaxException e) { - throw new IOException(e); - } - - return res; - } - - @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException { - return new RecordReader() { - @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { - } - - @Override public boolean nextKeyValue() { - return false; - } - - @Override public Object getCurrentKey() { - return null; - } - - @Override public Object getCurrentValue() { - return null; - } - - @Override public float getProgress() { - return 0; - } - - @Override public void close() { - - } - }; - } - } - - /** - * Test mapper. - */ - private static class TestMapper extends Mapper { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("mapAwaitLatch").await(); - - mapExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } - - /** - * Test reducer. - */ - private static class TestReducer extends Reducer { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("reduceAwaitLatch").await(); - - reduceExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } - - /** - * Test combiner. - */ - private static class TestCombiner extends Reducer { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("combineAwaitLatch").await(); - - combineExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java deleted file mode 100644 index 25ef382..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.serializer.WritableSerialization; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1; -import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; - -/** - * Tests map-reduce execution with embedded mode. - */ -public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { - /** */ - private static Map flags = HadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class) - .put("flags", new HashMap()); - - /** {@inheritDoc} */ - @Override public HadoopConfiguration hadoopConfiguration(String gridName) { - HadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - // TODO: IGNITE-404: Uncomment when fixed. - //cfg.setExternalExecution(false); - - return cfg; - } - - /** - * Tests whole job execution with all phases in old and new versions of API with definition of custom - * Serialization, Partitioner and IO formats. - * @throws Exception If fails. - */ - public void testMultiReducerWholeMapReduceExecution() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); - - generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000, - "key6", 18000 ); - - for (int i = 0; i < 2; i++) { - boolean useNewAPI = i == 1; - - igfs.delete(new IgfsPath(PATH_OUTPUT), true); - - flags.put("serializationWasConfigured", false); - flags.put("partitionerWasConfigured", false); - flags.put("inputFormatWasConfigured", false); - flags.put("outputFormatWasConfigured", false); - - JobConf jobConf = new JobConf(); - - jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - - //To split into about 6-7 items for v2 - jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); - - //For v1 - jobConf.setInt("fs.local.block.size", 65000); - - // File system coordinates. - setupFileSystems(jobConf); - - HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI); - - if (!useNewAPI) { - jobConf.setPartitionerClass(CustomV1Partitioner.class); - jobConf.setInputFormat(CustomV1InputFormat.class); - jobConf.setOutputFormat(CustomV1OutputFormat.class); - } - - Job job = Job.getInstance(jobConf); - - HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false); - - if (useNewAPI) { - job.setPartitionerClass(CustomV2Partitioner.class); - job.setInputFormatClass(CustomV2InputFormat.class); - job.setOutputFormatClass(CustomV2OutputFormat.class); - } - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setNumReduceTasks(3); - - job.setJarByClass(HadoopWordCount2.class); - - IgniteInternalFuture fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - fut.get(); - - assertTrue("Serialization was configured (new API is " + useNewAPI + ")", - flags.get("serializationWasConfigured")); - - assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")", - flags.get("partitionerWasConfigured")); - - assertTrue("Input format was configured (new API is = " + useNewAPI + ")", - flags.get("inputFormatWasConfigured")); - - assertTrue("Output format was configured (new API is = " + useNewAPI + ")", - flags.get("outputFormatWasConfigured")); - - assertEquals("Use new API = " + useNewAPI, - "key3\t15000\n" + - "key6\t18000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000") - ); - - assertEquals("Use new API = " + useNewAPI, - "key1\t10000\n" + - "key4\t7000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00001") - ); - - assertEquals("Use new API = " + useNewAPI, - "key2\t20000\n" + - "key5\t12000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002") - ); - - } - } - - /** - * Custom serialization class that inherits behaviour of native {@link WritableSerialization}. - */ - protected static class CustomSerialization extends WritableSerialization { - @Override public void setConf(Configuration conf) { - super.setConf(conf); - - flags.put("serializationWasConfigured", true); - } - } - - /** - * Custom implementation of Partitioner in v1 API. - */ - private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - flags.put("partitionerWasConfigured", true); - } - } - - /** - * Custom implementation of Partitioner in v2 API. - */ - private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner - implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("partitionerWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of InputFormat in v2 API. - */ - private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("inputFormatWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of OutputFormat in v2 API. - */ - private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("outputFormatWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of InputFormat in v1 API. - */ - private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - super.configure(job); - - flags.put("inputFormatWasConfigured", true); - } - } - - /** - * Custom implementation of OutputFormat in v1 API. - */ - private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - flags.put("outputFormatWasConfigured", true); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java deleted file mode 100644 index dd12935..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; - -/** - * Test of error resiliency after an error in a map-reduce job execution. - * Combinations tested: - * { new ALI, old API } - * x { unchecked exception, checked exception, error } - * x { phase where the error happens }. - */ -public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest { - /** - * Tests recovery. - * - * @throws Exception If failed. - */ - public void testRecoveryAfterAnError0_Runtime() throws Exception { - doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime); - } - - /** - * Tests recovery. - * - * @throws Exception If failed. - */ - public void testRecoveryAfterAnError0_IOException() throws Exception { - doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException); - } - - /** - * Tests recovery. - * - * @throws Exception If failed. - */ - public void testRecoveryAfterAnError0_Error() throws Exception { - doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error); - } - - /** - * Tests recovery. - * - * @throws Exception If failed. - */ - public void testRecoveryAfterAnError7_Runtime() throws Exception { - doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime); - } - /** - * Tests recovery. - * - * @throws Exception If failed. - */ - public void testRecoveryAfterAnError7_IOException() throws Exception { - doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException); - } - /** - * Tests recovery. - * - * @throws Exception If failed. - */ - public void testRecoveryAfterAnError7_Error() throws Exception { - doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error); - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 10 * 60 * 1000L; - } - - /** - * Tests correct work after an error. - * - * @throws Exception On error. - */ - private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception { - try { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); - - generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow); - - boolean useNewMapper = (useNewBits & 1) == 0; - boolean useNewCombiner = (useNewBits & 2) == 0; - boolean useNewReducer = (useNewBits & 4) == 0; - - for (int i = 0; i < 12; i++) { - int bits = 1 << i; - - System.out.println("############################ Simulator kind = " + simulatorKind - + ", Stage bits = " + bits); - - HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits); - - doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer); - } - } catch (Throwable t) { - t.printStackTrace(); - - fail("Unexpected throwable: " + t); - } - } - - /** - * Performs test with given error simulator. - * - * @param sim The simulator. - * @param inFile Input file. - * @param useNewMapper If the use new mapper API. - * @param useNewCombiner If to use new combiner. - * @param useNewReducer If to use new reducer API. - * @throws Exception If failed. - */ - private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper, - boolean useNewCombiner, boolean useNewReducer) throws Exception { - // Set real simulating error simulator: - assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim)); - - try { - // Expect failure there: - doTest(inFile, useNewMapper, useNewCombiner, useNewReducer); - } - catch (Throwable t) { // This may be an Error. - // Expected: - System.out.println(t.toString()); // Ignore, continue the test. - } - - // Set no-op error simulator: - assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance)); - - // Expect success there: - doTest(inFile, useNewMapper, useNewCombiner, useNewReducer); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java deleted file mode 100644 index b703896..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; - -/** - * Test of whole cycle of map-reduce processing via Job tracker. - */ -public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest { - /** - * Tests whole job execution with all phases in all combination of new and old versions of API. - * @throws Exception If fails. - */ - public void testWholeMapReduceExecution() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); - - generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow ); - - for (boolean[] apiMode: getApiModes()) { - assert apiMode.length == 3; - - boolean useNewMapper = apiMode[0]; - boolean useNewCombiner = apiMode[1]; - boolean useNewReducer = apiMode[2]; - - doTest(inFile, useNewMapper, useNewCombiner, useNewReducer); - } - } - - /** - * Gets API mode combinations to be tested. - * Each boolean[] is { newMapper, newCombiner, newReducer } flag triplet. - * - * @return Arrays of booleans indicating API combinations to test. - */ - protected boolean[][] getApiModes() { - return new boolean[][] { - { false, false, false }, - { false, false, true }, - { false, true, false }, - { true, false, false }, - { true, true, true }, - }; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java deleted file mode 100644 index 0c172c3..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopNoHadoopMapReduceTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.configuration.IgniteConfiguration; - -/** - * Test attempt to execute a map-reduce task while no Hadoop processor available. - */ -public class HadoopNoHadoopMapReduceTest extends HadoopMapReduceTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - c.setHadoopConfiguration(null); - c.setPeerClassLoadingEnabled(true); - - return c; - } - - /** {@inheritDoc} */ - @Override public void testWholeMapReduceExecution() throws Exception { - try { - super.testWholeMapReduceExecution(); - - fail("IllegalStateException expected."); - } - catch (IllegalStateException ignore) { - // No-op. - } - } -}