From: yzhdanov@apache.org
To: commits@ignite.incubator.apache.org
Reply-To: dev@ignite.incubator.apache.org
Date: Thu, 05 Mar 2015 09:05:04 -0000
In-Reply-To: <56d156eb01174f0b88f954783fd4b143@git.apache.org>
References: <56d156eb01174f0b88f954783fd4b143@git.apache.org>
Subject: [04/58] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
new file mode 100644
index 0000000..76988a3
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -0,0 +1,1006 @@
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.hadoop.mapreduce.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.mapreduce.*; +import org.apache.ignite.igfs.secondary.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.util.*; + +/** + * + */ +public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest { + /** */ + private static final UUID ID_1 = new UUID(0, 1); + + /** */ + private static final UUID ID_2 = new UUID(0, 2); + + /** */ + private static final UUID ID_3 = new UUID(0, 3); + + /** */ + private static final String HOST_1 = "host1"; + + /** */ + private static final String HOST_2 = "host2"; + + /** */ + private static final String HOST_3 = "host3"; + + /** */ + private static final String INVALID_HOST_1 = "invalid_host1"; + + /** */ + private static final String INVALID_HOST_2 = "invalid_host2"; + + /** */ + private static final String INVALID_HOST_3 = "invalid_host3"; + + /** Mocked Grid. */ + private static final MockIgnite GRID = new MockIgnite(); + + /** Mocked IGFS. */ + private static final IgniteFileSystem IGFS = new MockIgfs(); + + /** Planner. */ + private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner(); + + /** Block locations. */ + private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>(); + + /** Proxy map. */ + private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>(); + + /** Last created plan. */ + private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>(); + + /** + * + */ + static { + GridTestUtils.setFieldValue(PLANNER, "ignite", GRID); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + GridTestUtils.setFieldValue(PLANNER, "log", log()); + + BLOCK_MAP.clear(); + PROXY_MAP.clear(); + } + + /** + * @throws IgniteCheckedException If failed.
+ */ + public void testIgfsOneBlockPerNode() throws IgniteCheckedException { + HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1); + HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2); + HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3); + + mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1)); + mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2)); + mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3)); + + plan(1, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 1); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(2, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 2); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2, split3); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureMappers(ID_3, split3); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureMappers(ID_3, split3); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * @throws IgniteCheckedException If failed. 
+ */ + public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException { + HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1); + HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2); + HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3); + + plan(1, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 1); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(2, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 2); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2, split3); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureMappers(ID_3, split3); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureMappers(ID_3, split3); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * @throws IgniteCheckedException If failed. 
+ */ + public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException { + HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2); + HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2); + HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3); + + mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2)); + mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2)); + mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3)); + + plan(1, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2, split3); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * @throws IgniteCheckedException If failed. 
+ */ + public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException { + HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2); + HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2); + HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3); + + plan(1, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2, split3); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException { + HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3); + HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3); + + mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3)); + mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3)); + + plan(1, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 1); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(1, split2); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_1); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + } + + /** + * @throws IgniteCheckedException If failed. 
+ */ + public void testNonIgfsOrphans() throws IgniteCheckedException { + HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2); + HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3); + HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3); + + plan(1, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) || + ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1); + + plan(2, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) || + ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2); + + plan(1, split1, split2, split3); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) || + ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) || + ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1); + + plan(3, split1, split2, split3); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * Create plan. + * + * @param reducers Reducers count. + * @param splits Splits. 
+ * @return Plan. + * @throws IgniteCheckedException If failed. + */ + private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException { + assert reducers > 0; + assert splits != null && splits.length > 0; + + Collection<HadoopInputSplit> splitList = new ArrayList<>(splits.length); + + Collections.addAll(splitList, splits); + + Collection<ClusterNode> top = new ArrayList<>(); + + GridTestNode node1 = new GridTestNode(ID_1); + GridTestNode node2 = new GridTestNode(ID_2); + GridTestNode node3 = new GridTestNode(ID_3); + + node1.setHostName(HOST_1); + node2.setHostName(HOST_2); + node3.setHostName(HOST_3); + + top.add(node1); + top.add(node2); + top.add(node3); + + HadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null); + + PLAN.set(plan); + + return plan; + } + + /** + * Ensure that node contains the given mappers. + * + * @param nodeId Node ID. + * @param expSplits Expected splits. + * @return {@code True} if this assumption is valid. + */ + private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) { + Collection<HadoopInputSplit> expSplitsCol = new ArrayList<>(); + + Collections.addAll(expSplitsCol, expSplits); + + Collection<HadoopInputSplit> splits = PLAN.get().mappers(nodeId); + + return F.eq(expSplitsCol, splits); + } + + /** + * Ensure that node contains the given number of reducers. + * + * @param nodeId Node ID. + * @param reducers Reducers. + * @return {@code True} if this assumption is valid. + */ + private static boolean ensureReducers(UUID nodeId, int reducers) { + int[] reducersArr = PLAN.get().reducers(nodeId); + + return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers); + } + + /** + * Ensure that no mappers or reducers are located on this node. + * + * @param nodeId Node ID. + * @return {@code True} if this assumption is valid. + */ + private static boolean ensureEmpty(UUID nodeId) { + return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId)); + } + + /** + * Create split. + * + * @param igfs IGFS flag. + * @param file File. + * @param start Start. + * @param len Length. + * @param hosts Hosts. + * @return Split. + */ + private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) { + URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file); + + return new HadoopFileBlock(hosts, uri, start, len); + } + + /** + * Create block location. + * + * @param start Start. + * @param len Length. + * @param nodeIds Node IDs. + * @return Block location. + */ + private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) { + assert nodeIds != null && nodeIds.length > 0; + + Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length); + + for (UUID id : nodeIds) + nodes.add(new GridTestNode(id)); + + return new IgfsBlockLocationImpl(start, len, nodes); + } + + /** + * Map IGFS block to nodes. + * + * @param file File. + * @param start Start. + * @param len Length. + * @param locations Locations. + */ + private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) { + assert locations != null && locations.length > 0; + + IgfsPath path = new IgfsPath(file); + + Block block = new Block(path, start, len); + + Collection<IgfsBlockLocation> locationsList = new ArrayList<>(); + + Collections.addAll(locationsList, locations); + + BLOCK_MAP.put(block, locationsList); + } + + /** + * Block.
+ */ + private static class Block { + /** */ + private final IgfsPath path; + + /** */ + private final long start; + + /** */ + private final long len; + + /** + * Constructor. + * + * @param path Path. + * @param start Start. + * @param len Length. + */ + private Block(IgfsPath path, long start, long len) { + this.path = path; + this.start = start; + this.len = len; + } + + /** {@inheritDoc} */ + @SuppressWarnings("RedundantIfStatement") + @Override public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Block)) return false; + + Block block = (Block) o; + + if (len != block.len) + return false; + + if (start != block.start) + return false; + + if (!path.equals(block.path)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = path.hashCode(); + + res = 31 * res + (int) (start ^ (start >>> 32)); + res = 31 * res + (int) (len ^ (len >>> 32)); + + return res; + } + } + + /** + * Mocked job. + */ + private static class MockJob implements HadoopJob { + /** Reducers count. */ + private final int reducers; + + /** */ + private Collection splitList; + + /** + * Constructor. + * + * @param reducers Reducers count. + * @param splitList Splits. + */ + private MockJob(int reducers, Collection splitList) { + this.reducers = reducers; + this.splitList = splitList; + } + + /** {@inheritDoc} */ + @Override public HadoopJobId id() { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopJobInfo info() { + return new HadoopDefaultJobInfo() { + @Override public int reducers() { + return reducers; + } + }; + } + + /** {@inheritDoc} */ + @Override public Collection input() throws IgniteCheckedException { + return splitList; + } + + /** {@inheritDoc} */ + @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void dispose(boolean external) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void cleanupStagingDirectory() { + // No-op. + } + } + + /** + * Mocked IGFS. + */ + private static class MockIgfs implements IgfsEx { + /** {@inheritDoc} */ + @Override public boolean isProxy(URI path) { + return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path); + } + + /** {@inheritDoc} */ + @Override public Collection affinity(IgfsPath path, long start, long len) { + return BLOCK_MAP.get(new Block(path, start, len)); + } + + /** {@inheritDoc} */ + @Override public Collection affinity(IgfsPath path, long start, long len, + long maxLen) { + return null; + } + + /** {@inheritDoc} */ + @Override public void stop() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public IgfsContext context() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsPaths proxyPaths() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsStatus globalSpace() throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Boolean globalSampling() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsLocalMetrics localMetrics() { + return null; + } + + /** {@inheritDoc} */ + @Override public long groupBlockSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture awaitDeletesAsync() throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public String clientLogDirectory() { + return null; + } + + /** {@inheritDoc} */ + @Override public void clientLogDirectory(String logDir) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evictExclude(IgfsPath path, boolean primary) { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public String name() { + return null; + } + + /** {@inheritDoc} */ + @Override public FileSystemConfiguration configuration() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile info(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary summary(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map props) { + return null; + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + return false; + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map props) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public Collection listPaths(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection listFiles(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map props) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, + @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map props) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(IgfsPath path, boolean create) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map props) { + return null; + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgfsMetrics metrics() { + return null; + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public long size(IgfsPath path) { + return 0; + } + + /** {@inheritDoc} */ + @Override public void format() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, @Nullable T arg) { + return null; + } + + /** {@inheritDoc} */ + @Override public R execute(IgfsTask task, @Nullable IgfsRecordResolver rslvr, + Collection paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { + return null; + } + + /** {@inheritDoc} */ + @Override public R execute(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, @Nullable T arg) { + return null; + } + + /** {@inheritDoc} */ + @Override public R execute(Class> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection paths, boolean skipNonExistentFiles, + long maxRangeLen, @Nullable T arg) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid nextAffinityKey() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFileSystem withAsync() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return false; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture future() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsSecondaryFileSystem asSecondary() { + return null; + } + } + + /** + * Mocked Grid. 
+ */ + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + private static class MockIgnite extends IgniteSpringBean implements IgniteEx { + /** {@inheritDoc} */ + @Override public IgniteClusterEx cluster() { + return (IgniteClusterEx)super.cluster(); + } + + /** {@inheritDoc} */ + @Override public IgniteFileSystem igfsx(String name) { + assert F.eq("igfs", name); + + return IGFS; + } + + /** {@inheritDoc} */ + @Override public Hadoop hadoop() { + return null; + } + + /** {@inheritDoc} */ + @Override public String name() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridCacheProjectionEx utilityCache(Class keyCls, + Class valCls) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCache cachex(@Nullable String name) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCache cachex() { + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Collection> cachesx(@Nullable IgnitePredicate>... p) { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean eventUserRecordable(int type) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean allEventsUserRecordable(int[] types) { + return false; + } + + /** {@inheritDoc} */ + @Override public Collection compatibleVersions() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isJmxRemoteEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isRestartEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return null; + } + + /** {@inheritDoc} */ + @Override public String latestVersion() { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java new file mode 100644 index 0000000..8cf31a2 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.net.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Test file systems for the working directory multi-threading support. + */ +public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { + private static final int THREAD_COUNT = 3; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + + /** + * Test the file system with specified URI for the multi-thread working directory support. + * + * @param uri Base URI of the file system (scheme and authority). + * @throws Exception If fails. + */ + private void testFileSystem(final URI uri) throws Exception { + final Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, + new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString()); + + final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT); + + final Path[] newUserInitWorkDir = new Path[THREAD_COUNT]; + final Path[] newWorkDir = new Path[THREAD_COUNT]; + final Path[] newAbsWorkDir = new Path[THREAD_COUNT]; + final Path[] newInstanceWorkDir = new Path[THREAD_COUNT]; + + final AtomicInteger threadNum = new AtomicInteger(0); + + GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + int curThreadNum = threadNum.getAndIncrement(); + + FileSystem fs = FileSystem.get(uri, cfg); + + HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); + + if ("file".equals(uri.getScheme())) + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); + + changeUserPhase.countDown(); + changeUserPhase.await(); + + newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum)); + + changeDirPhase.countDown(); + changeDirPhase.await(); + + newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum)); + + changeAbsDirPhase.countDown(); + changeAbsDirPhase.await(); + + newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory(); + + finishPhase.countDown(); + } + catch (InterruptedException | IOException e) { + error("Failed to execute test thread.", e); + + fail(); + } + } + }, THREAD_COUNT, "filesystems-test"); + + finishPhase.await(); + + for (int i = 0; i < THREAD_COUNT; i ++) { + cfg.set(MRJobConfig.USER_NAME, "user" + i); + + Path workDir = new Path(new Path(uri), "user/user" + i); + + 
cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString()); + + assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory()); + + assertEquals(workDir, newUserInitWorkDir[i]); + + assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]); + + assertEquals(new Path("/folder" + i), newAbsWorkDir[i]); + + assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]); + } + + System.out.println(System.getProperty("user.dir")); + } + + /** + * Test IGFS multi-thread working directory. + * + * @throws Exception If fails. + */ + public void testIgfs() throws Exception { + testFileSystem(URI.create(igfsScheme())); + } + + /** + * Test HDFS multi-thread working directory. + * + * @throws Exception If fails. + */ + public void testHdfs() throws Exception { + testFileSystem(URI.create("hdfs://localhost/")); + } + + /** + * Test LocalFS multi-thread working directory. + * + * @throws Exception If fails. + */ + public void testLocal() throws Exception { + testFileSystem(URI.create("file:///")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java new file mode 100644 index 0000000..e385ca7 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Grouping test. 
+ */ +public class HadoopGroupingTest extends HadoopAbstractSelfTest { + /** */ + private static final String PATH_OUTPUT = "/test-out"; + + /** */ + private static final GridConcurrentHashSet vals = HadoopSharedMap.map(HadoopGroupingTest.class) + .put("vals", new GridConcurrentHashSet()); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + protected boolean igfsEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testGroupingReducer() throws Exception { + doTestGrouping(false); + } + + /** + * @throws Exception If failed. + */ + public void testGroupingCombiner() throws Exception { + doTestGrouping(true); + } + + /** + * @param combiner With combiner. + * @throws Exception If failed. + */ + public void doTestGrouping(boolean combiner) throws Exception { + vals.clear(); + + Job job = Job.getInstance(); + + job.setInputFormatClass(InFormat.class); + job.setOutputFormatClass(OutFormat.class); + + job.setOutputKeyClass(YearTemperature.class); + job.setOutputValueClass(Text.class); + + job.setMapperClass(Mapper.class); + + if (combiner) { + job.setCombinerClass(MyReducer.class); + job.setNumReduceTasks(0); + job.setCombinerKeyGroupingComparatorClass(YearComparator.class); + } + else { + job.setReducerClass(MyReducer.class); + job.setNumReduceTasks(4); + job.setGroupingComparatorClass(YearComparator.class); + } + + grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2), + createJobInfo(job.getConfiguration())).get(30000); + + assertTrue(vals.isEmpty()); + } + + public static class MyReducer extends Reducer { + /** */ + int lastYear; + + @Override protected void reduce(YearTemperature key, Iterable vals0, Context context) + throws IOException, InterruptedException { + X.println("___ : " + context.getTaskAttemptID() + " --> " + key); + + Set ids = new HashSet<>(); + + for (Text val : vals0) + assertTrue(ids.add(UUID.fromString(val.toString()))); + + for (Text val : vals0) + assertTrue(ids.remove(UUID.fromString(val.toString()))); + + assertTrue(ids.isEmpty()); + + assertTrue(key.year > lastYear); + + lastYear = key.year; + + for (Text val : vals0) + assertTrue(vals.remove(UUID.fromString(val.toString()))); + } + } + + public static class YearComparator implements RawComparator { // Grouping comparator. 
+ /** {@inheritDoc} */ + @Override public int compare(YearTemperature o1, YearTemperature o2) { + return Integer.compare(o1.year, o2.year); + } + + /** {@inheritDoc} */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + throw new IllegalStateException(); + } + } + + public static class YearTemperature implements WritableComparable, Cloneable { + /** */ + private int year; + + /** */ + private int temperature; + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + out.writeInt(year); + out.writeInt(temperature); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + year = in.readInt(); + temperature = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { // To be partitioned by year. + return year; + } + + /** {@inheritDoc} */ + @Override public int compareTo(YearTemperature o) { + int res = Integer.compare(year, o.year); + + if (res != 0) + return res; + + // Sort comparator by year and temperature, to find max for year. + return Integer.compare(o.temperature, temperature); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(YearTemperature.class, this); + } + } + + public static class InFormat extends InputFormat { + /** {@inheritDoc} */ + @Override public List getSplits(JobContext context) throws IOException, InterruptedException { + ArrayList list = new ArrayList<>(); + + for (int i = 0; i < 10; i++) + list.add(new HadoopSortingTest.FakeSplit(20)); + + return list; + } + + /** {@inheritDoc} */ + @Override public RecordReader createRecordReader(final InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new RecordReader() { + /** */ + int cnt; + + /** */ + Random rnd = new GridRandom(); + + /** */ + YearTemperature key = new YearTemperature(); + + /** */ + Text val = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) { + // No-op. + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + return cnt++ < split.getLength(); + } + + @Override public YearTemperature getCurrentKey() { + key.year = 1990 + rnd.nextInt(10); + key.temperature = 10 + rnd.nextInt(20); + + return key; + } + + @Override public Text getCurrentValue() { + UUID id = UUID.randomUUID(); + + assertTrue(vals.add(id)); + + val.set(id.toString()); + + return val; + } + + @Override public float getProgress() { + return 0; + } + + @Override public void close() { + // No-op. + } + }; + } + } + + /** + * + */ + public static class OutFormat extends OutputFormat { + /** {@inheritDoc} */ + @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java new file mode 100644 index 0000000..943d89f --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Job tracker self test. + */ +public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { + /** */ + private static final String PATH_OUTPUT = "/test-out"; + + /** Test block count parameter name. */ + private static final int BLOCK_CNT = 10; + + /** */ + private static HadoopSharedMap m = HadoopSharedMap.map(HadoopJobTrackerSelfTest.class); + + /** Map task execution count. */ + private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger()); + + /** Reduce task execution count. */ + private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger()); + + /** Reduce task execution count. 
*/ + private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger()); + + /** */ + private static final Map latch = m.put("latch", new HashMap()); + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + latch.put("mapAwaitLatch", new CountDownLatch(1)); + latch.put("reduceAwaitLatch", new CountDownLatch(1)); + latch.put("combineAwaitLatch", new CountDownLatch(1)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + mapExecCnt.set(0); + combineExecCnt.set(0); + reduceExecCnt.set(0); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner()); + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSimpleTaskSubmit() throws Exception { + try { + UUID globalId = UUID.randomUUID(); + + Job job = Job.getInstance(); + setupFileSystems(job.getConfiguration()); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + job.setInputFormatClass(InFormat.class); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1")); + + HadoopJobId jobId = new HadoopJobId(globalId, 1); + + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + checkStatus(jobId, false); + + info("Releasing map latch."); + + latch.get("mapAwaitLatch").countDown(); + + checkStatus(jobId, false); + + info("Releasing reduce latch."); + + latch.get("reduceAwaitLatch").countDown(); + + checkStatus(jobId, true); + + assertEquals(10, mapExecCnt.get()); + assertEquals(0, combineExecCnt.get()); + assertEquals(1, reduceExecCnt.get()); + } + finally { + // Safety. + latch.get("mapAwaitLatch").countDown(); + latch.get("combineAwaitLatch").countDown(); + latch.get("reduceAwaitLatch").countDown(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTaskWithCombinerPerMap() throws Exception { + try { + UUID globalId = UUID.randomUUID(); + + Job job = Job.getInstance(); + setupFileSystems(job.getConfiguration()); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + job.setCombinerClass(TestCombiner.class); + job.setInputFormatClass(InFormat.class); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2")); + + HadoopJobId jobId = new HadoopJobId(globalId, 1); + + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + checkStatus(jobId, false); + + info("Releasing map latch."); + + latch.get("mapAwaitLatch").countDown(); + + checkStatus(jobId, false); + + // All maps are completed. We have a combiner, so no reducers should be executed + // before combiner latch is released. 
+ + U.sleep(50); + + assertEquals(0, reduceExecCnt.get()); + + info("Releasing combiner latch."); + + latch.get("combineAwaitLatch").countDown(); + + checkStatus(jobId, false); + + info("Releasing reduce latch."); + + latch.get("reduceAwaitLatch").countDown(); + + checkStatus(jobId, true); + + assertEquals(10, mapExecCnt.get()); + assertEquals(10, combineExecCnt.get()); + assertEquals(1, reduceExecCnt.get()); + } + finally { + // Safety. + latch.get("mapAwaitLatch").countDown(); + latch.get("combineAwaitLatch").countDown(); + latch.get("reduceAwaitLatch").countDown(); + } + } + + /** + * Checks job execution status. + * + * @param jobId Job ID. + * @param complete Completion status. + * @throws Exception If failed. + */ + private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception { + for (int i = 0; i < gridCount(); i++) { + IgniteKernal kernal = (IgniteKernal)grid(i); + + Hadoop hadoop = kernal.hadoop(); + + HadoopJobStatus stat = hadoop.status(jobId); + + assert stat != null; + + IgniteInternalFuture fut = hadoop.finishFuture(jobId); + + if (!complete) + assertFalse(fut.isDone()); + else { + info("Waiting for status future completion on node [idx=" + i + ", nodeId=" + + kernal.getLocalNodeId() + ']'); + + fut.get(); + } + } + } + + /** + * Test input format + */ + public static class InFormat extends InputFormat { + + @Override public List getSplits(JobContext ctx) throws IOException, InterruptedException { + List res = new ArrayList<>(BLOCK_CNT); + + for (int i = 0; i < BLOCK_CNT; i++) + try { + res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"})); + } + catch (URISyntaxException e) { + throw new IOException(e); + } + + return res; + } + + @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException { + return new RecordReader() { + @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { + } + + @Override public boolean nextKeyValue() { + return false; + } + + @Override public Object getCurrentKey() { + return null; + } + + @Override public Object getCurrentValue() { + return null; + } + + @Override public float getProgress() { + return 0; + } + + @Override public void close() { + + } + }; + } + } + + /** + * Test mapper. + */ + private static class TestMapper extends Mapper { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("mapAwaitLatch").await(); + + mapExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } + + /** + * Test reducer. + */ + private static class TestReducer extends Reducer { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("reduceAwaitLatch").await(); + + reduceExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } + + /** + * Test combiner. 
+ */ + private static class TestCombiner extends Reducer { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("combineAwaitLatch").await(); + + combineExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java new file mode 100644 index 0000000..4a6e1ef --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.examples.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests map-reduce execution with embedded mode. + */ +public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { + /** */ + private static Map flags = HadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class) + .put("flags", new HashMap()); + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * Tests whole job execution with all phases in old and new versions of API with definition of custom + * Serialization, Partitioner and IO formats. + * @throws Exception If fails. 
+     */
+    public void testMultiReducerWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000,
+            "key6", 18000);
+
+        for (int i = 0; i < 2; i++) {
+            boolean useNewAPI = i == 1;
+
+            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+
+            flags.put("serializationWasConfigured", false);
+            flags.put("partitionerWasConfigured", false);
+            flags.put("inputFormatWasConfigured", false);
+            flags.put("outputFormatWasConfigured", false);
+
+            JobConf jobConf = new JobConf();
+
+            jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+            // To split into about 6-7 items for v2.
+            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+            // For v1.
+            jobConf.setInt("fs.local.block.size", 65000);
+
+            // File system coordinates.
+            setupFileSystems(jobConf);
+
+            HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI);
+
+            if (!useNewAPI) {
+                jobConf.setPartitionerClass(CustomV1Partitioner.class);
+                jobConf.setInputFormat(CustomV1InputFormat.class);
+                jobConf.setOutputFormat(CustomV1OutputFormat.class);
+            }
+
+            Job job = Job.getInstance(jobConf);
+
+            HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI);
+
+            if (useNewAPI) {
+                job.setPartitionerClass(CustomV2Partitioner.class);
+                job.setInputFormatClass(CustomV2InputFormat.class);
+                job.setOutputFormatClass(CustomV2OutputFormat.class);
+            }
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
+
+            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+            job.setNumReduceTasks(3);
+
+            job.setJarByClass(HadoopWordCount2.class);
+
+            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+                createJobInfo(job.getConfiguration()));
+
+            fut.get();
+
+            assertTrue("Serialization was configured (new API is " + useNewAPI + ")",
+                flags.get("serializationWasConfigured"));
+
+            assertTrue("Partitioner was configured (new API is " + useNewAPI + ")",
+                flags.get("partitionerWasConfigured"));
+
+            assertTrue("Input format was configured (new API is " + useNewAPI + ")",
+                flags.get("inputFormatWasConfigured"));
+
+            assertTrue("Output format was configured (new API is " + useNewAPI + ")",
+                flags.get("outputFormatWasConfigured"));
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key3\t15000\n" +
+                "key6\t18000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000")
+            );
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key1\t10000\n" +
+                "key4\t7000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00001")
+            );
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key2\t20000\n" +
+                "key5\t12000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002")
+            );
+        }
+    }
+
+    /**
+     * Custom serialization class that inherits the behaviour of the native {@link WritableSerialization}.
+     */
+    protected static class CustomSerialization extends WritableSerialization {
+        @Override public void setConf(Configuration conf) {
+            super.setConf(conf);
+
+            flags.put("serializationWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of Partitioner in v1 API.
+     */
+    private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            flags.put("partitionerWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of Partitioner in v2 API.
+     */
+    private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
+        implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("partitionerWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of InputFormat in v2 API.
+     */
+    private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("inputFormatWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of OutputFormat in v2 API.
+     */
+    private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("outputFormatWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of InputFormat in v1 API.
+     */
+    private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            super.configure(job);
+
+            flags.put("inputFormatWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of OutputFormat in v1 API.
+     */
+    private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            flags.put("outputFormatWasConfigured", true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
new file mode 100644
index 0000000..6242ecc
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests the whole cycle of map-reduce processing via the job tracker.
+ */
+public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * Tests whole job execution, with all phases, in all combinations of the new and old API versions.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000);
+
+        for (int i = 0; i < 8; i++) {
+            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+
+            boolean useNewMapper = (i & 1) == 0;
+            boolean useNewCombiner = (i & 2) == 0;
+            boolean useNewReducer = (i & 4) == 0;
+
+            JobConf jobConf = new JobConf();
+
+            jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
+            jobConf.setUser("yyy");
+            jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
+
+            // To split into about 40 items for v2.
+            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+            // For v1.
+            jobConf.setInt("fs.local.block.size", 65000);
+
+            // File system coordinates.
+            setupFileSystems(jobConf);
+
+            HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
+
+            Job job = Job.getInstance(jobConf);
+
+            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
+
+            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+            job.setJarByClass(HadoopWordCount2.class);
+
+            HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            fut.get();
+
+            checkJobStatistics(jobId);
+
+            assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
+                useNewReducer,
+                "blue\t200000\n" +
+                "green\t150000\n" +
+                "red\t100000\n" +
+                "yellow\t70000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
+            );
+        }
+    }
+
+    /**
+     * Simple check of job statistics.
+     *
+     * @param jobId Job ID.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException {
+        HadoopCounters cntrs = grid(0).hadoop().counters(jobId);
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+
+        Map<String, Map<Integer, Long>> tasks = new TreeMap<>();
+
+        Map<String, Integer> phaseOrders = new HashMap<>();
+        phaseOrders.put("submit", 0);
+        phaseOrders.put("prepare", 1);
+        phaseOrders.put("start", 2);
+        phaseOrders.put("Cstart", 3);
+        phaseOrders.put("finish", 4);
+
+        String prevTaskId = null;
+
+        long apiEvtCnt = 0;
+
+        for (T2<String, Long> evt : perfCntr.evts()) {
+            // We expect a string pattern like: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706
+            String[] parsedEvt = evt.get1().split(" ");
+
+            String taskId;
+            String taskPhase;
+
+            if ("JOB".equals(parsedEvt[0])) {
+                taskId = parsedEvt[0];
+                taskPhase = parsedEvt[1];
+            }
+            else {
+                taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1];
+                taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2];
+            }
+
+            if (!taskId.equals(prevTaskId))
+                tasks.put(taskId, new TreeMap<Integer, Long>());
+
+            Integer pos = phaseOrders.get(taskPhase);
+
+            assertNotNull("Invalid phase " + taskPhase, pos);
+
+            tasks.get(taskId).put(pos, evt.get2());
+
+            prevTaskId = taskId;
+
+            apiEvtCnt++;
+        }
+
+        for (Map.Entry<String, Map<Integer, Long>> task : tasks.entrySet()) {
+            Map<Integer, Long> order = task.getValue();
+
+            long prev = 0;
+
+            for (Map.Entry<Integer, Long> phase : order.entrySet()) {
+                assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev);
+
+                prev = phase.getValue();
+            }
+        }
+
+        final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return igfs.exists(statPath);
+            }
+        }, 10000);
+
+        BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)));
+
+        assertEquals(apiEvtCnt, HadoopTestUtils.simpleCheckJobStatFile(reader));
+    }
+}
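For reference, HadoopMapReduceTest and HadoopMapReduceEmbeddedSelfTest above both exercise the same submit-and-wait flow against Ignite's embedded Hadoop engine. The sketch below condenses that flow into one method; it is illustrative only and not additional code from this change. It assumes the HadoopAbstractWordCountTest harness members used in the diffs (igfs, grid(), igfsScheme(), PATH_INPUT, PATH_OUTPUT, setupFileSystems, the statically imported createJobInfo) and the mapreduce.lib FileInputFormat/FileOutputFormat imports shown there; the helper name submitWordCountAndWait is hypothetical.

    // Illustrative sketch (not part of this commit): condensed submit-and-wait flow,
    // assuming the test harness members named above.
    private void submitWordCountAndWait() throws Exception {
        JobConf jobConf = new JobConf();

        // Register IGFS so the job can resolve igfs:// paths.
        setupFileSystems(jobConf);

        Job job = Job.getInstance(jobConf);

        // New-API mapper, combiner and reducer from the word-count example.
        HadoopWordCount2.setTasksClasses(job, true, true, true);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Point the job at the whole input directory; output goes to IGFS as well.
        FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));

        job.setJarByClass(HadoopWordCount2.class);

        // Submit to the embedded job tracker and block until the whole
        // map-combine-reduce cycle completes; get() rethrows any job failure.
        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));

        fut.get();
    }

The tests then diverge only in what they verify afterwards: HadoopMapReduceTest inspects the performance counters and the sorted output partitions, while HadoopMapReduceEmbeddedSelfTest additionally checks that the custom serialization, partitioner and IO formats were configured.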