ignite-commits mailing list archives

From sboi...@apache.org
Subject [35/46] ignite git commit: IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.
Date Wed, 20 Jul 2016 09:07:20 GMT
IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ecc77b01
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ecc77b01
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ecc77b01

Branch: refs/heads/ignite-1232
Commit: ecc77b0180b3b16303988e1dc187b53e7b96a111
Parents: 1dc8bcc
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Jul 19 16:09:06 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Jul 19 16:09:06 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsIgniteMock.java         |   9 +
 .../hadoop/HadoopAbstractMapReduceTest.java     |  15 +
 .../processors/hadoop/HadoopMapReduceTest.java  | 395 +------------------
 3 files changed, 42 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ecc77b01/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
index 0c55595..c9f77cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.IgniteQueue;
@@ -443,6 +444,14 @@ public class IgfsIgniteMock implements IgniteEx {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteLock reentrantLock(String name, boolean failoverSafe, boolean fair, boolean create)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> IgniteQueue<T> queue(String name, int cap, @Nullable CollectionConfiguration cfg)
         throws IgniteException {
         throwUnsupported();
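
For context, the reentrantLock() stub added above mirrors Ignite's public distributed reentrant lock API. A minimal usage sketch against a real node (startup via Ignition.start() with a default configuration is assumed; this example is not part of the commit):

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.Ignition;

public class ReentrantLockExample {
    public static void main(String[] args) {
        // Ignite implements AutoCloseable, so the node stops on exit.
        try (Ignite ignite = Ignition.start()) {
            // Same signature the mock stubs: (name, failoverSafe, fair, create).
            IgniteLock lock = ignite.reentrantLock("demo-lock", true, true, true);

            lock.lock();

            try {
                // Cluster-wide critical section.
                System.out.println("Acquired distributed lock.");
            }
            finally {
                lock.unlock();
            }
        }
    }
}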

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecc77b01/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
index d09ec61..ef886e4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.HadoopConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
@@ -389,9 +390,23 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
         cfg.setLocalHost("127.0.0.1");
         cfg.setConnectorConfiguration(null);
 
+        HadoopConfiguration hadoopCfg = createHadoopConfiguration();
+
+        if (hadoopCfg != null)
+            cfg.setHadoopConfiguration(hadoopCfg);
+
         return G.start(cfg);
     }
 
+    /**
+     * Creates custom Hadoop configuration.
+     *
+     * @return The Hadoop configuration.
+     */
+    protected HadoopConfiguration createHadoopConfiguration() {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration fsCfg = super.igfsConfiguration();
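
The createHadoopConfiguration() hook above lets a subclass run the whole shared suite against a custom Hadoop configuration. A sketch of how the weight-based planner named in the commit subject could be wired in (the planner class IgniteHadoopWeightedMapReducePlanner and the setMapReducePlanner() setter are assumptions drawn from the broader IGNITE-3414 work, not from this diff):

import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;

/** Runs the shared map-reduce suite with the weight-based planner. */
public class HadoopWeightedMapReduceTest extends HadoopAbstractMapReduceTest {
    /** {@inheritDoc} */
    @Override protected HadoopConfiguration createHadoopConfiguration() {
        HadoopConfiguration hadoopCfg = new HadoopConfiguration();

        // Swap the default planner for the weight-based one.
        hadoopCfg.setMapReducePlanner(new IgniteHadoopWeightedMapReducePlanner());

        return hadoopCfg;
    }
}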

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecc77b01/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 5d1de38..b703896 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -17,130 +17,13 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
-import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
-import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsUserContext;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
 import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
 
 /**
  * Test of whole cycle of map-reduce processing via Job tracker.
  */
-public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
-    /** IGFS block size. */
-    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
-
-    /** Amount of blocks to prefetch. */
-    protected static final int PREFETCH_BLOCKS = 1;
-
-    /** Amount of sequential block reads before prefetch is triggered. */
-    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
-
-    /** Secondary file system URI. */
-    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
-
-    /** Secondary file system configuration path. */
-    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
-
-    /** The user to run Hadoop job on behalf of. */
-    protected static final String USER = "vasya";
-
-    /** Secondary IGFS name. */
-    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
-
-    /** The secondary Ignite node. */
-    protected Ignite igniteSecondary;
-
-    /** The secondary Fs. */
-    protected IgfsSecondaryFileSystem secondaryFs;
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
-    }
-
-    /**
-     * Gets owner of a IgfsEx path.
-     * @param p The path.
-     * @return The owner.
-     */
-    private static String getOwner(IgfsEx i, IgfsPath p) {
-        return i.info(p).property(IgfsUtils.PROP_USER_NAME);
-    }
-
-    /**
-     * Gets owner of a secondary Fs path.
-     * @param secFs The sec Fs.
-     * @param p The path.
-     * @return The owner.
-     */
-    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
-        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
-            @Override public String apply() {
-                return secFs.info(p).property(IgfsUtils.PROP_USER_NAME);
-            }
-        });
-    }
-
-    /**
-     * Checks owner of the path.
-     * @param p The path.
-     */
-    private void checkOwner(IgfsPath p) {
-        String ownerPrim = getOwner(igfs, p);
-        assertEquals(USER, ownerPrim);
-
-        String ownerSec = getOwnerSecondary(secondaryFs, p);
-        assertEquals(USER, ownerSec);
-    }
-
+public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
     /**
      * Tests whole job execution with all phases in all combination of new and old versions of API.
      * @throws Exception If fails.
@@ -152,274 +35,32 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
         IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
 
-        final int red = 10_000;
-        final int blue = 20_000;
-        final int green = 15_000;
-        final int yellow = 7_000;
-
         generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow );
 
-        for (int i = 0; i < 3; i++) {
-            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
-
-            boolean useNewMapper = (i & 1) == 0;
-            boolean useNewCombiner = (i & 2) == 0;
-            boolean useNewReducer = (i & 4) == 0;
-
-            JobConf jobConf = new JobConf();
+        for (boolean[] apiMode: getApiModes()) {
+            assert apiMode.length == 3;
 
-            jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
-            jobConf.setUser(USER);
-            jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
+            boolean useNewMapper = apiMode[0];
+            boolean useNewCombiner = apiMode[1];
+            boolean useNewReducer = apiMode[2];
 
-            //To split into about 40 items for v2
-            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
-
-            //For v1
-            jobConf.setInt("fs.local.block.size", 65000);
-
-            // File system coordinates.
-            setupFileSystems(jobConf);
-
-            HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
-
-            Job job = Job.getInstance(jobConf);
-
-            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
-
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(IntWritable.class);
-
-            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
-            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
-
-            job.setJarByClass(HadoopWordCount2.class);
-
-            HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
-
-            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
-
-            fut.get();
-
-            checkJobStatistics(jobId);
-
-            final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
-
-            checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
-
-            checkOwner(new IgfsPath(outFile));
-
-            String actual = readAndSortFile(outFile, job.getConfiguration());
-
-            assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner
+ ", new reducer: " +
-                useNewReducer,
-                "blue\t" + blue + "\n" +
-                "green\t" + green + "\n" +
-                "red\t" + red + "\n" +
-                "yellow\t" + yellow + "\n",
-                actual
-            );
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
         }
     }
 
     /**
-     * Gets if to compress output data with Snappy.
-     *
-     * @return If to compress output data with Snappy.
-     */
-    protected boolean compressOutputSnappy() {
-        return false;
-    }
-
-    /**
-     * Simple test job statistics.
+     * Gets API mode combinations to be tested.
+     * Each boolean[] is { newMapper, newCombiner, newReducer } flag triplet.
      *
-     * @param jobId Job id.
-     * @throws IgniteCheckedException
+     * @return Arrays of booleans indicating API combinations to test.
      */
-    private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException {
-        HadoopCounters cntrs = grid(0).hadoop().counters(jobId);
-
-        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
-
-        Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>();
-
-        Map<String, Integer> phaseOrders = new HashMap<>();
-        phaseOrders.put("submit", 0);
-        phaseOrders.put("prepare", 1);
-        phaseOrders.put("start", 2);
-        phaseOrders.put("Cstart", 3);
-        phaseOrders.put("finish", 4);
-
-        String prevTaskId = null;
-
-        long apiEvtCnt = 0;
-
-        for (T2<String, Long> evt : perfCntr.evts()) {
-            //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706
-            String[] parsedEvt = evt.get1().split(" ");
-
-            String taskId;
-            String taskPhase;
-
-            if ("JOB".equals(parsedEvt[0])) {
-                taskId = parsedEvt[0];
-                taskPhase = parsedEvt[1];
-            }
-            else {
-                taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0,
3)) + parsedEvt[1];
-                taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2];
-            }
-
-            if (!taskId.equals(prevTaskId))
-                tasks.put(taskId, new TreeMap<Integer,Long>());
-
-            Integer pos = phaseOrders.get(taskPhase);
-
-            assertNotNull("Invalid phase " + taskPhase, pos);
-
-            tasks.get(taskId).put(pos, evt.get2());
-
-            prevTaskId = taskId;
-
-            apiEvtCnt++;
-        }
-
-        for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) {
-            Map<Integer, Long> order = task.getValue();
-
-            long prev = 0;
-
-            for (Map.Entry<Integer, Long> phase : order.entrySet()) {
-                assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue()
>= prev);
-
-                prev = phase.getValue();
-            }
-        }
-
-        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
-
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return igfs.exists(statPath);
-            }
-        }, 20_000);
-
-        final long apiEvtCnt0 = apiEvtCnt;
-
-        boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
-                        return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader);
-                    }
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }, 10000);
-
-        if (!res) {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)));
-
-            assert false : "Invalid API events count [exp=" + apiEvtCnt0 +
-                ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
-
-        super.beforeTest();
-    }
-
-    /**
-     * Start grid with IGFS.
-     *
-     * @param gridName Grid name.
-     * @param igfsName IGFS name
-     * @param mode IGFS mode.
-     * @param secondaryFs Secondary file system (optional).
-     * @param restCfg Rest configuration string (optional).
-     * @return Started grid instance.
-     * @throws Exception If failed.
-     */
-    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
-        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
-        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-
-        igfsCfg.setDataCacheName("dataCache");
-        igfsCfg.setMetaCacheName("metaCache");
-        igfsCfg.setName(igfsName);
-        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
-        igfsCfg.setDefaultMode(mode);
-        igfsCfg.setIpcEndpointConfiguration(restCfg);
-        igfsCfg.setSecondaryFileSystem(secondaryFs);
-        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
-        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
-
-        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
-
-        dataCacheCfg.setName("dataCache");
-        dataCacheCfg.setCacheMode(PARTITIONED);
-        dataCacheCfg.setNearConfiguration(null);
-        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
-        dataCacheCfg.setBackups(0);
-        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
-        dataCacheCfg.setOffHeapMaxMemory(0);
-
-        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
-        metaCacheCfg.setName("metaCache");
-        metaCacheCfg.setCacheMode(REPLICATED);
-        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        cfg.setGridName(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
-        cfg.setDiscoverySpi(discoSpi);
-        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
-        cfg.setFileSystemConfiguration(igfsCfg);
-
-        cfg.setLocalHost("127.0.0.1");
-        cfg.setConnectorConfiguration(null);
-
-        HadoopConfiguration hadoopCfg = createHadoopConfiguration();
-
-        if (hadoopCfg != null)
-            cfg.setHadoopConfiguration(hadoopCfg);
-
-        return G.start(cfg);
-    }
-
-    /**
-     * Creates custom Hadoop configuration.
-     *
-     * @return The Hadoop configuration.
-     */
-    protected HadoopConfiguration createHadoopConfiguration() {
-        return null;
-    }
-
-    /**
-     * @return IGFS configuration.
-     */
-    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
-        FileSystemConfiguration fsCfg = super.igfsConfiguration();
-
-        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
-
-        fsCfg.setSecondaryFileSystem(secondaryFs);
-
-        return fsCfg;
+    protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, false },
+            { false, false, true },
+            { false, true,  false },
+            { true,  false, false },
+            { true,  true,  true },
+        };
     }
 }
\ No newline at end of file
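
The getApiModes() hook above replaces the old hard-coded bitmask loop, so a subclass can narrow or extend the tested combinations of new and old map/combine/reduce APIs without duplicating job setup. A hypothetical subclass that exercises only the fully new-API mode:

/** Hypothetical suite limited to { newMapper, newCombiner, newReducer } = { true, true, true }. */
public class HadoopNewApiMapReduceTest extends HadoopMapReduceTest {
    /** {@inheritDoc} */
    @Override protected boolean[][] getApiModes() {
        return new boolean[][] {
            { true, true, true },
        };
    }
}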

