ignite-commits mailing list archives

From voze...@apache.org
Subject [11/11] incubator-ignite git commit: # WIP on correct IGFS tests placement.
Date Tue, 10 Mar 2015 08:56:37 GMT
# WIP on correct IGFS tests placement.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bae553db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bae553db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bae553db

Branch: refs/heads/ignite-438
Commit: bae553db645dddf1563b94132afedb064d162e3e
Parents: 314bf52
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Mar 10 11:54:59 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Mar 10 11:54:59 2015 +0300

----------------------------------------------------------------------
 .../HadoopClientProtocolEmbeddedSelfTest.java   |   34 -
 .../hadoop/HadoopClientProtocolSelfTest.java    |  635 -----
 .../HadoopIgfs20FileSystemAbstractSelfTest.java | 1967 ---------------
 ...Igfs20FileSystemLoopbackPrimarySelfTest.java |   74 -
 ...oopIgfs20FileSystemShmemPrimarySelfTest.java |   74 -
 .../igfs/HadoopIgfsDualAbstractSelfTest.java    |  305 ---
 .../igfs/HadoopIgfsDualAsyncSelfTest.java       |   32 -
 .../ignite/igfs/HadoopIgfsDualSyncSelfTest.java |   32 -
 ...oopSecondaryFileSystemConfigurationTest.java |  559 -----
 .../apache/ignite/igfs/IgfsEventsTestSuite.java |  267 --
 .../igfs/IgfsNearOnlyMultiNodeSelfTest.java     |  212 --
 .../IgniteHadoopFileSystemAbstractSelfTest.java | 2366 -----------------
 .../IgniteHadoopFileSystemClientSelfTest.java   |  199 --
 ...IgniteHadoopFileSystemHandshakeSelfTest.java |  311 ---
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |  207 --
 .../IgniteHadoopFileSystemLoggerSelfTest.java   |  287 ---
 ...niteHadoopFileSystemLoggerStateSelfTest.java |  325 ---
 ...adoopFileSystemLoopbackAbstractSelfTest.java |   46 -
 ...SystemLoopbackEmbeddedDualAsyncSelfTest.java |   33 -
 ...eSystemLoopbackEmbeddedDualSyncSelfTest.java |   33 -
 ...leSystemLoopbackEmbeddedPrimarySelfTest.java |   33 -
 ...SystemLoopbackEmbeddedSecondarySelfTest.java |   34 -
 ...SystemLoopbackExternalDualAsyncSelfTest.java |   33 -
 ...eSystemLoopbackExternalDualSyncSelfTest.java |   33 -
 ...leSystemLoopbackExternalPrimarySelfTest.java |   33 -
 ...SystemLoopbackExternalSecondarySelfTest.java |   34 -
 ...teHadoopFileSystemSecondaryModeSelfTest.java |  319 ---
 ...teHadoopFileSystemShmemAbstractSelfTest.java |   88 -
 ...ileSystemShmemEmbeddedDualAsyncSelfTest.java |   33 -
 ...FileSystemShmemEmbeddedDualSyncSelfTest.java |   33 -
 ...pFileSystemShmemEmbeddedPrimarySelfTest.java |   33 -
 ...ileSystemShmemEmbeddedSecondarySelfTest.java |   33 -
 ...ileSystemShmemExternalDualAsyncSelfTest.java |   33 -
 ...FileSystemShmemExternalDualSyncSelfTest.java |   33 -
 ...pFileSystemShmemExternalPrimarySelfTest.java |   33 -
 ...ileSystemShmemExternalSecondarySelfTest.java |   33 -
 .../HadoopClientProtocolEmbeddedSelfTest.java   |   34 +
 .../hadoop/HadoopClientProtocolSelfTest.java    |  635 +++++
 .../HadoopIgfs20FileSystemAbstractSelfTest.java | 1968 +++++++++++++++
 ...Igfs20FileSystemLoopbackPrimarySelfTest.java |   74 +
 ...oopIgfs20FileSystemShmemPrimarySelfTest.java |   74 +
 .../igfs/HadoopIgfsDualAbstractSelfTest.java    |  306 +++
 .../igfs/HadoopIgfsDualAsyncSelfTest.java       |   32 +
 .../igfs/HadoopIgfsDualSyncSelfTest.java        |   32 +
 ...oopSecondaryFileSystemConfigurationTest.java |  560 +++++
 .../igfs/IgfsNearOnlyMultiNodeSelfTest.java     |  213 ++
 .../IgniteHadoopFileSystemAbstractSelfTest.java | 2367 ++++++++++++++++++
 .../IgniteHadoopFileSystemClientSelfTest.java   |  199 ++
 ...IgniteHadoopFileSystemHandshakeSelfTest.java |  312 +++
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |  208 ++
 .../IgniteHadoopFileSystemLoggerSelfTest.java   |  288 +++
 ...niteHadoopFileSystemLoggerStateSelfTest.java |  326 +++
 ...adoopFileSystemLoopbackAbstractSelfTest.java |   48 +
 ...SystemLoopbackEmbeddedDualAsyncSelfTest.java |   33 +
 ...eSystemLoopbackEmbeddedDualSyncSelfTest.java |   33 +
 ...leSystemLoopbackEmbeddedPrimarySelfTest.java |   33 +
 ...SystemLoopbackEmbeddedSecondarySelfTest.java |   34 +
 ...SystemLoopbackExternalDualAsyncSelfTest.java |   33 +
 ...eSystemLoopbackExternalDualSyncSelfTest.java |   33 +
 ...leSystemLoopbackExternalPrimarySelfTest.java |   33 +
 ...SystemLoopbackExternalSecondarySelfTest.java |   34 +
 ...teHadoopFileSystemSecondaryModeSelfTest.java |  320 +++
 ...teHadoopFileSystemShmemAbstractSelfTest.java |   89 +
 ...ileSystemShmemEmbeddedDualAsyncSelfTest.java |   33 +
 ...FileSystemShmemEmbeddedDualSyncSelfTest.java |   33 +
 ...pFileSystemShmemEmbeddedPrimarySelfTest.java |   33 +
 ...ileSystemShmemEmbeddedSecondarySelfTest.java |   33 +
 ...ileSystemShmemExternalDualAsyncSelfTest.java |   33 +
 ...FileSystemShmemExternalDualSyncSelfTest.java |   33 +
 ...pFileSystemShmemExternalPrimarySelfTest.java |   33 +
 ...ileSystemShmemExternalSecondarySelfTest.java |   33 +
 .../ignite/testsuites/IgfsEventsTestSuite.java  |  268 ++
 .../testsuites/IgniteHadoopTestSuite.java       |    5 +-
 .../IgniteIgfsLinuxAndMacOSTestSuite.java       |    1 -
 74 files changed, 8884 insertions(+), 8874 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bae553db/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
deleted file mode 100644
index ffa20d1..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.ignite.configuration.*;
-
-/**
- * Hadoop client protocol tests in embedded process mode.
- */
-public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest {
-    /** {@inheritDoc} */
-    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
-        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
-        cfg.setExternalExecution(false);
-
-        return cfg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bae553db/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
deleted file mode 100644
index d19a8ea..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ /dev/null
@@ -1,635 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.mapreduce.lib.output.*;
-import org.apache.hadoop.mapreduce.protocol.*;
-import org.apache.ignite.*;
-import org.apache.ignite.hadoop.mapreduce.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop client protocol tests in external process mode.
- */
-@SuppressWarnings("ResultOfMethodCallIgnored")
-public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
-    /** Input path. */
-    private static final String PATH_INPUT = "/input";
-
-    /** Output path. */
-    private static final String PATH_OUTPUT = "/output";
-
-    /** Job name. */
-    private static final String JOB_NAME = "myJob";
-
-    /** Setup lock file. */
-    private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
-        "ignite-lock-setup.file");
-
-    /** Map lock file. */
-    private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
-        "ignite-lock-map.file");
-
-    /** Reduce lock file. */
-    private static File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
-        "ignite-lock-reduce.file");
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean igfsEnabled() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean restEnabled() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGrids(gridCount());
-
-        setupLockFile.delete();
-        mapLockFile.delete();
-        reduceLockFile.delete();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-
-        super.afterTestsStopped();
-
-//        IgniteHadoopClientProtocolProvider.cliMap.clear();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        setupLockFile.createNewFile();
-        mapLockFile.createNewFile();
-        reduceLockFile.createNewFile();
-
-        setupLockFile.deleteOnExit();
-        mapLockFile.deleteOnExit();
-        reduceLockFile.deleteOnExit();
-
-        super.beforeTest();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format();
-
-        setupLockFile.delete();
-        mapLockFile.delete();
-        reduceLockFile.delete();
-
-        super.afterTest();
-    }
-
-    /**
-     * Test next job ID generation.
-     *
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private void tstNextJobId() throws Exception {
-        IgniteHadoopClientProtocolProvider provider = provider();
-
-        ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT));
-
-        JobID jobId = proto.getNewJobID();
-
-        assert jobId != null;
-        assert jobId.getJtIdentifier() != null;
-
-        JobID nextJobId = proto.getNewJobID();
-
-        assert nextJobId != null;
-        assert nextJobId.getJtIdentifier() != null;
-
-        assert !F.eq(jobId, nextJobId);
-    }
-
-    /**
-     * Tests job counters retrieval.
-     *
-     * @throws Exception If failed.
-     */
-    public void testJobCounters() throws Exception {
-        IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
-
-        igfs.mkdirs(new IgfsPath(PATH_INPUT));
-
-        try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
-            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
-
-            bw.write(
-                "alpha\n" +
-                "beta\n" +
-                "gamma\n" +
-                "alpha\n" +
-                "beta\n" +
-                "gamma\n" +
-                "alpha\n" +
-                "beta\n" +
-                "gamma\n"
-            );
-        }
-
-        Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
-
-        final Job job = Job.getInstance(conf);
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-
-        job.setMapperClass(TestCountingMapper.class);
-        job.setReducerClass(TestCountingReducer.class);
-        job.setCombinerClass(TestCountingCombiner.class);
-
-        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
-        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
-
-        job.submit();
-
-        final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
-
-        assertEquals(0, cntr.getValue());
-
-        cntr.increment(10);
-
-        assertEquals(10, cntr.getValue());
-
-        // Transferring to map phase.
-        setupLockFile.delete();
-
-        // Transferring to reduce phase.
-        mapLockFile.delete();
-
-        job.waitForCompletion(false);
-
-        assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
-
-        final Counters counters = job.getCounters();
-
-        assertNotNull("counters cannot be null", counters);
-        assertEquals("wrong counters count", 3, counters.countCounters());
-        assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
-        assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
-        assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
-    }
-
-    /**
-     * Tests job counters retrieval for unknown job id.
-     *
-     * @throws Exception If failed.
-     */
-    private void tstUnknownJobCounters() throws Exception {
-        IgniteHadoopClientProtocolProvider provider = provider();
-
-        ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT));
-
-        try {
-            proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1));
-            fail("exception must be thrown");
-        }
-        catch (Exception e) {
-            assert e instanceof IOException : "wrong error has been thrown";
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void tstJobSubmitMap() throws Exception {
-        checkJobSubmit(true, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void tstJobSubmitMapCombine() throws Exception {
-        checkJobSubmit(false, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void tstJobSubmitMapReduce() throws Exception {
-        checkJobSubmit(true, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void tstJobSubmitMapCombineReduce() throws Exception {
-        checkJobSubmit(false, false);
-    }
-
-    /**
-     * Test job submission.
-     *
-     * @param noCombiners Whether there are no combiners.
-     * @param noReducers Whether there are no reducers.
-     * @throws Exception If failed.
-     */
-    public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception {
-        IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
-
-        igfs.mkdirs(new IgfsPath(PATH_INPUT));
-
-        try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
-            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
-
-            bw.write("word");
-        }
-
-        Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
-
-        final Job job = Job.getInstance(conf);
-
-        job.setJobName(JOB_NAME);
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-
-        job.setMapperClass(TestMapper.class);
-        job.setReducerClass(TestReducer.class);
-
-        if (!noCombiners)
-            job.setCombinerClass(TestCombiner.class);
-
-        if (noReducers)
-            job.setNumReduceTasks(0);
-
-        job.setInputFormatClass(TextInputFormat.class);
-        job.setOutputFormatClass(TestOutputFormat.class);
-
-        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
-        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
-
-        job.submit();
-
-        JobID jobId = job.getJobID();
-
-        // Setup phase.
-        JobStatus jobStatus = job.getStatus();
-        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-        assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
-        assert jobStatus.getMapProgress() == 0.0f;
-        assert jobStatus.getReduceProgress() == 0.0f;
-
-        U.sleep(2100);
-
-        JobStatus recentJobStatus = job.getStatus();
-
-        assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
-            "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
-
-        // Transferring to map phase.
-        setupLockFile.delete();
-
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    return F.eq(1.0f, job.getStatus().getSetupProgress());
-                }
-                catch (Exception e) {
-                    throw new RuntimeException("Unexpected exception.", e);
-                }
-            }
-        }, 5000L);
-
-        // Map phase.
-        jobStatus = job.getStatus();
-        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-        assert jobStatus.getSetupProgress() == 1.0f;
-        assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
-        assert jobStatus.getReduceProgress() == 0.0f;
-
-        U.sleep(2100);
-
-        recentJobStatus = job.getStatus();
-
-        assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
-            "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
-
-        // Transferring to reduce phase.
-        mapLockFile.delete();
-
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    return F.eq(1.0f, job.getStatus().getMapProgress());
-                }
-                catch (Exception e) {
-                    throw new RuntimeException("Unexpected exception.", e);
-                }
-            }
-        }, 5000L);
-
-        if (!noReducers) {
-            // Reduce phase.
-            jobStatus = job.getStatus();
-            checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-            assert jobStatus.getSetupProgress() == 1.0f;
-            assert jobStatus.getMapProgress() == 1.0f;
-            assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
-
-            // Ensure that reduces progress increases.
-            U.sleep(2100);
-
-            recentJobStatus = job.getStatus();
-
-            assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
-                "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
-
-            reduceLockFile.delete();
-        }
-
-        job.waitForCompletion(false);
-
-        jobStatus = job.getStatus();
-        checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
-        assert jobStatus.getSetupProgress() == 1.0f;
-        assert jobStatus.getMapProgress() == 1.0f;
-        assert jobStatus.getReduceProgress() == 1.0f;
-
-        dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
-    }
-
-    /**
-     * Dump IGFS content.
-     *
-     * @param igfs IGFS.
-     * @param path Path.
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private static void dumpIgfs(IgniteFileSystem igfs, IgfsPath path) throws Exception {
-        IgfsFile file = igfs.info(path);
-
-        assert file != null;
-
-        System.out.println(file.path());
-
-        if (file.isDirectory()) {
-            for (IgfsPath child : igfs.listPaths(path))
-                dumpIgfs(igfs, child);
-        }
-        else {
-            try (BufferedReader br = new BufferedReader(new InputStreamReader(igfs.open(path)))) {
-                String line = br.readLine();
-
-                while (line != null) {
-                    System.out.println(line);
-
-                    line = br.readLine();
-                }
-            }
-        }
-    }
-
-    /**
-     * Check job status.
-     *
-     * @param status Job status.
-     * @param expJobId Expected job ID.
-     * @param expJobName Expected job name.
-     * @param expState Expected state.
-     * @param expCleanupProgress Expected cleanup progress.
-     * @throws Exception If failed.
-     */
-    private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName,
-        JobStatus.State expState, float expCleanupProgress) throws Exception {
-        assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID();
-        assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName();
-        assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState();
-        assert F.eq(status.getCleanupProgress(), expCleanupProgress) :
-            "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress();
-    }
-
-    /**
-     * @return Configuration.
-     */
-    private Configuration config(int port) {
-        Configuration conf = new Configuration();
-
-        setupFileSystems(conf);
-
-        conf.set(MRConfig.FRAMEWORK_NAME, HadoopClientProtocol.FRAMEWORK_NAME);
-        conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port);
-
-        conf.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/");
-
-        return conf;
-    }
-
-    /**
-     * @return Protocol provider.
-     */
-    private IgniteHadoopClientProtocolProvider provider() {
-        return new IgniteHadoopClientProtocolProvider();
-    }
-
-    /**
-     * Test mapper.
-     */
-    public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
-        /** Writable container for writing word. */
-        private Text word = new Text();
-
-        /** Writable integer constant of '1' is writing as count of found words. */
-        private static final IntWritable one = new IntWritable(1);
-
-        /** {@inheritDoc} */
-        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
-            while (mapLockFile.exists())
-                Thread.sleep(50);
-
-            StringTokenizer wordList = new StringTokenizer(val.toString());
-
-            while (wordList.hasMoreTokens()) {
-                word.set(wordList.nextToken());
-
-                ctx.write(word, one);
-            }
-        }
-    }
-
-    /**
-     * Test Hadoop counters.
-     */
-    public enum TestCounter {
-        COUNTER1, COUNTER2, COUNTER3
-    }
-
-    /**
-     * Test mapper that uses counters.
-     */
-    public static class TestCountingMapper extends TestMapper {
-        /** {@inheritDoc} */
-        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
-            super.map(key, val, ctx);
-            ctx.getCounter(TestCounter.COUNTER1).increment(1);
-        }
-    }
-
-    /**
-     * Test combiner that counts invocations.
-     */
-    public static class TestCountingCombiner extends TestReducer {
-        @Override public void reduce(Text key, Iterable<IntWritable> values,
-            Context ctx) throws IOException, InterruptedException {
-            ctx.getCounter(TestCounter.COUNTER1).increment(1);
-            ctx.getCounter(TestCounter.COUNTER2).increment(1);
-
-            int sum = 0;
-            for (IntWritable value : values) {
-                sum += value.get();
-            }
-
-            ctx.write(key, new IntWritable(sum));
-        }
-    }
-
-    /**
-     * Test reducer that counts invocations.
-     */
-    public static class TestCountingReducer extends TestReducer {
-        @Override public void reduce(Text key, Iterable<IntWritable> values,
-            Context ctx) throws IOException, InterruptedException {
-            ctx.getCounter(TestCounter.COUNTER1).increment(1);
-            ctx.getCounter(TestCounter.COUNTER3).increment(1);
-        }
-    }
-
-    /**
-     * Test combiner.
-     */
-    public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
-        // No-op.
-    }
-
-    public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
-        /** {@inheritDoc} */
-        @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
-            throws IOException {
-            return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx));
-        }
-    }
-
-    /**
-     * Test output committer.
-     */
-    private static class TestOutputCommitter extends FileOutputCommitter {
-        /** Delegate. */
-        private final FileOutputCommitter delegate;
-
-        /**
-         * Constructor.
-         *
-         * @param ctx Task attempt context.
-         * @param delegate Delegate.
-         * @throws IOException If failed.
-         */
-        private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
-            super(FileOutputFormat.getOutputPath(ctx), ctx);
-
-            this.delegate = delegate;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void setupJob(JobContext jobCtx) throws IOException {
-            try {
-                while (setupLockFile.exists())
-                    Thread.sleep(50);
-            }
-            catch (InterruptedException ignored) {
-                throw new IOException("Interrupted.");
-            }
-
-            delegate.setupJob(jobCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException {
-            delegate.setupTask(taskCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException {
-            return delegate.needsTaskCommit(taskCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException {
-            delegate.commitTask(taskCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException {
-            delegate.abortTask(taskCtx);
-        }
-    }
-
-    /**
-     * Test reducer.
-     */
-    public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
-        /** Writable container for writing sum of word counts. */
-        private IntWritable totalWordCnt = new IntWritable();
-
-        /** {@inheritDoc} */
-        @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
-            InterruptedException {
-            while (reduceLockFile.exists())
-                Thread.sleep(50);
-
-            int wordCnt = 0;
-
-            for (IntWritable value : values)
-                wordCnt += value.get();
-
-            totalWordCnt.set(wordCnt);
-
-            ctx.write(key, totalWordCnt);
-        }
-    }
-}

