Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8A516200B95 for ; Tue, 27 Sep 2016 17:25:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 89087160AEF; Tue, 27 Sep 2016 15:25:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0020C160AFF for ; Tue, 27 Sep 2016 17:25:54 +0200 (CEST) Received: (qmail 40350 invoked by uid 500); 27 Sep 2016 15:25:54 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 38783 invoked by uid 99); 27 Sep 2016 15:25:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Sep 2016 15:25:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 81AA3EEE18; Tue, 27 Sep 2016 15:25:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 27 Sep 2016 15:26:16 -0000 Message-Id: <99d5b518b8ea447a8bcbaef7f011313a@git.apache.org> In-Reply-To: <42757ba38ed9450492692d0865d228cf@git.apache.org> References: <42757ba38ed9450492692d0865d228cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [26/68] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode. archived-at: Tue, 27 Sep 2016 15:25:57 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java deleted file mode 100644 index 214c2a8..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; - -/** - * IGFS Hadoop file system IPC shmem self test in PRIMARY mode. - */ -public class IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest - extends IgniteHadoopFileSystemShmemAbstractSelfTest { - /** - * Constructor. - */ - public IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest() { - super(PRIMARY, false); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java deleted file mode 100644 index d7f34a1..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import static org.apache.ignite.igfs.IgfsMode.PROXY; - -/** - * IGFS Hadoop file system IPC shmem self test in SECONDARY mode. - */ -public class IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest - extends IgniteHadoopFileSystemShmemAbstractSelfTest { - /** - * Constructor. - */ - public IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest() { - super(PROXY, false); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java deleted file mode 100644 index 0435eaa..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; - -/** - * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode. - */ -public class IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest - extends IgniteHadoopFileSystemShmemAbstractSelfTest { - /** - * Constructor. - */ - public IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest() { - super(DUAL_ASYNC, true); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java deleted file mode 100644 index 3af7274..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; - -/** - * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode. - */ -public class IgniteHadoopFileSystemShmemExternalDualSyncSelfTest - extends IgniteHadoopFileSystemShmemAbstractSelfTest { - /** - * Constructor. - */ - public IgniteHadoopFileSystemShmemExternalDualSyncSelfTest() { - super(DUAL_SYNC, true); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java deleted file mode 100644 index ce9dbd9..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; - -/** - * IGFS Hadoop file system IPC shmem self test in PRIMARY mode. - */ -public class IgniteHadoopFileSystemShmemExternalPrimarySelfTest - extends IgniteHadoopFileSystemShmemAbstractSelfTest { - /** - * Constructor. - */ - public IgniteHadoopFileSystemShmemExternalPrimarySelfTest() { - super(PRIMARY, true); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java deleted file mode 100644 index bc8c182..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import static org.apache.ignite.igfs.IgfsMode.PROXY; - -/** - * IGFS Hadoop file system IPC shmem self test in SECONDARY mode. - */ -public class IgniteHadoopFileSystemShmemExternalSecondarySelfTest - extends IgniteHadoopFileSystemShmemAbstractSelfTest { - /** - * Constructor. - */ - public IgniteHadoopFileSystemShmemExternalSecondarySelfTest() { - super(PROXY, true); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java deleted file mode 100644 index 3731213..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.UUID; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; -import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; -import org.apache.ignite.igfs.IgfsMode; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsUserContext; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1; -import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; - -/** - * Abstract test of whole cycle of map-reduce processing via Job tracker. - */ -public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest { - /** IGFS block size. */ - protected static final int IGFS_BLOCK_SIZE = 512 * 1024; - - /** Amount of blocks to prefetch. */ - protected static final int PREFETCH_BLOCKS = 1; - - /** Amount of sequential block reads before prefetch is triggered. */ - protected static final int SEQ_READS_BEFORE_PREFETCH = 2; - - /** Secondary file system URI. */ - protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/"; - - /** Secondary file system configuration path. */ - protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; - - /** The user to run Hadoop job on behalf of. */ - protected static final String USER = "vasya"; - - /** Secondary IGFS name. */ - protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; - - /** Red constant. */ - protected static final int red = 10_000; - - /** Blue constant. */ - protected static final int blue = 20_000; - - /** Green constant. */ - protected static final int green = 15_000; - - /** Yellow constant. */ - protected static final int yellow = 7_000; - - /** The secondary Ignite node. */ - protected Ignite igniteSecondary; - - /** The secondary Fs. */ - protected IgfsSecondaryFileSystem secondaryFs; - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** - * Gets owner of a IgfsEx path. - * @param p The path. - * @return The owner. - */ - private static String getOwner(final IgfsEx i, final IgfsPath p) { - return IgfsUserContext.doAs(USER, new IgniteOutClosure() { - @Override public String apply() { - IgfsFile f = i.info(p); - - assert f != null; - - return f.property(IgfsUtils.PROP_USER_NAME); - } - }); - } - - /** - * Gets owner of a secondary Fs path. - * @param secFs The sec Fs. - * @param p The path. - * @return The owner. - */ - private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { - return IgfsUserContext.doAs(USER, new IgniteOutClosure() { - @Override public String apply() { - return secFs.info(p).property(IgfsUtils.PROP_USER_NAME); - } - }); - } - - /** - * Checks owner of the path. - * @param p The path. - */ - private void checkOwner(IgfsPath p) { - String ownerPrim = getOwner(igfs, p); - assertEquals(USER, ownerPrim); - - String ownerSec = getOwnerSecondary(secondaryFs, p); - assertEquals(USER, ownerSec); - } - - /** - * Does actual test job - * - * @param useNewMapper flag to use new mapper API. - * @param useNewCombiner flag to use new combiner API. - * @param useNewReducer flag to use new reducer API. - */ - protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer) - throws Exception { - igfs.delete(new IgfsPath(PATH_OUTPUT), true); - - JobConf jobConf = new JobConf(); - - jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); - jobConf.setUser(USER); - jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); - - //To split into about 40 items for v2 - jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); - - //For v1 - jobConf.setInt("fs.local.block.size", 65000); - - // File system coordinates. - setupFileSystems(jobConf); - - HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); - - Job job = Job.getInstance(jobConf); - - HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy()); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setJarByClass(HadoopWordCount2.class); - - HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); - - IgniteInternalFuture fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - fut.get(); - - checkJobStatistics(jobId); - - final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000"; - - checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); - - checkOwner(new IgfsPath(outFile)); - - String actual = readAndSortFile(outFile, job.getConfiguration()); - - assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + - useNewReducer, - "blue\t" + blue + "\n" + - "green\t" + green + "\n" + - "red\t" + red + "\n" + - "yellow\t" + yellow + "\n", - actual - ); - } - - /** - * Gets if to compress output data with Snappy. - * - * @return If to compress output data with Snappy. - */ - protected boolean compressOutputSnappy() { - return false; - } - - /** - * Simple test job statistics. - * - * @param jobId Job id. - * @throws IgniteCheckedException - */ - private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException { - HadoopCounters cntrs = grid(0).hadoop().counters(jobId); - - HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); - - Map> tasks = new TreeMap<>(); - - Map phaseOrders = new HashMap<>(); - phaseOrders.put("submit", 0); - phaseOrders.put("prepare", 1); - phaseOrders.put("start", 2); - phaseOrders.put("Cstart", 3); - phaseOrders.put("finish", 4); - - String prevTaskId = null; - - long apiEvtCnt = 0; - - for (T2 evt : perfCntr.evts()) { - //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706 - String[] parsedEvt = evt.get1().split(" "); - - String taskId; - String taskPhase; - - if ("JOB".equals(parsedEvt[0])) { - taskId = parsedEvt[0]; - taskPhase = parsedEvt[1]; - } - else { - taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1]; - taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2]; - } - - if (!taskId.equals(prevTaskId)) - tasks.put(taskId, new TreeMap()); - - Integer pos = phaseOrders.get(taskPhase); - - assertNotNull("Invalid phase " + taskPhase, pos); - - tasks.get(taskId).put(pos, evt.get2()); - - prevTaskId = taskId; - - apiEvtCnt++; - } - - for (Map.Entry> task : tasks.entrySet()) { - Map order = task.getValue(); - - long prev = 0; - - for (Map.Entry phase : order.entrySet()) { - assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev); - - prev = phase.getValue(); - } - } - - final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); - - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return igfs.exists(statPath); - } - }, 20_000); - - final long apiEvtCnt0 = apiEvtCnt; - - boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) { - return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader); - } - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }, 10000); - - if (!res) { - BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); - - assert false : "Invalid API events count [exp=" + apiEvtCnt0 + - ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']'; - } - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); - - super.beforeTest(); - } - - /** - * Start grid with IGFS. - * - * @param gridName Grid name. - * @param igfsName IGFS name - * @param mode IGFS mode. - * @param secondaryFs Secondary file system (optional). - * @param restCfg Rest configuration string (optional). - * @return Started grid instance. - * @throws Exception If failed. - */ - protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, - @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setDataCacheName("dataCache"); - igfsCfg.setMetaCacheName("metaCache"); - igfsCfg.setName(igfsName); - igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); - igfsCfg.setDefaultMode(mode); - igfsCfg.setIpcEndpointConfiguration(restCfg); - igfsCfg.setSecondaryFileSystem(secondaryFs); - igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); - igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); - - CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); - - dataCacheCfg.setName("dataCache"); - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setNearConfiguration(null); - dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - dataCacheCfg.setOffHeapMaxMemory(0); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setName("metaCache"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setFileSystemConfiguration(igfsCfg); - - cfg.setLocalHost("127.0.0.1"); - cfg.setConnectorConfiguration(null); - - HadoopConfiguration hadoopCfg = createHadoopConfiguration(); - - if (hadoopCfg != null) - cfg.setHadoopConfiguration(hadoopCfg); - - return G.start(cfg); - } - - /** - * Creates custom Hadoop configuration. - * - * @return The Hadoop configuration. - */ - protected HadoopConfiguration createHadoopConfiguration() { - return null; - } - - /** {@inheritDoc} */ - @Override public FileSystemConfiguration igfsConfiguration() throws Exception { - FileSystemConfiguration fsCfg = super.igfsConfiguration(); - - secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); - - fsCfg.setSecondaryFileSystem(secondaryFs); - - return fsCfg; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java deleted file mode 100644 index fb16988..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.File; -import org.apache.hadoop.conf.Configuration; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.ConnectorConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; -import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; -import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; -import org.apache.ignite.igfs.IgfsIpcEndpointType; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - -/** - * Abstract class for Hadoop tests. - */ -public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { - /** */ - private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** REST port. */ - protected static final int REST_PORT = 11212; - - /** IGFS name. */ - protected static final String igfsName = null; - - /** IGFS name. */ - protected static final String igfsMetaCacheName = "meta"; - - /** IGFS name. */ - protected static final String igfsDataCacheName = "data"; - - /** IGFS block size. */ - protected static final int igfsBlockSize = 1024; - - /** IGFS block group size. */ - protected static final int igfsBlockGroupSize = 8; - - /** Initial REST port. */ - private int restPort = REST_PORT; - - /** Secondary file system REST endpoint configuration. */ - protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; - - static { - SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); - - SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); - SECONDARY_REST_CFG.setPort(11500); - } - - - /** Initial classpath. */ - private static String initCp; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - // Add surefire classpath to regular classpath. - initCp = System.getProperty("java.class.path"); - - String surefireCp = System.getProperty("surefire.test.class.path"); - - if (surefireCp != null) - System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp); - - super.beforeTestsStarted(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - // Restore classpath. - System.setProperty("java.class.path", initCp); - - initCp = null; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setHadoopConfiguration(hadoopConfiguration(gridName)); - - TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); - - commSpi.setSharedMemoryPort(-1); - - cfg.setCommunicationSpi(commSpi); - - TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - if (igfsEnabled()) { - cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration()); - - cfg.setFileSystemConfiguration(igfsConfiguration()); - } - - if (restEnabled()) { - ConnectorConfiguration clnCfg = new ConnectorConfiguration(); - - clnCfg.setPort(restPort++); - - cfg.setConnectorConfiguration(clnCfg); - } - - cfg.setLocalHost("127.0.0.1"); - cfg.setPeerClassLoadingEnabled(false); - - return cfg; - } - - /** - * @param gridName Grid name. - * @return Hadoop configuration. - */ - public HadoopConfiguration hadoopConfiguration(String gridName) { - HadoopConfiguration cfg = new HadoopConfiguration(); - - cfg.setMaxParallelTasks(3); - - return cfg; - } - - /** - * @return IGFS configuration. - */ - public FileSystemConfiguration igfsConfiguration() throws Exception { - FileSystemConfiguration cfg = new FileSystemConfiguration(); - - cfg.setName(igfsName); - cfg.setBlockSize(igfsBlockSize); - cfg.setDataCacheName(igfsDataCacheName); - cfg.setMetaCacheName(igfsMetaCacheName); - cfg.setFragmentizerEnabled(false); - - return cfg; - } - - /** - * @return IGFS meta cache configuration. - */ - public CacheConfiguration metaCacheConfiguration() { - CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setName(igfsMetaCacheName); - cfg.setCacheMode(REPLICATED); - cfg.setAtomicityMode(TRANSACTIONAL); - cfg.setWriteSynchronizationMode(FULL_SYNC); - - return cfg; - } - - /** - * @return IGFS data cache configuration. - */ - private CacheConfiguration dataCacheConfiguration() { - CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setName(igfsDataCacheName); - cfg.setCacheMode(PARTITIONED); - cfg.setAtomicityMode(TRANSACTIONAL); - cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize)); - cfg.setWriteSynchronizationMode(FULL_SYNC); - - return cfg; - } - - /** - * @return {@code True} if IGFS is enabled on Hadoop nodes. - */ - protected boolean igfsEnabled() { - return false; - } - - /** - * @return {@code True} if REST is enabled on Hadoop nodes. - */ - protected boolean restEnabled() { - return false; - } - - /** - * @return Number of nodes to start. - */ - protected int gridCount() { - return 3; - } - - /** - * @param cfg Config. - */ - protected void setupFileSystems(Configuration cfg) { - cfg.set("fs.defaultFS", igfsScheme()); - cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); - cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem. - class.getName()); - - HadoopFileSystemsUtils.setupFileSystems(cfg); - } - - /** - * @return IGFS scheme for test. - */ - protected String igfsScheme() { - return "igfs://:" + getTestGridName(0) + "@/"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java deleted file mode 100644 index e45c127..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import com.google.common.base.Joiner; -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.processors.igfs.IgfsEx; - -/** - * Abstract class for tests based on WordCount test job. - */ -public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest { - /** Input path. */ - protected static final String PATH_INPUT = "/input"; - - /** Output path. */ - protected static final String PATH_OUTPUT = "/output"; - - /** IGFS instance. */ - protected IgfsEx igfs; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - // Init cache by correct LocalFileSystem implementation - FileSystem.getLocal(cfg); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } - - /** - * Generates test file. - * - * @param path File name. - * @param wordCounts Words and counts. - * @throws Exception If failed. - */ - protected void generateTestFile(String path, Object... wordCounts) throws Exception { - List wordsArr = new ArrayList<>(); - - //Generating - for (int i = 0; i < wordCounts.length; i += 2) { - String word = (String) wordCounts[i]; - int cnt = (Integer) wordCounts[i + 1]; - - while (cnt-- > 0) - wordsArr.add(word); - } - - //Shuffling - for (int i = 0; i < wordsArr.size(); i++) { - int j = (int)(Math.random() * wordsArr.size()); - - Collections.swap(wordsArr, i, j); - } - - //Input file preparing - PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true)); - - int j = 0; - - while (j < wordsArr.size()) { - int i = 5 + (int)(Math.random() * 5); - - List subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); - j += i; - - testInputFileWriter.println(Joiner.on(' ').join(subList)); - } - - testInputFileWriter.close(); - } - - /** - * Read w/o decoding (default). - * - * @param fileName The file. - * @return The file contents, human-readable. - * @throws Exception On error. - */ - protected String readAndSortFile(String fileName) throws Exception { - return readAndSortFile(fileName, null); - } - - /** - * Reads whole text file into String. - * - * @param fileName Name of the file to read. - * @return Content of the file as String value. - * @throws Exception If could not read the file. - */ - protected String readAndSortFile(String fileName, Configuration conf) throws Exception { - final List list = new ArrayList<>(); - - final boolean snappyDecode = conf != null && conf.getBoolean(FileOutputFormat.COMPRESS, false); - - if (snappyDecode) { - try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, - SequenceFile.Reader.file(new Path(fileName)))) { - Text key = new Text(); - - IntWritable val = new IntWritable(); - - while (reader.next(key, val)) - list.add(key + "\t" + val); - } - } - else { - try (InputStream is0 = igfs.open(new IgfsPath(fileName))) { - BufferedReader reader = new BufferedReader(new InputStreamReader(is0)); - - String line; - - while ((line = reader.readLine()) != null) - list.add(line); - } - } - - Collections.sort(list); - - return Joiner.on('\n').join(list) + "\n"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java deleted file mode 100644 index 02d98d0..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import javax.security.auth.AuthPermission; -import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.ignite.internal.processors.hadoop.deps.CircularWIthHadoop; -import org.apache.ignite.internal.processors.hadoop.deps.CircularWithoutHadoop; -import org.apache.ignite.internal.processors.hadoop.deps.WithIndirectField; -import org.apache.ignite.internal.processors.hadoop.deps.WithCast; -import org.apache.ignite.internal.processors.hadoop.deps.WithClassAnnotation; -import org.apache.ignite.internal.processors.hadoop.deps.WithConstructorInvocation; -import org.apache.ignite.internal.processors.hadoop.deps.WithMethodCheckedException; -import org.apache.ignite.internal.processors.hadoop.deps.WithMethodRuntimeException; -import org.apache.ignite.internal.processors.hadoop.deps.WithExtends; -import org.apache.ignite.internal.processors.hadoop.deps.WithField; -import org.apache.ignite.internal.processors.hadoop.deps.WithImplements; -import org.apache.ignite.internal.processors.hadoop.deps.WithInitializer; -import org.apache.ignite.internal.processors.hadoop.deps.WithInnerClass; -import org.apache.ignite.internal.processors.hadoop.deps.WithLocalVariable; -import org.apache.ignite.internal.processors.hadoop.deps.WithMethodAnnotation; -import org.apache.ignite.internal.processors.hadoop.deps.WithMethodInvocation; -import org.apache.ignite.internal.processors.hadoop.deps.WithMethodArgument; -import org.apache.ignite.internal.processors.hadoop.deps.WithMethodReturnType; -import org.apache.ignite.internal.processors.hadoop.deps.WithOuterClass; -import org.apache.ignite.internal.processors.hadoop.deps.WithParameterAnnotation; -import org.apache.ignite.internal.processors.hadoop.deps.WithStaticField; -import org.apache.ignite.internal.processors.hadoop.deps.WithStaticInitializer; -import org.apache.ignite.internal.processors.hadoop.deps.Without; - -/** - * Tests for Hadoop classloader. - */ -public class HadoopClassLoaderTest extends TestCase { - /** */ - final HadoopClassLoader ldr = new HadoopClassLoader(null, "test", null); - - /** - * @throws Exception If failed. - */ - public void testClassLoading() throws Exception { - assertNotSame(CircularWIthHadoop.class, ldr.loadClass(CircularWIthHadoop.class.getName())); - assertNotSame(CircularWithoutHadoop.class, ldr.loadClass(CircularWithoutHadoop.class.getName())); - - assertSame(Without.class, ldr.loadClass(Without.class.getName())); - } - - /** - * Test dependency search. - */ - public void testDependencySearch() { - // Positive cases: - final Class[] positiveClasses = { - Configuration.class, - HadoopUtils.class, - WithStaticField.class, - WithCast.class, - WithClassAnnotation.class, - WithConstructorInvocation.class, - WithMethodCheckedException.class, - WithMethodRuntimeException.class, - WithExtends.class, - WithField.class, - WithImplements.class, - WithInitializer.class, - WithInnerClass.class, - WithOuterClass.InnerNoHadoop.class, - WithLocalVariable.class, - WithMethodAnnotation.class, - WithMethodInvocation.class, - WithMethodArgument.class, - WithMethodReturnType.class, - WithParameterAnnotation.class, - WithStaticField.class, - WithStaticInitializer.class, - WithIndirectField.class, - CircularWIthHadoop.class, - CircularWithoutHadoop.class, - }; - - for (Class c: positiveClasses) - assertTrue(c.getName(), ldr.hasExternalDependencies(c.getName())); - - // Negative cases: - final Class[] negativeClasses = { - Object.class, - AuthPermission.class, - Without.class, - }; - - for (Class c: negativeClasses) - assertFalse(c.getName(), ldr.hasExternalDependencies(c.getName())); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java deleted file mode 100644 index 7ee318a..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java +++ /dev/null @@ -1,474 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import com.google.common.base.Joiner; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileFilter; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.Ignition; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; -import org.apache.ignite.igfs.IgfsInputStream; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgnitionEx; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jsr166.ConcurrentHashMap8; - -/** - * Test of integration with Hadoop client via command line interface. - */ -public class HadoopCommandLineTest extends GridCommonAbstractTest { - /** IGFS instance. */ - private IgfsEx igfs; - - /** */ - private static final String igfsName = "igfs"; - - /** */ - private static File testWorkDir; - - /** */ - private static String hadoopHome; - - /** */ - private static String hiveHome; - - /** */ - private static File examplesJar; - - /** - * - * @param path File name. - * @param wordCounts Words and counts. - * @throws Exception If failed. - */ - private void generateTestFile(File path, Object... wordCounts) throws Exception { - List wordsArr = new ArrayList<>(); - - //Generating - for (int i = 0; i < wordCounts.length; i += 2) { - String word = (String) wordCounts[i]; - int cnt = (Integer) wordCounts[i + 1]; - - while (cnt-- > 0) - wordsArr.add(word); - } - - //Shuffling - for (int i = 0; i < wordsArr.size(); i++) { - int j = (int)(Math.random() * wordsArr.size()); - - Collections.swap(wordsArr, i, j); - } - - //Writing file - try (PrintWriter writer = new PrintWriter(path)) { - int j = 0; - - while (j < wordsArr.size()) { - int i = 5 + (int)(Math.random() * 5); - - List subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); - j += i; - - writer.println(Joiner.on(' ').join(subList)); - } - - writer.flush(); - } - } - - /** - * Generates two data files to join its with Hive. - * - * @throws FileNotFoundException If failed. - */ - private void generateHiveTestFiles() throws FileNotFoundException { - try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a")); - PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) { - char sep = '\t'; - - int idB = 0; - int idA = 0; - int v = 1000; - - for (int i = 0; i < 1000; i++) { - writerA.print(idA++); - writerA.print(sep); - writerA.println(idB); - - writerB.print(idB++); - writerB.print(sep); - writerB.println(v += 2); - - writerB.print(idB++); - writerB.print(sep); - writerB.println(v += 2); - } - - writerA.flush(); - writerB.flush(); - } - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - hiveHome = IgniteSystemProperties.getString("HIVE_HOME"); - - assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome)); - - hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME"); - - assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome)); - - String mapredHome = hadoopHome + "/share/hadoop/mapreduce"; - - File[] fileList = new File(mapredHome).listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().startsWith("hadoop-mapreduce-examples-") && - pathname.getName().endsWith(".jar"); - } - }); - - assertEquals("Invalid hadoop distribution.", 1, fileList.length); - - examplesJar = fileList[0]; - - testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile(); - - U.copy(resolveHadoopConfig("core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false); - - File srcFile = resolveHadoopConfig("mapred-site.ignite.xml"); - File dstFile = new File(testWorkDir, "mapred-site.xml"); - - try (BufferedReader in = new BufferedReader(new FileReader(srcFile)); - PrintWriter out = new PrintWriter(dstFile)) { - String line; - - while ((line = in.readLine()) != null) { - if (line.startsWith("")) - out.println( - " \n" + - " " + HadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "\n" + - " " + IgniteHadoopFileSystemCounterWriter.class.getName() + "\n" + - " \n"); - - out.println(line); - } - - out.flush(); - } - - generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50); - - generateHiveTestFiles(); - } - - /** - * Resolve Hadoop configuration file. - * - * @param name File name. - * @return Resolve file. - */ - private static File resolveHadoopConfig(String name) { - File path = U.resolveIgnitePath("modules/hadoop/config/" + name); - - return path != null ? path : U.resolveIgnitePath("config/hadoop/" + name); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - U.delete(testWorkDir); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - String cfgPath = "config/hadoop/default-config.xml"; - - IgniteBiTuple tup = IgnitionEx.loadConfiguration(cfgPath); - - IgniteConfiguration cfg = tup.get1(); - - cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes. - - igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** - * Creates the process build with appropriate environment to run Hadoop CLI. - * - * @return Process builder. - */ - private ProcessBuilder createProcessBuilder() { - String sep = ":"; - - String ggClsPath = HadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + - HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + - ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - - ProcessBuilder res = new ProcessBuilder(); - - res.environment().put("HADOOP_HOME", hadoopHome); - res.environment().put("HADOOP_CLASSPATH", ggClsPath); - res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath()); - - res.redirectErrorStream(true); - - return res; - } - - /** - * Waits for process exit and prints the its output. - * - * @param proc Process. - * @return Exit code. - * @throws Exception If failed. - */ - private int watchProcess(Process proc) throws Exception { - BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())); - - String line; - - while ((line = reader.readLine()) != null) - log().info(line); - - return proc.waitFor(); - } - - /** - * Executes Hadoop command line tool. - * - * @param args Arguments for Hadoop command line tool. - * @return Process exit code. - * @throws Exception If failed. - */ - private int executeHadoopCmd(String... args) throws Exception { - ProcessBuilder procBuilder = createProcessBuilder(); - - List cmd = new ArrayList<>(); - - cmd.add(hadoopHome + "/bin/hadoop"); - cmd.addAll(Arrays.asList(args)); - - procBuilder.command(cmd); - - log().info("Execute: " + procBuilder.command()); - - return watchProcess(procBuilder.start()); - } - - /** - * Executes Hive query. - * - * @param qry Query. - * @return Process exit code. - * @throws Exception If failed. - */ - private int executeHiveQuery(String qry) throws Exception { - ProcessBuilder procBuilder = createProcessBuilder(); - - List cmd = new ArrayList<>(); - - procBuilder.command(cmd); - - cmd.add(hiveHome + "/bin/hive"); - - cmd.add("--hiveconf"); - cmd.add("hive.rpc.query.plan=true"); - - cmd.add("--hiveconf"); - cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" + - "databaseName=metastore_db;create=true"); - - cmd.add("-e"); - cmd.add(qry); - - procBuilder.command(cmd); - - log().info("Execute: " + procBuilder.command()); - - return watchProcess(procBuilder.start()); - } - - /** - * Tests Hadoop command line integration. - */ - public void testHadoopCommandLine() throws Exception { - assertEquals(0, executeHadoopCmd("fs", "-ls", "/")); - - assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input")); - - assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input")); - - assertTrue(igfs.exists(new IgfsPath("/input/test-data"))); - - assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output")); - - IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/"); - - assertTrue(igfs.exists(path)); - - IgfsPath jobStatPath = null; - - for (IgfsPath jobPath : igfs.listPaths(path)) { - assertNull(jobStatPath); - - jobStatPath = jobPath; - } - - File locStatFile = new File(testWorkDir, "performance"); - - assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString())); - - long evtCnt = HadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile))); - - assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner. - - assertTrue(igfs.exists(new IgfsPath("/output"))); - - BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000")))); - - List res = new ArrayList<>(); - - String line; - - while ((line = in.readLine()) != null) - res.add(line); - - Collections.sort(res); - - assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString()); - } - - /** - * Runs query check result. - * - * @param expRes Expected result. - * @param qry Query. - * @throws Exception If failed. - */ - private void checkQuery(String expRes, String qry) throws Exception { - assertEquals(0, executeHiveQuery("drop table if exists result")); - - assertEquals(0, executeHiveQuery( - "create table result " + - "row format delimited fields terminated by ' ' " + - "stored as textfile " + - "location '/result' as " + qry - )); - - IgfsInputStream in = igfs.open(new IgfsPath("/result/000000_0")); - - byte[] buf = new byte[(int) in.length()]; - - in.read(buf); - - assertEquals(expRes, new String(buf)); - } - - /** - * Tests Hive integration. - */ - public void testHiveCommandLine() throws Exception { - assertEquals(0, executeHiveQuery( - "create table table_a (" + - "id_a int," + - "id_b int" + - ") " + - "row format delimited fields terminated by '\\t'" + - "stored as textfile " + - "location '/table-a'" - )); - - assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a")); - - assertEquals(0, executeHiveQuery( - "create table table_b (" + - "id_b int," + - "rndv int" + - ") " + - "row format delimited fields terminated by '\\t'" + - "stored as textfile " + - "location '/table-b'" - )); - - assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b")); - - checkQuery( - "0 0\n" + - "1 2\n" + - "2 4\n" + - "3 6\n" + - "4 8\n" + - "5 10\n" + - "6 12\n" + - "7 14\n" + - "8 16\n" + - "9 18\n", - "select * from table_a order by id_a limit 10" - ); - - checkQuery("2000\n", "select count(id_b) from table_b"); - - checkQuery( - "250 500 2002\n" + - "251 502 2006\n" + - "252 504 2010\n" + - "253 506 2014\n" + - "254 508 2018\n" + - "255 510 2022\n" + - "256 512 2026\n" + - "257 514 2030\n" + - "258 516 2034\n" + - "259 518 2038\n", - "select a.id_a, a.id_b, b.rndv" + - " from table_a a" + - " inner join table_b b on a.id_b = b.id_b" + - " where b.rndv > 2000" + - " order by a.id_a limit 10" - ); - - checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b"); - } -} \ No newline at end of file