Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DC49B200BB4 for ; Mon, 17 Oct 2016 19:59:02 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DABCE160AFE; Mon, 17 Oct 2016 17:59:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 84FCA160B04 for ; Mon, 17 Oct 2016 19:59:01 +0200 (CEST) Received: (qmail 40609 invoked by uid 500); 17 Oct 2016 17:59:00 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 38927 invoked by uid 99); 17 Oct 2016 17:58:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Oct 2016 17:58:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 38F6BE390E; Mon, 17 Oct 2016 17:58:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: av@apache.org To: commits@ignite.apache.org Date: Mon, 17 Oct 2016 17:59:23 -0000 Message-Id: <5c3f9e4a6e034dbcb4bcc95062fbba66@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [26/40] ignite git commit: IGNITE-2355: Hadoop: added ability to configure multiple job tracker addresses. This closes #1153. archived-at: Mon, 17 Oct 2016 17:59:03 -0000 IGNITE-2355: Hadoop: added ability to configure multiple job tracker addresses. This closes #1153. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ab094e0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ab094e0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ab094e0 Branch: refs/heads/ignite-ssl-hotfix Commit: 2ab094e08373dc6667af44d48a38b4f044953a79 Parents: f597aff Author: tledkov-gridgain Authored: Wed Oct 12 16:48:51 2016 +0300 Committer: vozerov-gridgain Committed: Wed Oct 12 16:48:51 2016 +0300 ---------------------------------------------------------------------- .../ignite/testframework/GridTestUtils.java | 37 +++ .../IgniteHadoopClientProtocolProvider.java | 53 ++-- .../hadoop/impl/HadoopAbstractSelfTest.java | 4 +- ...opClientProtocolMultipleServersSelfTest.java | 307 +++++++++++++++++++ .../testsuites/IgniteHadoopTestSuite.java | 2 + 5 files changed, 381 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab094e0/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index 524c643..b3ce46b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -264,6 +264,43 @@ public final class GridTestUtils { } /** + * Checks whether callable throws an exception with specified cause. + * + * @param log Logger (optional). + * @param call Callable. + * @param cls Exception class. + * @param msg Exception message (optional). If provided exception message + * and this message should be equal. + * @return Thrown throwable. 
+ */ + public static Throwable assertThrowsAnyCause(@Nullable IgniteLogger log, Callable call, + Class cls, @Nullable String msg) { + assert call != null; + assert cls != null; + + try { + call.call(); + } + catch (Throwable e) { + Throwable t = e; + + while (t != null) { + if (cls == t.getClass() && (msg == null || (t.getMessage() != null || t.getMessage().contains(msg)))) { + log.info("Caught expected exception: " + t.getMessage()); + + return t; + } + + t = t.getCause(); + } + + fail("Unexpected exception", e); + } + + throw new AssertionError("Exception has not been thrown."); + } + + /** * Checks whether callable throws expected exception or its child or not. * * @param log Logger (optional). http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab094e0/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java index 343b5ed..1efe625 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java @@ -17,11 +17,18 @@ package org.apache.ignite.hadoop.mapreduce; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; import org.apache.ignite.IgniteCheckedException; +import 
org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.client.GridClient; import org.apache.ignite.internal.client.GridClientConfiguration; @@ -32,11 +39,6 @@ import org.apache.ignite.internal.processors.hadoop.impl.proto.HadoopClientProto import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.concurrent.ConcurrentHashMap; - import static org.apache.ignite.internal.client.GridClientProtocol.TCP; @@ -53,17 +55,27 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { /** {@inheritDoc} */ @Override public ClientProtocol create(Configuration conf) throws IOException { if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { - String addr = conf.get(MRConfig.MASTER_ADDRESS); + Collection addrs = conf.getTrimmedStringCollection(MRConfig.MASTER_ADDRESS); - if (F.isEmpty(addr)) - throw new IOException("Failed to create client protocol because server address is not specified (is " + - MRConfig.MASTER_ADDRESS + " property set?)."); + if (F.isEmpty(addrs)) + throw new IOException("Failed to create client protocol because Ignite node addresses are not " + + "specified (did you set " + MRConfig.MASTER_ADDRESS + " property?)."); - if (F.eq(addr, "local")) + if (F.contains(addrs, "local")) throw new IOException("Local execution mode is not supported, please point " + - MRConfig.MASTER_ADDRESS + " to real Ignite node."); + MRConfig.MASTER_ADDRESS + " to real Ignite nodes."); + + Collection addrs0 = new ArrayList<>(addrs.size()); + + // Set up port by default if need + for (String addr : addrs) { + if (!addr.contains(":")) + addrs0.add(addr + ':' + ConnectorConfiguration.DFLT_TCP_PORT); + else + addrs0.add(addr); + } - return createProtocol(addr, conf); + return new 
HadoopClientProtocol(conf, client(conf.get(MRConfig.MASTER_ADDRESS), addrs0)); } return null; @@ -91,24 +103,25 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { * @throws IOException If failed. */ private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException { - return new HadoopClientProtocol(conf, client(addr)); + return new HadoopClientProtocol(conf, client(addr, Collections.singletonList(addr))); } /** * Create client. * - * @param addr Endpoint address. + * @param clusterName Ignite cluster logical name. + * @param addrs Endpoint addresses. * @return Client. * @throws IOException If failed. */ - private static GridClient client(String addr) throws IOException { + private static GridClient client(String clusterName, Collection addrs) throws IOException { try { - IgniteInternalFuture fut = cliMap.get(addr); + IgniteInternalFuture fut = cliMap.get(clusterName); if (fut == null) { GridFutureAdapter fut0 = new GridFutureAdapter<>(); - IgniteInternalFuture oldFut = cliMap.putIfAbsent(addr, fut0); + IgniteInternalFuture oldFut = cliMap.putIfAbsent(clusterName, fut0); if (oldFut != null) return oldFut.get(); @@ -116,7 +129,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { GridClientConfiguration cliCfg = new GridClientConfiguration(); cliCfg.setProtocol(TCP); - cliCfg.setServers(Collections.singletonList(addr)); + cliCfg.setServers(addrs); cliCfg.setMarshaller(new GridClientJdkMarshaller()); cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day. 
cliCfg.setDaemon(true); @@ -131,7 +144,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { catch (GridClientException e) { fut0.onDone(e); - throw new IOException("Failed to establish connection with Ignite node: " + addr, e); + throw new IOException("Failed to establish connection with Ignite: " + addrs, e); } } } @@ -139,7 +152,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { return fut.get(); } catch (IgniteCheckedException e) { - throw new IOException("Failed to establish connection with Ignite node: " + addr, e); + throw new IOException("Failed to establish connection with Ignite cluster: " + addrs, e); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab094e0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java index 68009dd..12351c6 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java @@ -48,7 +48,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** REST port. */ - protected static final int REST_PORT = 11212; + protected static final int REST_PORT = ConnectorConfiguration.DFLT_TCP_PORT; /** IGFS name. */ protected static final String igfsName = null; @@ -185,7 +185,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** * @return IGFS data cache configuration.
*/ - private CacheConfiguration dataCacheConfiguration() { + protected CacheConfiguration dataCacheConfiguration() { CacheConfiguration cfg = new CacheConfiguration(); cfg.setName(igfsDataCacheName); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab094e0/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java new file mode 100644 index 0000000..04747d0 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.impl.client; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.Callable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.client.GridServerUnreachableException; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Hadoop client protocol configured with multiple ignite servers tests. + */ +@SuppressWarnings("ResultOfMethodCallIgnored") +public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractSelfTest { + /** Input path. */ + private static final String PATH_INPUT = "/input"; + + /** Job name. 
*/ + private static final String JOB_NAME = "myJob"; + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected boolean restEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGrids(gridCount()); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration dataCacheConfiguration() { + CacheConfiguration cfg = super.dataCacheConfiguration(); + + cfg.setBackups(1); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + private void beforeJob() throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); + + igfs.format(); + + igfs.mkdirs(new IgfsPath(PATH_INPUT)); + + try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( + new IgfsPath(PATH_INPUT + "/test.file"), true)))) { + + bw.write("word"); + } + } + + /** + * Test job submission. + * + * @param conf Hadoop configuration. + * @throws Exception If failed. + */ + public void checkJobSubmit(Configuration conf) throws Exception { + final Job job = Job.getInstance(conf); + + job.setJobName(JOB_NAME); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(OutFormat.class); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + + job.setNumReduceTasks(0); + + FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); + + job.submit(); + + job.waitForCompletion(false); + + assert job.getStatus().getState() == JobStatus.State.SUCCEEDED : job.getStatus().getState(); + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("ConstantConditions") + public void testMultipleAddresses() throws Exception { + beforeJob(); + + stopGrid(0); + + U.sleep(5000); + + checkJobSubmit(configMultipleAddrs(gridCount())); + + startGrid(0); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) + public void testSingleAddress() throws Exception { + stopGrid(0); + + U.sleep(5000); + + GridTestUtils.assertThrowsAnyCause(log, new Callable() { + @Override public Object call() throws Exception { + checkJobSubmit(configSingleAddress()); + return null; + } + }, + GridServerUnreachableException.class, "Failed to connect to any of the servers in list"); + + startGrid(0); + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + public void testMixedAddrs() throws Exception { + beforeJob(); + + stopGrid(1); + + U.sleep(5000); + + checkJobSubmit(configMixed()); + + startGrid(1); + + awaitPartitionMapExchange(); + } + + /** + * @return Configuration. + */ + private Configuration configSingleAddress() { + Configuration conf = HadoopUtils.safeCreateConfiguration(); + + setupFileSystems(conf); + + conf.set(MRConfig.FRAMEWORK_NAME, IgniteHadoopClientProtocolProvider.FRAMEWORK_NAME); + conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + REST_PORT); + + conf.set("fs.defaultFS", "igfs:///"); + + return conf; + } + + /** + * @param serversCnt Count of servers. + * @return Configuration.
+ */ + private Configuration configMultipleAddrs(int serversCnt) { + Configuration conf = HadoopUtils.safeCreateConfiguration(); + + setupFileSystems(conf); + + conf.set(MRConfig.FRAMEWORK_NAME, IgniteHadoopClientProtocolProvider.FRAMEWORK_NAME); + + Collection addrs = new ArrayList<>(serversCnt); + + for (int i = 0; i < serversCnt; ++i) + addrs.add("127.0.0.1:" + Integer.toString(REST_PORT + i)); + + conf.set(MRConfig.MASTER_ADDRESS, F.concat(addrs, ",")); + + conf.set("fs.defaultFS", "igfs:///"); + + return conf; + } + + /** + * @return Configuration. + */ + private Configuration configMixed() { + Configuration conf = HadoopUtils.safeCreateConfiguration(); + + setupFileSystems(conf); + + conf.set(MRConfig.FRAMEWORK_NAME, IgniteHadoopClientProtocolProvider.FRAMEWORK_NAME); + + Collection addrs = new ArrayList<>(); + + addrs.add("localhost"); + addrs.add("127.0.0.1:" + Integer.toString(REST_PORT + 1)); + + conf.set(MRConfig.MASTER_ADDRESS, F.concat(addrs, ",")); + + conf.set("fs.defaultFS", "igfs:///"); + + return conf; + } + + /** + * Test mapper. + */ + public static class TestMapper extends Mapper { + /** {@inheritDoc} */ + @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + // No-op. + } + } + + /** + * Test reducer. + */ + public static class TestReducer extends Reducer { + + /** {@inheritDoc} */ + @Override public void reduce(Text key, Iterable values, Context ctx) throws IOException, + InterruptedException { + // No-op. + } + } + + /** + * Test output formatter. + */ + public static class OutFormat extends OutputFormat { + /** {@inheritDoc} */ + @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, + InterruptedException { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2ab094e0/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index bbd92d1..959bc59 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -24,6 +24,7 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.processors.hadoop.HadoopTestClassLoader; import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolEmbeddedSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolMultipleServersSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopTxConfigCacheTest; import org.apache.ignite.internal.processors.hadoop.impl.fs.KerberosHadoopFileSystemFactorySelfTest; @@ -194,6 +195,7 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolEmbeddedSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolMultipleServersSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopCommandLineTest.class.getName())));