From: sboikov@apache.org
To: commits@ignite.apache.org
Date: Wed, 20 Jul 2016 09:07:20 -0000
Subject: [35/46] ignite git commit: IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.

IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ecc77b01
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ecc77b01
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ecc77b01

Branch: refs/heads/ignite-1232
Commit: ecc77b0180b3b16303988e1dc187b53e7b96a111
Parents: 1dc8bcc
Author: vozerov-gridgain
Authored: Tue Jul 19 16:09:06 2016 +0300
Committer: vozerov-gridgain
Committed: Tue Jul 19 16:09:06 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsIgniteMock.java         |   9 +
 .../hadoop/HadoopAbstractMapReduceTest.java     |  15 +
 .../processors/hadoop/HadoopMapReduceTest.java  | 395 +------------------
 3 files changed, 42 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ecc77b01/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
index 0c55595..c9f77cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.IgniteQueue;
@@ -443,6 +444,14 @@ public class IgfsIgniteMock implements IgniteEx {
     }

     /** {@inheritDoc} */
+    @Override public IgniteLock reentrantLock(String name, boolean failoverSafe, boolean fair, boolean create)
+        throws IgniteException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> IgniteQueue<T> queue(String name, int cap, @Nullable CollectionConfiguration cfg)
         throws IgniteException {
         throwUnsupported();
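
The IgfsIgniteMock change above is purely mechanical: the IgniteEx interface now declares reentrantLock(), so the mock must implement it, and like the mock's other data-structure methods it simply calls throwUnsupported(). For context, here is a minimal usage sketch of the IgniteLock API that the new signature refers to; it is not part of this commit, and the lock name and bootstrap code are arbitrary illustrations.

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteLock;
    import org.apache.ignite.Ignition;

    public class ReentrantLockExample {
        public static void main(String[] args) {
            try (Ignite ignite = Ignition.start()) {
                // failoverSafe = true: the lock is released if its owner node fails;
                // fair = false: non-fair acquisition order; create = true: create if absent.
                IgniteLock lock = ignite.reentrantLock("myLock", true, false, true);

                lock.lock();

                try {
                    // Critical section, protected across the whole cluster.
                }
                finally {
                    lock.unlock();
                }
            }
        }
    }
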
http://git-wip-us.apache.org/repos/asf/ignite/blob/ecc77b01/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
index d09ec61..ef886e4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.HadoopConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
@@ -389,9 +390,23 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
         cfg.setLocalHost("127.0.0.1");
         cfg.setConnectorConfiguration(null);

+        HadoopConfiguration hadoopCfg = createHadoopConfiguration();
+
+        if (hadoopCfg != null)
+            cfg.setHadoopConfiguration(hadoopCfg);
+
         return G.start(cfg);
     }

+    /**
+     * Creates custom Hadoop configuration.
+     *
+     * @return The Hadoop configuration.
+     */
+    protected HadoopConfiguration createHadoopConfiguration() {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration fsCfg = super.igfsConfiguration();
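
The createHadoopConfiguration() hook added above is the extension point this commit is really about: a subclass can return a non-null HadoopConfiguration and have it applied to every Ignite node the test starts. A hypothetical subclass wiring in the weight-based planner could look like the following sketch; it assumes the IgniteHadoopWeightedMapReducePlanner class that IGNITE-3414 introduces, and the test class name is made up for illustration.

    import org.apache.ignite.configuration.HadoopConfiguration;
    import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;

    /** Hypothetical planner-specific map-reduce test. */
    public class HadoopWeightedPlannerMapReduceTest extends HadoopAbstractMapReduceTest {
        /** {@inheritDoc} */
        @Override protected HadoopConfiguration createHadoopConfiguration() {
            HadoopConfiguration hadoopCfg = new HadoopConfiguration();

            // Swap the default map-reduce planner for the weight-based one.
            hadoopCfg.setMapReducePlanner(new IgniteHadoopWeightedMapReducePlanner());

            return hadoopCfg;
        }
    }
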
http://git-wip-us.apache.org/repos/asf/ignite/blob/ecc77b01/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 5d1de38..b703896 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -17,130 +17,13 @@
 package org.apache.ignite.internal.processors.hadoop;

-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
-import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
-import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsUserContext;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
 import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;

 /**
  * Test of whole cycle of map-reduce processing via Job tracker.
  */
-public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
-    /** IGFS block size. */
-    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
-
-    /** Amount of blocks to prefetch. */
-    protected static final int PREFETCH_BLOCKS = 1;
-
-    /** Amount of sequential block reads before prefetch is triggered. */
-    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
-
-    /** Secondary file system URI. */
-    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
-
-    /** Secondary file system configuration path. */
-    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
-
-    /** The user to run Hadoop job on behalf of. */
-    protected static final String USER = "vasya";
-
-    /** Secondary IGFS name. */
-    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
-
-    /** The secondary Ignite node. */
-    protected Ignite igniteSecondary;
-
-    /** The secondary Fs. */
-    protected IgfsSecondaryFileSystem secondaryFs;
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
-    }
-
-    /**
-     * Gets owner of a IgfsEx path.
-     * @param p The path.
-     * @return The owner.
-     */
-    private static String getOwner(IgfsEx i, IgfsPath p) {
-        return i.info(p).property(IgfsUtils.PROP_USER_NAME);
-    }
-
-    /**
-     * Gets owner of a secondary Fs path.
-     * @param secFs The sec Fs.
-     * @param p The path.
-     * @return The owner.
-     */
-    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
-        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
-            @Override public String apply() {
-                return secFs.info(p).property(IgfsUtils.PROP_USER_NAME);
-            }
-        });
-    }
-
-    /**
-     * Checks owner of the path.
-     * @param p The path.
-     */
-    private void checkOwner(IgfsPath p) {
-        String ownerPrim = getOwner(igfs, p);
-        assertEquals(USER, ownerPrim);
-
-        String ownerSec = getOwnerSecondary(secondaryFs, p);
-        assertEquals(USER, ownerSec);
-    }
-
+public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
     /**
      * Tests whole job execution with all phases in all combination of new and old versions of API.
      * @throws Exception If fails.
@@ -152,274 +35,32 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {

         IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");

-        final int red = 10_000;
-        final int blue = 20_000;
-        final int green = 15_000;
-        final int yellow = 7_000;
-
         generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow );

-        for (int i = 0; i < 3; i++) {
-            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
-
-            boolean useNewMapper = (i & 1) == 0;
-            boolean useNewCombiner = (i & 2) == 0;
-            boolean useNewReducer = (i & 4) == 0;
-
-            JobConf jobConf = new JobConf();
+        for (boolean[] apiMode: getApiModes()) {
+            assert apiMode.length == 3;

-            jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
-            jobConf.setUser(USER);
-            jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
+            boolean useNewMapper = apiMode[0];
+            boolean useNewCombiner = apiMode[1];
+            boolean useNewReducer = apiMode[2];

-            //To split into about 40 items for v2
-            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
-
-            //For v1
-            jobConf.setInt("fs.local.block.size", 65000);
-
-            // File system coordinates.
-            setupFileSystems(jobConf);
-
-            HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
-
-            Job job = Job.getInstance(jobConf);
-
-            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
-
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(IntWritable.class);
-
-            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
-            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
-
-            job.setJarByClass(HadoopWordCount2.class);
-
-            HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
-
-            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
-
-            fut.get();
-
-            checkJobStatistics(jobId);
-
-            final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
-
-            checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
-
-            checkOwner(new IgfsPath(outFile));
-
-            String actual = readAndSortFile(outFile, job.getConfiguration());
-
-            assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
-                useNewReducer,
-                "blue\t" + blue + "\n" +
-                "green\t" + green + "\n" +
-                "red\t" + red + "\n" +
-                "yellow\t" + yellow + "\n",
-                actual
-            );
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
         }
     }

     /**
-     * Gets if to compress output data with Snappy.
-     *
-     * @return If to compress output data with Snappy.
-     */
-    protected boolean compressOutputSnappy() {
-        return false;
-    }
-
-    /**
-     * Simple test job statistics.
+     * Gets API mode combinations to be tested.
+     * Each boolean[] is { newMapper, newCombiner, newReducer } flag triplet.
      *
-     * @param jobId Job id.
-     * @throws IgniteCheckedException
+     * @return Arrays of booleans indicating API combinations to test.
      */
-    private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException {
-        HadoopCounters cntrs = grid(0).hadoop().counters(jobId);
-
-        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
-
-        Map<String, SortedMap<Integer, Long>> tasks = new TreeMap<>();
-
-        Map<String, Integer> phaseOrders = new HashMap<>();
-        phaseOrders.put("submit", 0);
-        phaseOrders.put("prepare", 1);
-        phaseOrders.put("start", 2);
-        phaseOrders.put("Cstart", 3);
-        phaseOrders.put("finish", 4);
-
-        String prevTaskId = null;
-
-        long apiEvtCnt = 0;
-
-        for (T2<String, Long> evt : perfCntr.evts()) {
-            //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706
-            String[] parsedEvt = evt.get1().split(" ");
-
-            String taskId;
-            String taskPhase;
-
-            if ("JOB".equals(parsedEvt[0])) {
-                taskId = parsedEvt[0];
-                taskPhase = parsedEvt[1];
-            }
-            else {
-                taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1];
-                taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2];
-            }
-
-            if (!taskId.equals(prevTaskId))
-                tasks.put(taskId, new TreeMap<Integer, Long>());
-
-            Integer pos = phaseOrders.get(taskPhase);
-
-            assertNotNull("Invalid phase " + taskPhase, pos);
-
-            tasks.get(taskId).put(pos, evt.get2());
-
-            prevTaskId = taskId;
-
-            apiEvtCnt++;
-        }
-
-        for (Map.Entry<String, SortedMap<Integer, Long>> task : tasks.entrySet()) {
-            Map<Integer, Long> order = task.getValue();
-
-            long prev = 0;
-
-            for (Map.Entry<Integer, Long> phase : order.entrySet()) {
-                assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev);
-
-                prev = phase.getValue();
-            }
-        }
-
-        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
-
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return igfs.exists(statPath);
-            }
-        }, 20_000);
-
-        final long apiEvtCnt0 = apiEvtCnt;
-
-        boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
-                        return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader);
-                    }
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }, 10000);
-
-        if (!res) {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)));
-
-            assert false : "Invalid API events count [exp=" + apiEvtCnt0 +
-                ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
-
-        super.beforeTest();
-    }
-
-    /**
-     * Start grid with IGFS.
-     *
-     * @param gridName Grid name.
-     * @param igfsName IGFS name
-     * @param mode IGFS mode.
-     * @param secondaryFs Secondary file system (optional).
-     * @param restCfg Rest configuration string (optional).
-     * @return Started grid instance.
-     * @throws Exception If failed.
-     */
-    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
-        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
-        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-
-        igfsCfg.setDataCacheName("dataCache");
-        igfsCfg.setMetaCacheName("metaCache");
-        igfsCfg.setName(igfsName);
-        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
-        igfsCfg.setDefaultMode(mode);
-        igfsCfg.setIpcEndpointConfiguration(restCfg);
-        igfsCfg.setSecondaryFileSystem(secondaryFs);
-        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
-        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
-
-        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
-
-        dataCacheCfg.setName("dataCache");
-        dataCacheCfg.setCacheMode(PARTITIONED);
-        dataCacheCfg.setNearConfiguration(null);
-        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
-        dataCacheCfg.setBackups(0);
-        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
-        dataCacheCfg.setOffHeapMaxMemory(0);
-
-        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
-        metaCacheCfg.setName("metaCache");
-        metaCacheCfg.setCacheMode(REPLICATED);
-        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        cfg.setGridName(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
-        cfg.setDiscoverySpi(discoSpi);
-        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
-        cfg.setFileSystemConfiguration(igfsCfg);
-
-        cfg.setLocalHost("127.0.0.1");
-        cfg.setConnectorConfiguration(null);
-
-        HadoopConfiguration hadoopCfg = createHadoopConfiguration();
-
-        if (hadoopCfg != null)
-            cfg.setHadoopConfiguration(hadoopCfg);
-
-        return G.start(cfg);
-    }
-
-    /**
-     * Creates custom Hadoop configuration.
-     *
-     * @return The Hadoop configuration.
-     */
-    protected HadoopConfiguration createHadoopConfiguration() {
-        return null;
-    }
-
-    /**
-     * @return IGFS configuration.
-     */
-    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
-        FileSystemConfiguration fsCfg = super.igfsConfiguration();
-
-        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
-
-        fsCfg.setSecondaryFileSystem(secondaryFs);
-
-        return fsCfg;
+    protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, false },
+            { false, false, true },
+            { false, true, false },
+            { true, false, false },
+            { true, true, true },
+        };
     }
 }
\ No newline at end of file
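
With this refactoring the test method drives doTest() from getApiModes() instead of the old bit-mask loop, so planner-specific subclasses can prune or extend the matrix of old/new mapper, combiner and reducer APIs without copying the job-execution code. A hypothetical subclass narrowing the matrix to the two extremes could look like this; the class name is made up for illustration.

    /** Hypothetical test running only the all-old and all-new API combinations. */
    public class HadoopTwoModesMapReduceTest extends HadoopMapReduceTest {
        /** {@inheritDoc} */
        @Override protected boolean[][] getApiModes() {
            return new boolean[][] {
                { false, false, false }, // Old mapper, combiner and reducer.
                { true,  true,  true  }, // New mapper, combiner and reducer.
            };
        }
    }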