Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E948B200BB4 for ; Mon, 26 Sep 2016 14:13:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E7F9D160AE3; Mon, 26 Sep 2016 12:13:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A9DD0160AEC for ; Mon, 26 Sep 2016 14:13:16 +0200 (CEST) Received: (qmail 59059 invoked by uid 500); 26 Sep 2016 12:13:15 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 58918 invoked by uid 99); 26 Sep 2016 12:13:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Sep 2016 12:13:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 524DFE2F35; Mon, 26 Sep 2016 12:13:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ptupitsyn@apache.org To: commits@ignite.apache.org Date: Mon, 26 Sep 2016 12:13:24 -0000 Message-Id: <23dee96f34264db297294a02375b5dca@git.apache.org> In-Reply-To: <12075e2a79df4fd8bcb41f7ed71166f8@git.apache.org> References: <12075e2a79df4fd8bcb41f7ed71166f8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/50] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode. archived-at: Mon, 26 Sep 2016 12:13:20 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolEmbeddedSelfTest.java new file mode 100644 index 0000000..a65d691 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolEmbeddedSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.client; + +import org.apache.ignite.configuration.HadoopConfiguration; + +/** + * Hadoop client protocol tests in embedded process mode. + */ +public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest { + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + // TODO: IGNITE-404: Uncomment when fixed. + //cfg.setExternalExecution(false); + + return cfg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java new file mode 100644 index 0000000..1ef7dd0 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java @@ -0,0 +1,654 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.client; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.StringTokenizer; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Hadoop client protocol tests in external process mode. + */ +@SuppressWarnings("ResultOfMethodCallIgnored") +public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { + /** Input path. */ + private static final String PATH_INPUT = "/input"; + + /** Output path. */ + private static final String PATH_OUTPUT = "/output"; + + /** Job name. */ + private static final String JOB_NAME = "myJob"; + + /** Setup lock file. */ + private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", + "ignite-lock-setup.file"); + + /** Map lock file. */ + private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", + "ignite-lock-map.file"); + + /** Reduce lock file. */ + private static File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", + "ignite-lock-reduce.file"); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected boolean restEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(gridCount()); + + setupLockFile.delete(); + mapLockFile.delete(); + reduceLockFile.delete(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); +// IgniteHadoopClientProtocolProvider.cliMap.clear(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + setupLockFile.createNewFile(); + mapLockFile.createNewFile(); + reduceLockFile.createNewFile(); + + setupLockFile.deleteOnExit(); + mapLockFile.deleteOnExit(); + reduceLockFile.deleteOnExit(); + + super.beforeTest(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format(); + + setupLockFile.delete(); + mapLockFile.delete(); + reduceLockFile.delete(); + + super.afterTest(); + } + + /** + * Test next job ID generation. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + private void tstNextJobId() throws Exception { + IgniteHadoopClientProtocolProvider provider = provider(); + + ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT)); + + JobID jobId = proto.getNewJobID(); + + assert jobId != null; + assert jobId.getJtIdentifier() != null; + + JobID nextJobId = proto.getNewJobID(); + + assert nextJobId != null; + assert nextJobId.getJtIdentifier() != null; + + assert !F.eq(jobId, nextJobId); + } + + /** + * Tests job counters retrieval. + * + * @throws Exception If failed. + */ + public void testJobCounters() throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); + + igfs.mkdirs(new IgfsPath(PATH_INPUT)); + + try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( + new IgfsPath(PATH_INPUT + "/test.file"), true)))) { + + bw.write( + "alpha\n" + + "beta\n" + + "gamma\n" + + "alpha\n" + + "beta\n" + + "gamma\n" + + "alpha\n" + + "beta\n" + + "gamma\n" + ); + } + + Configuration conf = config(HadoopAbstractSelfTest.REST_PORT); + + final Job job = Job.getInstance(conf); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(TestCountingMapper.class); + job.setReducerClass(TestCountingReducer.class); + job.setCombinerClass(TestCountingCombiner.class); + + FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); + FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); + + job.submit(); + + final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1); + + assertEquals(0, cntr.getValue()); + + cntr.increment(10); + + assertEquals(10, cntr.getValue()); + + // Transferring to map phase. + setupLockFile.delete(); + + // Transferring to reduce phase. + mapLockFile.delete(); + + job.waitForCompletion(false); + + assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState()); + + final Counters counters = job.getCounters(); + + assertNotNull("counters cannot be null", counters); + assertEquals("wrong counters count", 3, counters.countCounters()); + assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue()); + assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue()); + assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue()); + } + + /** + * Tests job counters retrieval for unknown job id. + * + * @throws Exception If failed. + */ + private void tstUnknownJobCounters() throws Exception { + IgniteHadoopClientProtocolProvider provider = provider(); + + ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT)); + + try { + proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1)); + fail("exception must be thrown"); + } + catch (Exception e) { + assert e instanceof IOException : "wrong error has been thrown"; + } + } + + /** + * @throws Exception If failed. + */ + private void tstJobSubmitMap() throws Exception { + checkJobSubmit(true, true); + } + + /** + * @throws Exception If failed. + */ + private void tstJobSubmitMapCombine() throws Exception { + checkJobSubmit(false, true); + } + + /** + * @throws Exception If failed. + */ + private void tstJobSubmitMapReduce() throws Exception { + checkJobSubmit(true, false); + } + + /** + * @throws Exception If failed. + */ + private void tstJobSubmitMapCombineReduce() throws Exception { + checkJobSubmit(false, false); + } + + /** + * Test job submission. + * + * @param noCombiners Whether there are no combiners. + * @param noReducers Whether there are no reducers. + * @throws Exception If failed. + */ + public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); + + igfs.mkdirs(new IgfsPath(PATH_INPUT)); + + try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( + new IgfsPath(PATH_INPUT + "/test.file"), true)))) { + + bw.write("word"); + } + + Configuration conf = config(HadoopAbstractSelfTest.REST_PORT); + + final Job job = Job.getInstance(conf); + + job.setJobName(JOB_NAME); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + + if (!noCombiners) + job.setCombinerClass(TestCombiner.class); + + if (noReducers) + job.setNumReduceTasks(0); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TestOutputFormat.class); + + FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); + FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); + + job.submit(); + + JobID jobId = job.getJobID(); + + // Setup phase. + JobStatus jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f; + assert jobStatus.getMapProgress() == 0.0f; + assert jobStatus.getReduceProgress() == 0.0f; + + U.sleep(2100); + + JobStatus recentJobStatus = job.getStatus(); + + assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() : + "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress(); + + // Transferring to map phase. + setupLockFile.delete(); + + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + return F.eq(1.0f, job.getStatus().getSetupProgress()); + } + catch (Exception e) { + throw new RuntimeException("Unexpected exception.", e); + } + } + }, 5000L); + + // Map phase. + jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f; + assert jobStatus.getReduceProgress() == 0.0f; + + U.sleep(2100); + + recentJobStatus = job.getStatus(); + + assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() : + "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress(); + + // Transferring to reduce phase. + mapLockFile.delete(); + + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + return F.eq(1.0f, job.getStatus().getMapProgress()); + } + catch (Exception e) { + throw new RuntimeException("Unexpected exception.", e); + } + } + }, 5000L); + + if (!noReducers) { + // Reduce phase. + jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() == 1.0f; + assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f; + + // Ensure that reduces progress increases. + U.sleep(2100); + + recentJobStatus = job.getStatus(); + + assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() : + "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress(); + + reduceLockFile.delete(); + } + + job.waitForCompletion(false); + + jobStatus = job.getStatus(); + checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() == 1.0f; + assert jobStatus.getReduceProgress() == 1.0f; + + dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT)); + } + + /** + * Dump IGFS content. + * + * @param igfs IGFS. + * @param path Path. + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + private static void dumpIgfs(IgniteFileSystem igfs, IgfsPath path) throws Exception { + IgfsFile file = igfs.info(path); + + assert file != null; + + System.out.println(file.path()); + + if (file.isDirectory()) { + for (IgfsPath child : igfs.listPaths(path)) + dumpIgfs(igfs, child); + } + else { + try (BufferedReader br = new BufferedReader(new InputStreamReader(igfs.open(path)))) { + String line = br.readLine(); + + while (line != null) { + System.out.println(line); + + line = br.readLine(); + } + } + } + } + + /** + * Check job status. + * + * @param status Job status. + * @param expJobId Expected job ID. + * @param expJobName Expected job name. + * @param expState Expected state. + * @param expCleanupProgress Expected cleanup progress. + * @throws Exception If failed. + */ + private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName, + JobStatus.State expState, float expCleanupProgress) throws Exception { + assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID(); + assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName(); + assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState(); + assert F.eq(status.getCleanupProgress(), expCleanupProgress) : + "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress(); + } + + /** + * @return Configuration. + */ + private Configuration config(int port) { + Configuration conf = HadoopUtils.safeCreateConfiguration(); + + setupFileSystems(conf); + + conf.set(MRConfig.FRAMEWORK_NAME, IgniteHadoopClientProtocolProvider.FRAMEWORK_NAME); + conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port); + + conf.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/"); + + return conf; + } + + /** + * @return Protocol provider. + */ + private IgniteHadoopClientProtocolProvider provider() { + return new IgniteHadoopClientProtocolProvider(); + } + + /** + * Test mapper. + */ + public static class TestMapper extends Mapper { + /** Writable container for writing word. */ + private Text word = new Text(); + + /** Writable integer constant of '1' is writing as count of found words. */ + private static final IntWritable one = new IntWritable(1); + + /** {@inheritDoc} */ + @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + while (mapLockFile.exists()) + Thread.sleep(50); + + StringTokenizer wordList = new StringTokenizer(val.toString()); + + while (wordList.hasMoreTokens()) { + word.set(wordList.nextToken()); + + ctx.write(word, one); + } + } + } + + /** + * Test Hadoop counters. + */ + public enum TestCounter { + COUNTER1, COUNTER2, COUNTER3 + } + + /** + * Test mapper that uses counters. + */ + public static class TestCountingMapper extends TestMapper { + /** {@inheritDoc} */ + @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + super.map(key, val, ctx); + ctx.getCounter(TestCounter.COUNTER1).increment(1); + } + } + + /** + * Test combiner that counts invocations. + */ + public static class TestCountingCombiner extends TestReducer { + @Override public void reduce(Text key, Iterable values, + Context ctx) throws IOException, InterruptedException { + ctx.getCounter(TestCounter.COUNTER1).increment(1); + ctx.getCounter(TestCounter.COUNTER2).increment(1); + + int sum = 0; + for (IntWritable value : values) + sum += value.get(); + + ctx.write(key, new IntWritable(sum)); + } + } + + /** + * Test reducer that counts invocations. + */ + public static class TestCountingReducer extends TestReducer { + @Override public void reduce(Text key, Iterable values, + Context ctx) throws IOException, InterruptedException { + ctx.getCounter(TestCounter.COUNTER1).increment(1); + ctx.getCounter(TestCounter.COUNTER3).increment(1); + } + } + + /** + * Test combiner. + */ + public static class TestCombiner extends Reducer { + // No-op. + } + + public static class TestOutputFormat extends TextOutputFormat { + /** {@inheritDoc} */ + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx) + throws IOException { + return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx)); + } + } + + /** + * Test output committer. + */ + private static class TestOutputCommitter extends FileOutputCommitter { + /** Delegate. */ + private final FileOutputCommitter delegate; + + /** + * Constructor. + * + * @param ctx Task attempt context. + * @param delegate Delegate. + * @throws IOException If failed. + */ + private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException { + super(FileOutputFormat.getOutputPath(ctx), ctx); + + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public void setupJob(JobContext jobCtx) throws IOException { + try { + while (setupLockFile.exists()) + Thread.sleep(50); + } + catch (InterruptedException ignored) { + throw new IOException("Interrupted."); + } + + delegate.setupJob(jobCtx); + } + + /** {@inheritDoc} */ + @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException { + delegate.setupTask(taskCtx); + } + + /** {@inheritDoc} */ + @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException { + return delegate.needsTaskCommit(taskCtx); + } + + /** {@inheritDoc} */ + @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException { + delegate.commitTask(taskCtx); + } + + /** {@inheritDoc} */ + @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException { + delegate.abortTask(taskCtx); + } + } + + /** + * Test reducer. + */ + public static class TestReducer extends Reducer { + /** Writable container for writing sum of word counts. */ + private IntWritable totalWordCnt = new IntWritable(); + + /** {@inheritDoc} */ + @Override public void reduce(Text key, Iterable values, Context ctx) throws IOException, + InterruptedException { + while (reduceLockFile.exists()) + Thread.sleep(50); + + int wordCnt = 0; + + for (IntWritable value : values) + wordCnt += value.get(); + + totalWordCnt.set(wordCnt); + + ctx.write(key, totalWordCnt); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1.java new file mode 100644 index 0000000..0df9c6a --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.examples; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + +/** + * Example job for testing hadoop task execution. + */ +public class HadoopWordCount1 { + /** + * Entry point to start job. + * @param args command line parameters. + * @throws Exception if fails. + */ + public static void main(String[] args) throws Exception { + if (args.length != 2) { + System.out.println("usage: [input] [output]"); + System.exit(-1); + } + + JobConf job = getJob(args[0], args[1]); + + JobClient.runJob(job); + } + + /** + * Gets fully configured JobConf instance. + * + * @param input input file name. + * @param output output directory name. + * @return Job configuration + */ + public static JobConf getJob(String input, String output) { + JobConf conf = new JobConf(HadoopWordCount1.class); + conf.setJobName("wordcount"); + + conf.setOutputKeyClass(Text.class); + conf.setOutputValueClass(IntWritable.class); + + setTasksClasses(conf, true, true, true); + + FileInputFormat.setInputPaths(conf, new Path(input)); + FileOutputFormat.setOutputPath(conf, new Path(output)); + + return conf; + } + + /** + * Sets task classes with related info if needed into configuration object. + * + * @param jobConf Configuration to change. + * @param setMapper Option to set mapper and input format classes. + * @param setCombiner Option to set combiner class. + * @param setReducer Option to set reducer and output format classes. + */ + public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) { + if (setMapper) { + jobConf.setMapperClass(HadoopWordCount1Map.class); + jobConf.setInputFormat(TextInputFormat.class); + } + + if (setCombiner) + jobConf.setCombinerClass(HadoopWordCount1Reduce.class); + + if (setReducer) { + jobConf.setReducerClass(HadoopWordCount1Reduce.class); + jobConf.setOutputFormat(TextOutputFormat.class); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Map.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Map.java new file mode 100644 index 0000000..6a98a24 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Map.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.examples; + +import java.io.IOException; +import java.util.StringTokenizer; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator; + +/** + * Mapper phase of WordCount job. + */ +public class HadoopWordCount1Map extends MapReduceBase implements Mapper { + /** Writable integer constant of '1' is writing as count of found words. */ + private static final IntWritable one = new IntWritable(1); + + /** Writable container for writing word. */ + private Text word = new Text(); + + /** Flag is to check that mapper was configured before run. */ + private boolean wasConfigured; + + /** {@inheritDoc} */ + @Override public void map(LongWritable key, Text val, OutputCollector output, Reporter reporter) + throws IOException { + + assert wasConfigured : "Mapper should be configured"; + + String line = val.toString(); + + StringTokenizer tokenizer = new StringTokenizer(line); + + while (tokenizer.hasMoreTokens()) { + word.set(tokenizer.nextToken()); + + output.collect(word, one); + } + + HadoopErrorSimulator.instance().onMap(); + } + + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + super.configure(job); + + wasConfigured = true; + + HadoopErrorSimulator.instance().onMapConfigure(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + super.close(); + + HadoopErrorSimulator.instance().onMapClose(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Reduce.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Reduce.java new file mode 100644 index 0000000..ab91e0c --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Reduce.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.examples; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator; + +/** + * Combiner and Reducer phase of WordCount job. + */ +public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer { + /** Flag is to check that mapper was configured before run. */ + private boolean wasConfigured; + + /** {@inheritDoc} */ + @Override public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) + throws IOException { + assert wasConfigured : "Reducer should be configured"; + + int sum = 0; + + while (values.hasNext()) + sum += values.next().get(); + + output.collect(key, new IntWritable(sum)); + + HadoopErrorSimulator.instance().onReduce(); + } + + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + super.configure(job); + + wasConfigured = true; + + HadoopErrorSimulator.instance().onReduceConfigure(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2.java new file mode 100644 index 0000000..3ddc923 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.examples; + +import java.io.IOException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.SnappyCodec; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +/** + * Example job for testing hadoop task execution. + */ +public class HadoopWordCount2 { + /** + * Entry point to start job. + * + * @param args Command line parameters. + * @throws Exception If fails. + */ + public static void main(String[] args) throws Exception { + if (args.length != 2) { + System.out.println("usage: [input] [output]"); + System.exit(-1); + } + + Job job = getJob(args[0], args[1]); + + job.submit(); + } + + /** + * Gets fully configured Job instance. + * + * @param input Input file name. + * @param output Output directory name. + * @return Job instance. + * @throws IOException If fails. + */ + public static Job getJob(String input, String output) throws IOException { + Job job = Job.getInstance(); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + setTasksClasses(job, true, true, true, false); + + FileInputFormat.setInputPaths(job, new Path(input)); + FileOutputFormat.setOutputPath(job, new Path(output)); + + job.setJarByClass(HadoopWordCount2.class); + + return job; + } + + /** + * Sets task classes with related info if needed into configuration object. + * + * @param job Configuration to change. + * @param setMapper Option to set mapper and input format classes. + * @param setCombiner Option to set combiner class. + * @param setReducer Option to set reducer and output format classes. + */ + public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer, + boolean outputCompression) { + if (setMapper) { + job.setMapperClass(HadoopWordCount2Mapper.class); + job.setInputFormatClass(TextInputFormat.class); + } + + if (setCombiner) + job.setCombinerClass(HadoopWordCount2Combiner.class); + + if (setReducer) { + job.setReducerClass(HadoopWordCount2Reducer.class); + job.setOutputFormatClass(TextOutputFormat.class); + } + + if (outputCompression) { + job.setOutputFormatClass(SequenceFileOutputFormat.class); + + SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK); + + SequenceFileOutputFormat.setCompressOutput(job, true); + + job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Combiner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Combiner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Combiner.java new file mode 100644 index 0000000..a643a92 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Combiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.hadoop.impl.examples; + +import java.io.IOException; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator; + +/** + * Combiner function with pluggable error simulator. + */ +public class HadoopWordCount2Combiner extends HadoopWordCount2Reducer { + /** {@inheritDoc} */ + @Override protected void configError() { + HadoopErrorSimulator.instance().onCombineConfigure(); + } + + /** {@inheritDoc} */ + @Override protected void setupError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onCombineSetup(); + } + + /** {@inheritDoc} */ + @Override protected void reduceError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onCombine(); + } + + /** {@inheritDoc} */ + @Override protected void cleanupError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onCombineCleanup(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Mapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Mapper.java new file mode 100644 index 0000000..336db84 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Mapper.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.examples; + +import java.io.IOException; +import java.util.StringTokenizer; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator; + +/** + * Mapper phase of WordCount job. + */ +public class HadoopWordCount2Mapper extends Mapper implements Configurable { + /** Writable container for writing word. */ + private Text word = new Text(); + + /** Writable integer constant of '1' is writing as count of found words. */ + private static final IntWritable one = new IntWritable(1); + + /** Flag is to check that mapper was configured before run. */ + private boolean wasConfigured; + + /** Flag is to check that mapper was set up before run. */ + private boolean wasSetUp; + + /** {@inheritDoc} */ + @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + assert wasConfigured : "Mapper should be configured"; + assert wasSetUp : "Mapper should be set up"; + + StringTokenizer wordList = new StringTokenizer(val.toString()); + + while (wordList.hasMoreTokens()) { + word.set(wordList.nextToken()); + + ctx.write(word, one); + } + + HadoopErrorSimulator.instance().onMap(); + } + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + super.setup(ctx); + + wasSetUp = true; + + HadoopErrorSimulator.instance().onMapSetup(); + } + + /** {@inheritDoc} */ + @Override protected void cleanup(Context ctx) throws IOException, InterruptedException { + super.cleanup(ctx); + + HadoopErrorSimulator.instance().onMapCleanup(); + } + + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + wasConfigured = true; + + HadoopErrorSimulator.instance().onMapConfigure(); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Reducer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Reducer.java new file mode 100644 index 0000000..f24288e --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Reducer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.examples; + +import java.io.IOException; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator; + +/** + * Combiner and Reducer phase of WordCount job. + */ +public class HadoopWordCount2Reducer extends Reducer implements Configurable { + /** Writable container for writing sum of word counts. */ + private IntWritable totalWordCnt = new IntWritable(); + + /** Flag is to check that mapper was configured before run. */ + private boolean wasConfigured; + + /** Flag is to check that mapper was set up before run. */ + private boolean wasSetUp; + + /** {@inheritDoc} */ + @Override public void reduce(Text key, Iterable values, Context ctx) throws IOException, InterruptedException { + assert wasConfigured : "Reducer should be configured"; + assert wasSetUp : "Reducer should be set up"; + + int wordCnt = 0; + + for (IntWritable value : values) + wordCnt += value.get(); + + totalWordCnt.set(wordCnt); + + ctx.write(key, totalWordCnt); + + reduceError(); + } + + /** + * Simulates reduce error if needed. + */ + protected void reduceError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onReduce(); + } + + /** {@inheritDoc} */ + @Override protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + wasSetUp = true; + + setupError(); + } + + /** + * Simulates setup error if needed. + */ + protected void setupError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onReduceSetup(); + } + + /** {@inheritDoc} */ + @Override protected void cleanup(Context context) throws IOException, InterruptedException { + super.cleanup(context); + + cleanupError(); + } + + /** + * Simulates cleanup error if needed. + */ + protected void cleanupError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onReduceCleanup(); + } + + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + wasConfigured = true; + + configError(); + } + + /** + * Simulates configuration error if needed. + */ + protected void configError() { + HadoopErrorSimulator.instance().onReduceConfigure(); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/fs/KerberosHadoopFileSystemFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/fs/KerberosHadoopFileSystemFactorySelfTest.java new file mode 100644 index 0000000..8c95a0e --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/fs/KerberosHadoopFileSystemFactorySelfTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.fs; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.util.concurrent.Callable; + +import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +/** + * Tests KerberosHadoopFileSystemFactory. + */ +public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractTest { + /** + * Test parameters validation. + * + * @throws Exception If failed. + */ + public void testParameters() throws Exception { + checkParameters(null, null, -1); + + checkParameters(null, null, 100); + checkParameters(null, "b", -1); + checkParameters("a", null, -1); + + checkParameters(null, "b", 100); + checkParameters("a", null, 100); + checkParameters("a", "b", -1); + } + + /** + * Check parameters. + * + * @param keyTab Key tab. + * @param keyTabPrincipal Key tab principal. + * @param reloginInterval Re-login interval. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + private void checkParameters(String keyTab, String keyTabPrincipal, long reloginInterval) { + final KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory(); + + fac.setKeyTab(keyTab); + fac.setKeyTabPrincipal(keyTabPrincipal); + fac.setReloginInterval(reloginInterval); + + GridTestUtils.assertThrows(null, new Callable() { + @Override public Object call() throws Exception { + HadoopFileSystemFactoryDelegate delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(fac); + + delegate.start(); + + return null; + } + }, IllegalArgumentException.class, null); + } + + /** + * Checks serializatuion and deserialization of the secure factory. + * + * @throws Exception If failed. + */ + public void testSerialization() throws Exception { + KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory(); + + checkSerialization(fac); + + fac = new KerberosHadoopFileSystemFactory(); + + fac.setUri("igfs://igfs@localhost:10500/"); + fac.setConfigPaths("/a/core-sute.xml", "/b/mapred-site.xml"); + fac.setKeyTabPrincipal("foo"); + fac.setKeyTab("/etc/krb5.keytab"); + fac.setReloginInterval(30 * 60 * 1000L); + + checkSerialization(fac); + } + + /** + * Serializes the factory, + * + * @param fac The facory to check. + * @throws Exception If failed. + */ + private void checkSerialization(KerberosHadoopFileSystemFactory fac) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + ObjectOutput oo = new ObjectOutputStream(baos); + + oo.writeObject(fac); + + ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())); + + KerberosHadoopFileSystemFactory fac2 = (KerberosHadoopFileSystemFactory)in.readObject(); + + assertEquals(fac.getUri(), fac2.getUri()); + Assert.assertArrayEquals(fac.getConfigPaths(), fac2.getConfigPaths()); + assertEquals(fac.getKeyTab(), fac2.getKeyTab()); + assertEquals(fac.getKeyTabPrincipal(), fac2.getKeyTabPrincipal()); + assertEquals(fac.getReloginInterval(), fac2.getReloginInterval()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1DualAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1DualAbstractTest.java new file mode 100644 index 0000000..a585e54 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1DualAbstractTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.hadoop.util.ChainedUserNameMapper; +import org.apache.ignite.hadoop.util.KerberosUserNameMapper; +import org.apache.ignite.hadoop.util.UserNameMapper; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; +import org.apache.ignite.igfs.IgfsIpcEndpointType; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.internal.processors.igfs.IgfsDualAbstractSelfTest; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; + +/** + * Abstract test for Hadoop 1.0 file system stack. + */ +public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest { + /** Secondary grid name */ + private static final String GRID_NAME = "grid_secondary"; + + /** Secondary file system name */ + private static final String IGFS_NAME = "igfs_secondary"; + + /** Secondary file system REST endpoint port */ + private static final int PORT = 11500; + + /** Secondary file system REST endpoint configuration map. */ + private static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration() {{ + setType(IgfsIpcEndpointType.TCP); + setPort(PORT); + }}; + + /** Secondary file system authority. */ + private static final String SECONDARY_AUTHORITY = IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + PORT; + + /** Secondary Fs configuration full path. */ + protected String secondaryConfFullPath; + + /** Secondary Fs URI. */ + protected String secondaryUri; + + /** Constructor. */ + public Hadoop1DualAbstractTest(IgfsMode mode) { + super(mode); + } + + /** + * Creates secondary filesystems. + * @return IgfsSecondaryFileSystem + * @throws Exception On failure. + */ + @Override protected IgfsSecondaryFileSystem createSecondaryFileSystemStack() throws Exception { + startUnderlying(); + + prepareConfiguration(); + + KerberosUserNameMapper mapper1 = new KerberosUserNameMapper(); + + mapper1.setRealm("TEST.COM"); + + TestUserNameMapper mapper2 = new TestUserNameMapper(); + + ChainedUserNameMapper mapper = new ChainedUserNameMapper(); + + mapper.setMappers(mapper1, mapper2); + + CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory(); + + factory.setUri(secondaryUri); + factory.setConfigPaths(secondaryConfFullPath); + factory.setUserNameMapper(mapper); + + IgniteHadoopIgfsSecondaryFileSystem second = new IgniteHadoopIgfsSecondaryFileSystem(); + + second.setFileSystemFactory(factory); + + igfsSecondary = new HadoopIgfsSecondaryFileSystemTestAdapter(factory); + + return second; + } + + /** + * Starts underlying Ignite process. + * @throws IOException On failure. + */ + protected void startUnderlying() throws Exception { + startGridWithIgfs(GRID_NAME, IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG, secondaryIpFinder); + } + + /** + * Prepares Fs configuration. + * @throws IOException On failure. + */ + protected void prepareConfiguration() throws IOException { + Configuration secondaryConf = HadoopSecondaryFileSystemConfigurationTest.configuration(IGFS_SCHEME, SECONDARY_AUTHORITY, true, true); + + secondaryConf.setInt("fs.igfs.block.size", 1024); + + secondaryConfFullPath = HadoopSecondaryFileSystemConfigurationTest.writeConfiguration(secondaryConf, HadoopSecondaryFileSystemConfigurationTest.SECONDARY_CFG_PATH); + + secondaryUri = HadoopSecondaryFileSystemConfigurationTest.mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY); + } + + /** + * Test user name mapper. + */ + private static class TestUserNameMapper implements UserNameMapper, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** Started flag. */ + private boolean started; + + /** {@inheritDoc} */ + @Nullable @Override public String map(String name) { + assert started; + assert name != null && name.contains("@"); + + return name.substring(0, name.indexOf("@")); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + started = true; + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualAsyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualAsyncTest.java new file mode 100644 index 0000000..97cc7e9 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualAsyncTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import org.apache.ignite.igfs.IgfsMode; + +/** + * DUAL_ASYNC mode test. + */ +public class Hadoop1OverIgfsDualAsyncTest extends Hadoop1DualAbstractTest { + /** + * Constructor. + */ + public Hadoop1OverIgfsDualAsyncTest() { + super(IgfsMode.DUAL_ASYNC); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualSyncTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualSyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualSyncTest.java new file mode 100644 index 0000000..12036bc --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualSyncTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import org.apache.ignite.igfs.IgfsMode; + +/** + * DUAL_SYNC mode. + */ +public class Hadoop1OverIgfsDualSyncTest extends Hadoop1DualAbstractTest { + /** + * Constructor. + */ + public Hadoop1OverIgfsDualSyncTest() { + super(IgfsMode.DUAL_SYNC); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java new file mode 100644 index 0000000..7cf7f2d --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; +import org.apache.ignite.igfs.IgfsIpcEndpointType; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.jetbrains.annotations.Nullable; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * Tests for Hadoop file system factory. + */ +public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest { + /** Amount of "start" invocations */ + private static final AtomicInteger START_CNT = new AtomicInteger(); + + /** Amount of "stop" invocations */ + private static final AtomicInteger STOP_CNT = new AtomicInteger(); + + /** Path to secondary file system configuration. */ + private static final String SECONDARY_CFG_PATH = "/work/core-site-HadoopFIleSystemFactorySelfTest.xml"; + + /** IGFS path for DUAL mode. */ + private static final Path PATH_DUAL = new Path("/ignite/sync/test_dir"); + + /** IGFS path for PROXY mode. */ + private static final Path PATH_PROXY = new Path("/ignite/proxy/test_dir"); + + /** IGFS path for DUAL mode. */ + private static final IgfsPath IGFS_PATH_DUAL = new IgfsPath("/ignite/sync/test_dir"); + + /** IGFS path for PROXY mode. */ + private static final IgfsPath IGFS_PATH_PROXY = new IgfsPath("/ignite/proxy/test_dir"); + + /** Secondary IGFS. */ + private IgfsEx secondary; + + /** Primary IGFS. */ + private IgfsEx primary; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + START_CNT.set(0); + STOP_CNT.set(0); + + secondary = startSecondary(); + primary = startPrimary(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + secondary = null; + primary = null; + + stopAllGrids(); + } + + /** + * Test custom factory. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testCustomFactory() throws Exception { + assert START_CNT.get() == 1; + assert STOP_CNT.get() == 0; + + // Use IGFS directly. + primary.mkdirs(IGFS_PATH_DUAL); + + assert primary.exists(IGFS_PATH_DUAL); + assert secondary.exists(IGFS_PATH_DUAL); + + // Create remote instance. + FileSystem fs = FileSystem.get(URI.create("igfs://primary:primary@127.0.0.1:10500/"), baseConfiguration()); + + // Ensure lifecycle callback was invoked. + assert START_CNT.get() == 2; + assert STOP_CNT.get() == 0; + + // Check file system operations. + assert fs.exists(PATH_DUAL); + + assert fs.delete(PATH_DUAL, true); + assert !primary.exists(IGFS_PATH_DUAL); + assert !secondary.exists(IGFS_PATH_DUAL); + assert !fs.exists(PATH_DUAL); + + assert fs.mkdirs(PATH_DUAL); + assert primary.exists(IGFS_PATH_DUAL); + assert secondary.exists(IGFS_PATH_DUAL); + assert fs.exists(PATH_DUAL); + + assert fs.mkdirs(PATH_PROXY); + assert secondary.exists(IGFS_PATH_PROXY); + assert fs.exists(PATH_PROXY); + + // Close file system and ensure that associated factory was notified. + fs.close(); + + assert START_CNT.get() == 2; + assert STOP_CNT.get() == 1; + + // Stop primary node and ensure that base factory was notified. + G.stop(primary.context().kernalContext().grid().name(), true); + + assert START_CNT.get() == 2; + assert STOP_CNT.get() == 2; + } + + /** + * Start secondary IGFS. + * + * @return IGFS. + * @throws Exception If failed. + */ + private static IgfsEx startSecondary() throws Exception { + return start("secondary", 11500, IgfsMode.PRIMARY, null); + } + + /** + * Start primary IGFS. + * + * @return IGFS. + * @throws Exception If failed. + */ + private static IgfsEx startPrimary() throws Exception { + // Prepare configuration. + Configuration conf = baseConfiguration(); + + conf.set("fs.defaultFS", "igfs://secondary:secondary@127.0.0.1:11500/"); + + writeConfigurationToFile(conf); + + // Get file system instance to be used. + CachingHadoopFileSystemFactory delegate = new CachingHadoopFileSystemFactory(); + + delegate.setUri("igfs://secondary:secondary@127.0.0.1:11500/"); + delegate.setConfigPaths(SECONDARY_CFG_PATH); + + // Configure factory. + TestFactory factory = new TestFactory(delegate); + + // Configure file system. + IgniteHadoopIgfsSecondaryFileSystem secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(); + + secondaryFs.setFileSystemFactory(factory); + + // Start. + return start("primary", 10500, IgfsMode.DUAL_ASYNC, secondaryFs); + } + + /** + * Start Ignite node with IGFS instance. + * + * @param name Node and IGFS name. + * @param endpointPort Endpoint port. + * @param dfltMode Default path mode. + * @param secondaryFs Secondary file system. + * @return Igfs instance. + */ + private static IgfsEx start(String name, int endpointPort, IgfsMode dfltMode, + @Nullable IgfsSecondaryFileSystem secondaryFs) { + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setHost("127.0.0.1"); + endpointCfg.setPort(endpointPort); + + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(name); + igfsCfg.setDefaultMode(dfltMode); + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setInitializeDefaultPathModes(true); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(name); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + return (IgfsEx)G.start(cfg).fileSystem(name); + } + + /** + * Create base FileSystem configuration. + * + * @return Configuration. + */ + private static Configuration baseConfiguration() { + Configuration conf = new Configuration(); + + conf.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + return conf; + } + + /** + * Write configuration to file. + * + * @param conf Configuration. + * @throws Exception If failed. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + private static void writeConfigurationToFile(Configuration conf) throws Exception { + final String path = U.getIgniteHome() + SECONDARY_CFG_PATH; + + File file = new File(path); + + file.delete(); + + assertFalse(file.exists()); + + try (FileOutputStream fos = new FileOutputStream(file)) { + conf.writeXml(fos); + } + + assertTrue(file.exists()); + } + + /** + * Test factory. + */ + private static class TestFactory implements HadoopFileSystemFactory, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** File system factory. */ + private CachingHadoopFileSystemFactory factory; + + /** File system. */ + private transient HadoopFileSystemFactoryDelegate delegate; + + /** + * Constructor. + * + * @param factory File system factory. + */ + public TestFactory(CachingHadoopFileSystemFactory factory) { + this.factory = factory; + } + + /** {@inheritDoc} */ + @Override public Object get(String usrName) throws IOException { + return delegate.get(usrName); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(factory); + + delegate.start(); + + START_CNT.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + STOP_CNT.incrementAndGet(); + } + } +}