From: ptupitsyn@apache.org
To: commits@ignite.apache.org
Date: Mon, 26 Sep 2016 12:13:32 -0000
Subject: [18/50] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
new file mode 100644
index 0000000..a3bf49c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.combineExecCnt;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.latch;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.mapExecCnt;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.reduceExecCnt;
+
+/**
+ * Job tracker self test.
+ */
+public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** Test block count. */
+    private static final int BLOCK_CNT = 10;
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        latch.put("mapAwaitLatch", new CountDownLatch(1));
+        latch.put("reduceAwaitLatch", new CountDownLatch(1));
+        latch.put("combineAwaitLatch", new CountDownLatch(1));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        mapExecCnt.set(0);
+        combineExecCnt.set(0);
+        reduceExecCnt.set(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner());
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
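+     *
+     * <p>A sketch of the phase gating exercised below (the latches are armed in
+     * {@code beforeTest()}; all names are ones declared in this file):
+     * <pre>{@code
+     * grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+     * latch.get("mapAwaitLatch").countDown();    // Unblock the mappers.
+     * latch.get("reduceAwaitLatch").countDown(); // Unblock the reducer.
+     * }</pre>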
+ */ + public void testSimpleTaskSubmit() throws Exception { + try { + UUID globalId = UUID.randomUUID(); + + Job job = Job.getInstance(); + setupFileSystems(job.getConfiguration()); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + job.setInputFormatClass(InFormat.class); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1")); + + HadoopJobId jobId = new HadoopJobId(globalId, 1); + + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + checkStatus(jobId, false); + + info("Releasing map latch."); + + latch.get("mapAwaitLatch").countDown(); + + checkStatus(jobId, false); + + info("Releasing reduce latch."); + + latch.get("reduceAwaitLatch").countDown(); + + checkStatus(jobId, true); + + assertEquals(10, mapExecCnt.get()); + assertEquals(0, combineExecCnt.get()); + assertEquals(1, reduceExecCnt.get()); + } + finally { + // Safety. + latch.get("mapAwaitLatch").countDown(); + latch.get("combineAwaitLatch").countDown(); + latch.get("reduceAwaitLatch").countDown(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTaskWithCombinerPerMap() throws Exception { + try { + UUID globalId = UUID.randomUUID(); + + Job job = Job.getInstance(); + setupFileSystems(job.getConfiguration()); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + job.setCombinerClass(TestCombiner.class); + job.setInputFormatClass(InFormat.class); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2")); + + HadoopJobId jobId = new HadoopJobId(globalId, 1); + + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + checkStatus(jobId, false); + + info("Releasing map latch."); + + latch.get("mapAwaitLatch").countDown(); + + checkStatus(jobId, false); + + // All maps are completed. We have a combiner, so no reducers should be executed + // before combiner latch is released. + + U.sleep(50); + + assertEquals(0, reduceExecCnt.get()); + + info("Releasing combiner latch."); + + latch.get("combineAwaitLatch").countDown(); + + checkStatus(jobId, false); + + info("Releasing reduce latch."); + + latch.get("reduceAwaitLatch").countDown(); + + checkStatus(jobId, true); + + assertEquals(10, mapExecCnt.get()); + assertEquals(10, combineExecCnt.get()); + assertEquals(1, reduceExecCnt.get()); + } + finally { + // Safety. + latch.get("mapAwaitLatch").countDown(); + latch.get("combineAwaitLatch").countDown(); + latch.get("reduceAwaitLatch").countDown(); + } + } + + /** + * Checks job execution status. + * + * @param jobId Job ID. + * @param complete Completion status. + * @throws Exception If failed. 
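+     *
+     * <p>Intended usage, as in the tests above:
+     * <pre>{@code
+     * checkStatus(jobId, false); // Running: the finish future must not be done.
+     * checkStatus(jobId, true);  // Complete: blocks on hadoop.finishFuture(jobId).
+     * }</pre>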
+ */ + private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception { + for (int i = 0; i < gridCount(); i++) { + IgniteKernal kernal = (IgniteKernal)grid(i); + + Hadoop hadoop = kernal.hadoop(); + + HadoopJobStatus stat = hadoop.status(jobId); + + assert stat != null; + + IgniteInternalFuture fut = hadoop.finishFuture(jobId); + + if (!complete) + assertFalse(fut.isDone()); + else { + info("Waiting for status future completion on node [idx=" + i + ", nodeId=" + + kernal.getLocalNodeId() + ']'); + + fut.get(); + } + } + } + + /** + * Test input format + */ + public static class InFormat extends InputFormat { + + @Override public List getSplits(JobContext ctx) throws IOException, InterruptedException { + List res = new ArrayList<>(BLOCK_CNT); + + for (int i = 0; i < BLOCK_CNT; i++) + try { + res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"})); + } + catch (URISyntaxException e) { + throw new IOException(e); + } + + return res; + } + + @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException { + return new RecordReader() { + @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { + } + + @Override public boolean nextKeyValue() { + return false; + } + + @Override public Object getCurrentKey() { + return null; + } + + @Override public Object getCurrentValue() { + return null; + } + + @Override public float getProgress() { + return 0; + } + + @Override public void close() { + + } + }; + } + } + + /** + * Test mapper. + */ + private static class TestMapper extends Mapper { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("mapAwaitLatch").await(); + + mapExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } + + /** + * Test reducer. + */ + private static class TestReducer extends Reducer { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("reduceAwaitLatch").await(); + + reduceExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } + + /** + * Test combiner. 
+ */ + private static class TestCombiner extends Reducer { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("combineAwaitLatch").await(); + + combineExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java new file mode 100644 index 0000000..b04deeb --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.util.UUID; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1; +import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2; + +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; +import static org.apache.ignite.internal.processors.hadoop.state.HadoopMapReduceEmbeddedSelfTestState.flags; + +/** + * Tests map-reduce execution with embedded mode. 
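+ *
+ * <p>"Embedded" here means that tasks execute inside the node's JVM (the mode whose
+ * class loading architecture IGNITE-3912 reworks) rather than in an external child process.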
+ */ +public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + // TODO: IGNITE-404: Uncomment when fixed. + //cfg.setExternalExecution(false); + + return cfg; + } + + /** + * Tests whole job execution with all phases in old and new versions of API with definition of custom + * Serialization, Partitioner and IO formats. + * @throws Exception If fails. + */ + public void testMultiReducerWholeMapReduceExecution() throws Exception { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); + + generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000, + "key6", 18000 ); + + for (int i = 0; i < 2; i++) { + boolean useNewAPI = i == 1; + + igfs.delete(new IgfsPath(PATH_OUTPUT), true); + + flags.put("serializationWasConfigured", false); + flags.put("partitionerWasConfigured", false); + flags.put("inputFormatWasConfigured", false); + flags.put("outputFormatWasConfigured", false); + + JobConf jobConf = new JobConf(); + + jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); + + //To split into about 6-7 items for v2 + jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); + + //For v1 + jobConf.setInt("fs.local.block.size", 65000); + + // File system coordinates. + setupFileSystems(jobConf); + + HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI); + + if (!useNewAPI) { + jobConf.setPartitionerClass(CustomV1Partitioner.class); + jobConf.setInputFormat(CustomV1InputFormat.class); + jobConf.setOutputFormat(CustomV1OutputFormat.class); + } + + Job job = Job.getInstance(jobConf); + + HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false); + + if (useNewAPI) { + job.setPartitionerClass(CustomV2Partitioner.class); + job.setInputFormatClass(CustomV2InputFormat.class); + job.setOutputFormatClass(CustomV2OutputFormat.class); + } + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setNumReduceTasks(3); + + job.setJarByClass(HadoopWordCount2.class); + + IgniteInternalFuture fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + + assertTrue("Serialization was configured (new API is " + useNewAPI + ")", + flags.get("serializationWasConfigured")); + + assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")", + flags.get("partitionerWasConfigured")); + + assertTrue("Input format was configured (new API is = " + useNewAPI + ")", + flags.get("inputFormatWasConfigured")); + + assertTrue("Output format was configured (new API is = " + useNewAPI + ")", + flags.get("outputFormatWasConfigured")); + + assertEquals("Use new API = " + useNewAPI, + "key3\t15000\n" + + "key6\t18000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000") + ); + + assertEquals("Use new API = " + useNewAPI, + "key1\t10000\n" + + "key4\t7000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? 
"part-r-" : "part-") + "00001") + ); + + assertEquals("Use new API = " + useNewAPI, + "key2\t20000\n" + + "key5\t12000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002") + ); + + } + } + + /** + * Custom serialization class that inherits behaviour of native {@link WritableSerialization}. + */ + protected static class CustomSerialization extends WritableSerialization { + @Override public void setConf(Configuration conf) { + super.setConf(conf); + + flags.put("serializationWasConfigured", true); + } + } + + /** + * Custom implementation of Partitioner in v1 API. + */ + private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner { + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + flags.put("partitionerWasConfigured", true); + } + } + + /** + * Custom implementation of Partitioner in v2 API. + */ + private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner + implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("partitionerWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of InputFormat in v2 API. + */ + private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("inputFormatWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of OutputFormat in v2 API. + */ + private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("outputFormatWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of InputFormat in v1 API. + */ + private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat { + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + super.configure(job); + + flags.put("inputFormatWasConfigured", true); + } + } + + /** + * Custom implementation of OutputFormat in v1 API. 
+ */
+    private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            flags.put("outputFormatWasConfigured", true);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
new file mode 100644
index 0000000..afd6f26
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Test of error resiliency after an error in a map-reduce job execution.
+ * Combinations tested:
+ * { new API, old API }
+ * x { unchecked exception, checked exception, error }
+ * x { phase where the error happens }.
+ */
+public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_IOException() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Error() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_IOException() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
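+     *
+     * <p>How {@code doTestRecoveryAfterAnError(useNewBits, kind)} decodes its first
+     * argument (note the inverted test: a zero bit selects the new API):
+     * <pre>{@code
+     * boolean useNewMapper   = (useNewBits & 1) == 0; // 7 -> old mapper
+     * boolean useNewCombiner = (useNewBits & 2) == 0; // 7 -> old combiner
+     * boolean useNewReducer  = (useNewBits & 4) == 0; // 7 -> old reducer
+     * }</pre>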
+ */
+    public void testRecoveryAfterAnError7_Error() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /**
+     * Tests correct work after an error.
+     *
+     * @param useNewBits Bit mask selecting the old or new API per phase (a zero bit selects the new API).
+     * @param simulatorKind Kind of error to simulate.
+     * @throws Exception On error.
+     */
+    private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception {
+        try {
+            IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+            igfs.mkdirs(inDir);
+
+            IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+            generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+            boolean useNewMapper = (useNewBits & 1) == 0;
+            boolean useNewCombiner = (useNewBits & 2) == 0;
+            boolean useNewReducer = (useNewBits & 4) == 0;
+
+            for (int i = 0; i < 12; i++) {
+                int bits = 1 << i;
+
+                System.out.println("############################ Simulator kind = " + simulatorKind
+                    + ", Stage bits = " + bits);
+
+                HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits);
+
+                doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer);
+            }
+        } catch (Throwable t) {
+            t.printStackTrace();
+
+            fail("Unexpected throwable: " + t);
+        }
+    }
+
+    /**
+     * Performs the test with the given error simulator.
+     *
+     * @param sim The simulator.
+     * @param inFile Input file.
+     * @param useNewMapper Whether to use the new mapper API.
+     * @param useNewCombiner Whether to use the new combiner API.
+     * @param useNewReducer Whether to use the new reducer API.
+     * @throws Exception If failed.
+     */
+    private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper,
+        boolean useNewCombiner, boolean useNewReducer) throws Exception {
+        // Install the real error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim));
+
+        try {
+            // Expect failure there:
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+        }
+        catch (Throwable t) { // This may be an Error.
+            // Expected:
+            System.out.println(t.toString()); // Ignore, continue the test.
+        }
+
+        // Restore the no-op error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance));
+
+        // Expect success there:
+        doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
new file mode 100644
index 0000000..feccb59
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Test of the whole cycle of map-reduce processing via the job tracker.
+ */
+public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Tests whole job execution with all phases in all combinations of the new and old versions of API.
+     * @throws Exception If fails.
+     */
+    public void testWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+        for (boolean[] apiMode: getApiModes()) {
+            assert apiMode.length == 3;
+
+            boolean useNewMapper = apiMode[0];
+            boolean useNewCombiner = apiMode[1];
+            boolean useNewReducer = apiMode[2];
+
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+        }
+    }
+
+    /**
+     * Gets API mode combinations to be tested.
+     * Each boolean[] is a { newMapper, newCombiner, newReducer } flag triplet.
+     *
+     * @return Arrays of booleans indicating API combinations to test.
+     */
+    protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, false },
+            { false, false, true },
+            { false, true, false },
+            { true, false, false },
+            { true, true, true },
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
new file mode 100644
index 0000000..3bb8735
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Test attempt to execute a map-reduce task while no Hadoop processor available. + */ +public class HadoopNoHadoopMapReduceTest extends HadoopMapReduceTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setHadoopConfiguration(null); + c.setPeerClassLoadingEnabled(true); + + return c; + } + + /** {@inheritDoc} */ + @Override public void testWholeMapReduceExecution() throws Exception { + try { + super.testWholeMapReduceExecution(); + + fail("IllegalStateException expected."); + } + catch (IllegalStateException ignore) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java new file mode 100644 index 0000000..220614c --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.jetbrains.annotations.Nullable; + +import java.util.Collection; +import java.util.UUID; + +/** + * Mock job for planner tests. + */ +public class HadoopPlannerMockJob implements HadoopJob { + /** Input splits. */ + private final Collection splits; + + /** Reducers count. */ + private final int reducers; + + /** + * Constructor. + * + * @param splits Input splits. + * @param reducers Reducers. 
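+     *
+     * <p>Hypothetical usage in a planner test; only {@code input()} and {@code info()}
+     * are expected to be called, everything else throws:
+     * <pre>{@code
+     * HadoopJob job = new HadoopPlannerMockJob(splits, 10);
+     * assert job.info().reducers() == 10;
+     * }</pre>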
+ */ + public HadoopPlannerMockJob(Collection splits, int reducers) { + this.splits = splits; + this.reducers = reducers; + } + + /** {@inheritDoc} */ + @Override public Collection input() throws IgniteCheckedException { + return splits; + } + + /** {@inheritDoc} */ + @Override public HadoopJobInfo info() { + return new JobInfo(reducers); + } + + /** {@inheritDoc} */ + @Override public HadoopJobId id() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void dispose(boolean external) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void cleanupStagingDirectory() { + throwUnsupported(); + } + + /** + * Throw {@link UnsupportedOperationException}. + */ + private static void throwUnsupported() { + throw new UnsupportedOperationException("Should not be called!"); + } + + /** + * Mocked job info. + */ + private static class JobInfo implements HadoopJobInfo { + /** Reducers. */ + private final int reducers; + + /** + * Constructor. + * + * @param reducers Reducers. + */ + public JobInfo(int reducers) { + this.reducers = reducers; + } + + /** {@inheritDoc} */ + @Override public int reducers() { + return reducers; + } + + /** {@inheritDoc} */ + @Nullable @Override public String property(String name) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean hasCombiner() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean hasReducer() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public HadoopJob createJob(Class jobCls, HadoopJobId jobId, IgniteLogger log, + @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public String jobName() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public String user() { + throwUnsupported(); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java new file mode 100644 index 0000000..5a55430 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.MinMaxPriorityQueue.orderedBy;
+import static java.util.Collections.reverseOrder;
+
+/**
+ * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than
+ * 3 characters the number of occurrences is calculated. Finally, the 10 words with the highest occurrence count are
+ * output.
+ *
+ * NOTE: in order to run this example on Windows please ensure that cygwin is installed and available in the system
+ * path.
+ */
+public class HadoopPopularWordsTest {
+    /** Ignite home. */
+    private static final String IGNITE_HOME = U.getIgniteHome();
+
+    /** The path to the input directory. All files in that directory will be processed. */
+    private static final Path BOOKS_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books");
+
+    /** The path to the output directory. The result file will be written to this location. */
+    private static final Path RESULT_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output");
+
+    /** Popular books source dir in DFS. */
+    private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in");
+
+    /** Popular books result dir in DFS. */
+    private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out");
+
+    /** Path to the distributed file system configuration. */
+    private static final String DFS_CFG = "examples/config/filesystem/core-site.xml";
+
+    /** Top N words to select. */
+    private static final int POPULAR_WORDS_CNT = 10;
+
+    /**
+     * For each token in the input string the mapper emits a {word, 1} pair.
+     */
+    private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+        /** Constant value. */
+        private static final IntWritable ONE = new IntWritable(1);
+
+        /** The word converted into the Text. */
+        private Text word = new Text();
+
+        /**
+         * Emits an entry where the key is the word and the value is always 1.
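+         *
+         * <p>For example, a (hypothetical) input line {@code "The quick brown fox"} yields
+         * {@code ("quick", 1)} and {@code ("brown", 1)}; {@code "The"} and {@code "fox"} are
+         * dropped by the length filter below.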
+         *
+         * @param key the current position in the input file (not used here)
+         * @param val the text string
+         * @param ctx mapper context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        @Override protected void map(LongWritable key, Text val, Context ctx)
+            throws IOException, InterruptedException {
+            // Get the mapped object.
+            final String line = val.toString();
+
+            // Splits the given string to words.
+            final String[] words = line.split("[^a-zA-Z0-9]");
+
+            for (final String w : words) {
+                // Only emit counts for longer words.
+                if (w.length() <= 3)
+                    continue;
+
+                word.set(w);
+
+                // Write the word into the context with the initial count equals 1.
+                ctx.write(word, ONE);
+            }
+        }
+    }
+
+    /**
+     * The reducer uses a priority queue to rank the words based on their number of occurrences.
+     */
+    private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        private MinMaxPriorityQueue<Entry<Integer, String>> q;
+
+        TopNWordsReducer() {
+            q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() {
+                @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
+                    return o1.getKey().compareTo(o2.getKey());
+                }
+            })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create();
+        }
+
+        /**
+         * This method doesn't emit anything, but just keeps track of the top N words.
+         *
+         * @param key The word.
+         * @param vals The word counts.
+         * @param ctx Reducer context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException,
+            InterruptedException {
+            int sum = 0;
+
+            for (IntWritable val : vals)
+                sum += val.get();
+
+            q.add(immutableEntry(sum, key.toString()));
+        }
+
+        /**
+         * This method is called after all the word entries have been processed. It writes the accumulated
+         * statistics to the job output file.
+         *
+         * @param ctx The job context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
+            IntWritable i = new IntWritable();
+
+            Text txt = new Text();
+
+            // Iterate in descending order.
+            while (!q.isEmpty()) {
+                Entry<Integer, String> e = q.removeFirst();
+
+                i.set(e.getKey());
+
+                txt.set(e.getValue());
+
+                ctx.write(txt, i);
+            }
+        }
+    }
+
+    /**
+     * Configures the Hadoop MapReduce job.
+     *
+     * @return Instance of the Hadoop MapReduce job.
+     * @throws IOException If failed.
+     */
+    @SuppressWarnings("deprecation")
+    private Job createConfigBasedHadoopJob() throws IOException {
+        Job jobCfg = new Job();
+
+        Configuration cfg = jobCfg.getConfiguration();
+
+        // Use explicit configuration of distributed file system, if provided.
+        cfg.addResource(U.resolveIgniteUrl(DFS_CFG));
+
+        jobCfg.setJobName("HadoopPopularWordExample");
+        jobCfg.setJarByClass(HadoopPopularWordsTest.class);
+        jobCfg.setInputFormatClass(TextInputFormat.class);
+        jobCfg.setOutputKeyClass(Text.class);
+        jobCfg.setOutputValueClass(IntWritable.class);
+        jobCfg.setMapperClass(TokenizingMapper.class);
+        jobCfg.setReducerClass(TopNWordsReducer.class);
+
+        FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
+        FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);
+
+        // The local job tracker allows only one task per wave, but the text input format
+        // replaces it with the value calculated from the input split size option.
+        if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
+            // Split job into tasks using 32MB split size.
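+            // (A hedged note: 32 * 1024 * 1024 = 33,554,432 bytes. Hadoop's FileInputFormat
+            // computes the split size as max(minSize, min(maxSize, blockSize)), so with the
+            // max left at Long.MAX_VALUE the raised minimum yields fewer, larger splits.)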
+            FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024);
+            FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
+        }
+
+        return jobCfg;
+    }
+
+    /**
+     * Runs the Hadoop job.
+     *
+     * @return {@code True} if succeeded, {@code false} otherwise.
+     * @throws Exception If failed.
+     */
+    private boolean runWordCountConfigBasedHadoopJob() throws Exception {
+        Job job = createConfigBasedHadoopJob();
+
+        // Distributed file system this job will work with.
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+
+        X.println(">>> Using distributed file system: " + fs.getHomeDirectory());
+
+        // Prepare input and output job directories.
+        prepareDirectories(fs);
+
+        long time = System.currentTimeMillis();
+
+        // Run job.
+        boolean res = job.waitForCompletion(true);
+
+        X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec.");
+
+        // Move job results into the local file system, so you can view the calculated results.
+        publishResults(fs);
+
+        return res;
+    }
+
+    /**
+     * Prepares the job's data: cleans up result directories that might have been left over from
+     * previous runs, and copies input files from the local file system into DFS.
+     *
+     * @param fs Distributed file system to use in job.
+     * @throws IOException If failed.
+     */
+    private void prepareDirectories(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR);
+
+        fs.delete(RESULT_DFS_DIR, true);
+
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR);
+    }
+
+    /**
+     * Publishes job execution results into the local file system, so you can view them.
+     *
+     * @param fs Distributed file system used in job.
+     * @throws IOException If failed.
+     */
+    private void publishResults(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+        fs.delete(RESULT_LOCAL_DIR, true);
+
+        X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+        fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR);
+    }
+
+    /**
+     * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of
+     * occurrences of the word in the source files, the N most popular words are selected.
+     *
+     * @param args None.
+     */
+    public static void main(String[] args) {
+        try {
+            new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob();
+        }
+        catch (Exception e) {
+            X.println(">>> Failed to run word count example: " + e.getMessage());
+        }
+
+        System.exit(0);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
new file mode 100644
index 0000000..5ccc8ce
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopSerializationWrapper;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test of the wrapper of the native serialization.
+ */
+public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
+    /**
+     * Tests read/write of IntWritable via native WritableSerialization.
+     * @throws Exception If fails.
+     */
+    public void testIntWritableSerialization() throws Exception {
+        HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        DataOutput out = new DataOutputStream(buf);
+
+        ser.write(out, new IntWritable(3));
+        ser.write(out, new IntWritable(-5));
+
+        assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray()));
+
+        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+        assertEquals(3, ((IntWritable)ser.read(in, null)).get());
+        assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
+    }
+
+    /**
+     * Tests read/write of Integer via native JavaSerialization.
+     * @throws Exception If fails.
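+     *
+     * <p>(Unlike {@link WritableSerialization} above, which writes raw big-endian ints,
+     * hence the asserted bytes {@code [0, 0, 0, 3, -1, -1, -1, -5]}, {@link JavaSerialization}
+     * adds object-stream framing, so this test only round-trips the values instead of
+     * asserting an exact byte layout.)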
+ */ + public void testIntJavaSerialization() throws Exception { + HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + DataOutput out = new DataOutputStream(buf); + + ser.write(out, 3); + ser.write(out, -5); + ser.close(); + + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + + assertEquals(3, ((Integer)ser.read(in, null)).intValue()); + assertEquals(-5, ((Integer)ser.read(in, null)).intValue()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java new file mode 100644 index 0000000..e27c212 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +/** + * Same test as HadoopMapReduceTest, but with enabled Snappy output compression. + */ +public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest { + /** {@inheritDoc} */ + @Override protected boolean compressOutputSnappy() { + return true; + } + + /** {@inheritDoc} */ + @Override protected boolean[][] getApiModes() { + return new boolean[][] { + { false, false, true }, + { true, true, true }, + }; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java new file mode 100644 index 0000000..80ff754 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.SnappyCodec; +import org.apache.hadoop.io.compress.snappy.SnappyCompressor; +import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; +import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests isolated Hadoop Snappy codec usage. + */ +public class HadoopSnappyTest extends GridCommonAbstractTest { + /** Length of data. */ + private static final int BYTE_SIZE = 1024 * 50; + + /** + * Checks Snappy codec usage. + * + * @throws Exception On error. + */ + public void testSnappy() throws Throwable { + // Run Snappy test in default class loader: + checkSnappy(); + + // Run the same in several more class loaders simulating jobs and tasks: + for (int i = 0; i < 2; i++) { + ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null, new HadoopHelperImpl()); + + Class cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr); + + assertEquals(hadoopClsLdr, cls.getClassLoader()); + + U.invoke(cls, null, "checkSnappy"); + } + } + + /** + * Internal check routine. + * + * @throws Throwable If failed. 
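+     *
+     * <p>(Kept {@code public static} so that {@code testSnappy()} can re-invoke it
+     * reflectively, via {@code U.invoke(cls, null, "checkSnappy")}, inside each freshly
+     * created {@code HadoopClassLoader}.)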
+ */ + public static void checkSnappy() throws Throwable { + try { + byte[] expBytes = new byte[BYTE_SIZE]; + byte[] actualBytes = new byte[BYTE_SIZE]; + + for (int i = 0; i < expBytes.length ; i++) + expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16); + + SnappyCodec codec = new SnappyCodec(); + + codec.setConf(new Configuration()); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try (CompressionOutputStream cos = codec.createOutputStream(baos)) { + cos.write(expBytes); + cos.flush(); + } + + try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) { + int read = cis.read(actualBytes, 0, actualBytes.length); + + assert read == actualBytes.length; + } + + assert Arrays.equals(expBytes, actualBytes); + } + catch (Throwable e) { + System.out.println("Snappy check failed:"); + System.out.println("### NativeCodeLoader.isNativeCodeLoaded: " + NativeCodeLoader.isNativeCodeLoaded()); + System.out.println("### SnappyCompressor.isNativeCodeLoaded: " + SnappyCompressor.isNativeCodeLoaded()); + + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java new file mode 100644 index 0000000..eb4a7d4 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl; + +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; + +/** + * External test for sorting. + */ +public class HadoopSortingExternalTest extends HadoopSortingTest { + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + // TODO: IGNITE-404: Uncomment when fixed. 
+        //cfg.setExternalExecution(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new JdkMarshaller());
+
+        return cfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
new file mode 100644
index 0000000..a4e7368
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+import java.util.UUID;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.util.typedef.X;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Tests correct sorting.
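+ * <p>Generates random UUID strings as input, sorts them in a map-reduce job using
+ * {@link JavaSerializationComparator}, and verifies the output is in ascending order.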
+ */
+public class HadoopSortingTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_INPUT = "/test-in";
+
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * @return {@code True} if IGFS is enabled on Hadoop nodes.
+     */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSortSimple() throws Exception {
+        // Generate test data.
+        Job job = Job.getInstance();
+
+        job.setInputFormatClass(InFormat.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+
+        job.setMapperClass(Mapper.class);
+        job.setNumReduceTasks(0);
+
+        setupFileSystems(job.getConfiguration());
+
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT));
+
+        X.printerrln("Data generation started.");
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+            createJobInfo(job.getConfiguration())).get(180000);
+
+        X.printerrln("Data generation complete.");
+
+        // Run main map-reduce job.
+        job = Job.getInstance();
+
+        setupFileSystems(job.getConfiguration());
+
+        job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() +
+            "," + WritableSerialization.class.getName());
+
+        FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+        job.setSortComparatorClass(JavaSerializationComparator.class);
+
+        job.setMapperClass(MyMapper.class);
+        job.setReducerClass(MyReducer.class);
+
+        job.setNumReduceTasks(2);
+
+        job.setMapOutputKeyClass(UUID.class);
+        job.setMapOutputValueClass(NullWritable.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+
+        X.printerrln("Job started.");
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
+            createJobInfo(job.getConfiguration())).get(180000);
+
+        X.printerrln("Job complete.");
+
+        // Check result.
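+        // Each non-empty reducer output file must list UUIDs in strictly ascending order.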
+        Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
+
+        AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration());
+
+        for (FileStatus file : fs.listStatus(outDir)) {
+            X.printerrln("__ file: " + file);
+
+            if (file.getLen() == 0)
+                continue;
+
+            FSDataInputStream in = fs.open(file.getPath());
+
+            Scanner sc = new Scanner(in);
+
+            UUID prev = null;
+
+            while (sc.hasNextLine()) {
+                UUID next = UUID.fromString(sc.nextLine());
+
+//                X.printerrln("___ check: " + next);
+
+                if (prev != null)
+                    assertTrue(prev.compareTo(next) < 0);
+
+                prev = next;
+            }
+        }
+    }
+
+    /** Input format producing fake splits with random UUID text keys. */
+    public static class InFormat extends InputFormat<Text, NullWritable> {
+        /** {@inheritDoc} */
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+            List<InputSplit> res = new ArrayList<>();
+
+            FakeSplit split = new FakeSplit(20);
+
+            for (int i = 0; i < 10; i++)
+                res.add(split);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordReader<Text, NullWritable> createRecordReader(final InputSplit split,
+            TaskAttemptContext ctx) throws IOException, InterruptedException {
+            return new RecordReader<Text, NullWritable>() {
+                /** */
+                int cnt;
+
+                /** */
+                Text txt = new Text();
+
+                @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+                    return ++cnt <= split.getLength();
+                }
+
+                @Override public Text getCurrentKey() {
+                    txt.set(UUID.randomUUID().toString());
+
+//                    X.printerrln("___ read: " + txt);
+
+                    return txt;
+                }
+
+                @Override public NullWritable getCurrentValue() {
+                    return NullWritable.get();
+                }
+
+                @Override public float getProgress() throws IOException, InterruptedException {
+                    return (float)cnt / split.getLength();
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /** Mapper converting text lines to {@link UUID} keys. */
+    public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> {
+        /** {@inheritDoc} */
+        @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException {
+//            X.printerrln("___ map: " + val);
+
+            ctx.write(UUID.fromString(val.toString()), NullWritable.get());
+        }
+    }
+
+    /** Reducer writing sorted UUID keys back as text. */
+    public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> {
+        /** */
+        private Text text = new Text();
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx)
+            throws IOException, InterruptedException {
+//            X.printerrln("___ rdc: " + key);
+
+            text.set(key.toString());
+
+            ctx.write(text, NullWritable.get());
+        }
+    }
+
+    /** Fixed-length input split stub. */
+    public static class FakeSplit extends InputSplit implements Writable {
+        /** */
+        private static final String[] HOSTS = {"127.0.0.1"};
+
+        /** */
+        private int len;
+
+        /**
+         * @param len Length.
+         */
+        public FakeSplit(int len) {
+            this.len = len;
+        }
+
+        /**
+         *
+         */
+        public FakeSplit() {
+            // No-op.
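+            // A public no-arg constructor is required so Hadoop can instantiate
+            // the split reflectively before calling readFields().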
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getLength() throws IOException, InterruptedException {
+            return len;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String[] getLocations() throws IOException, InterruptedException {
+            return HOSTS;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(DataOutput out) throws IOException {
+            out.writeInt(len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFields(DataInput in) throws IOException {
+            len = in.readInt();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSplitWrapperSelfTest.java
new file mode 100644
index 0000000..be2bfc2
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSplitWrapperSelfTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Self test of {@link HadoopSplitWrapper}.
+ */
+public class HadoopSplitWrapperSelfTest extends HadoopAbstractSelfTest {
+    /**
+     * Tests serialization of wrapper and the wrapped native split.
+     *
+     * @throws Exception If failed.
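+     *     (Host info is not carried through serialization, so {@code hosts()} on the
+     *     deserialized copy is expected to fail with an {@link AssertionError}.)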
+     */
+    public void testSerialization() throws Exception {
+        FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 500, new String[]{"host1", "host2"});
+
+        assertEquals("/path/to/file:100+500", nativeSplit.toString());
+
+        HadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations());
+
+        assertEquals("[host1, host2]", Arrays.toString(split.hosts()));
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        ObjectOutput out = new ObjectOutputStream(buf);
+
+        out.writeObject(split);
+
+        ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+        final HadoopSplitWrapper res = (HadoopSplitWrapper)in.readObject();
+
+        assertEquals("/path/to/file:100+500", HadoopUtils.unwrapSplit(res).toString());
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                res.hosts();
+
+                return null;
+            }
+        }, AssertionError.class, null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopStartup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopStartup.java
new file mode 100644
index 0000000..66e341b
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopStartup.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.G;
+
+/**
+ * Hadoop node startup.
+ */
+public class HadoopStartup {
+    /**
+     * @param args Arguments.
+     */
+    public static void main(String[] args) {
+        G.start("config/hadoop/default-config.xml");
+    }
+
+    /**
+     * @return Configuration for job run.
+     */
+    @SuppressWarnings("UnnecessaryFullyQualifiedName")
+    public static Configuration configuration() {
+        Configuration cfg = new Configuration();
+
+        cfg.set("fs.defaultFS", "igfs://igfs@localhost");
+
+        cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
+        cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
+
+        cfg.set("mapreduce.framework.name", "ignite");
+        cfg.set("mapreduce.jobtracker.address", "localhost:11211");
+
+        return cfg;
+    }
+}
\ No newline at end of file
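For reference, a minimal sketch of how a client could drive the node started by HadoopStartup. This is not part of the commit: the job name and the /input and /output paths are illustrative assumptions, and only stock Hadoop map-reduce APIs are used.

package org.apache.ignite.internal.processors.hadoop.impl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Hypothetical client sketch: submits an identity map-only job using
 * {@link HadoopStartup#configuration()}. Paths and job name are illustrative.
 */
public class HadoopStartupClientSketch {
    public static void main(String[] args) throws Exception {
        // Assumes a node was started beforehand, e.g. via HadoopStartup.main().
        Configuration cfg = HadoopStartup.configuration();

        Job job = Job.getInstance(cfg, "identity-sketch");

        // Identity mapper, no reducers: input records are copied to the output.
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(0);

        // TextInputFormat (the default) produces LongWritable/Text pairs.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Paths resolve through IGFS, since fs.defaultFS is igfs://igfs@localhost.
        FileInputFormat.setInputPaths(job, new Path("/input"));
        FileOutputFormat.setOutputPath(job, new Path("/output"));

        // mapreduce.framework.name=ignite routes execution to the Ignite job tracker.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The sketch leans entirely on the settings produced by HadoopStartup.configuration(): the IGFS file system implementations registered there make the paths resolve in-memory rather than on HDFS, and the "ignite" framework name hands execution to the Ignite job tracker at localhost:11211 instead of YARN.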