From: vozerov@apache.org
To: commits@ignite.apache.org
Date: Wed, 21 Sep 2016 14:54:22 -0000
Subject: [90/92] [abbrv] ignite git commit: Moved another big part of logic from "hadoop-impl" to "hadoop".

Moved another big part of logic from "hadoop-impl" to "hadoop".
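Aside from the file moves themselves, the substantive change in this commit is updating the fully qualified class-name strings that reference the moved classes: MAIN_CLASS in the setup scripts and the HADOOP entry in IgniteComponentType. Optional processors are resolved reflectively from those strings, so a package move silently breaks lookup unless the strings are updated in the same commit. The following is a minimal, self-contained sketch of that lookup pattern; the class and method names here are hypothetical illustrations, not Ignite's actual API.

// Sketch only: mimics reflective component lookup by class-name string,
// which is what makes IgniteComponentType sensitive to package moves.
public final class ComponentLookupSketch {
    /** Returns an instance of the named class, or null if it cannot be resolved. */
    static Object createOrNull(String clsName) {
        try {
            return Class.forName(clsName).getDeclaredConstructor().newInstance();
        }
        catch (ReflectiveOperationException e) {
            return null; // Class not on classpath, or the registered name is stale.
        }
    }

    public static void main(String[] args) {
        // Pre-move name: stale after this commit, so resolution fails.
        System.out.println(createOrNull("org.apache.ignite.internal.processors.hadoop.impl.HadoopProcessor"));

        // Post-move name: what IgniteComponentType registers after this commit.
        System.out.println(createOrNull("org.apache.ignite.internal.processors.hadoop.HadoopProcessor"));
    }
}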
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53237dd3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53237dd3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53237dd3 Branch: refs/heads/ignite-3949 Commit: 53237dd3ee2957f95ca3f9674a6488bc9d02b85b Parents: 8607846 Author: vozerov-gridgain Authored: Wed Sep 21 17:29:44 2016 +0300 Committer: vozerov-gridgain Committed: Wed Sep 21 17:29:44 2016 +0300 ---------------------------------------------------------------------- bin/setup-hadoop.bat | 2 +- bin/setup-hadoop.sh | 2 +- .../ignite/internal/IgniteComponentType.java | 2 +- .../hadoop/impl/HadoopAttributes.java | 168 ------ .../processors/hadoop/impl/HadoopImpl.java | 138 ----- .../processors/hadoop/impl/HadoopProcessor.java | 242 --------- .../processors/hadoop/impl/HadoopSetup.java | 542 ------------------- .../processors/hadoop/HadoopAttributes.java | 168 ++++++ .../internal/processors/hadoop/HadoopImpl.java | 134 +++++ .../processors/hadoop/HadoopProcessor.java | 230 ++++++++ .../internal/processors/hadoop/HadoopSetup.java | 542 +++++++++++++++++++ 11 files changed, 1077 insertions(+), 1093 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/bin/setup-hadoop.bat ---------------------------------------------------------------------- diff --git a/bin/setup-hadoop.bat b/bin/setup-hadoop.bat index 198a1b0..a11ef8c 100644 --- a/bin/setup-hadoop.bat +++ b/bin/setup-hadoop.bat @@ -23,6 +23,6 @@ if "%OS%" == "Windows_NT" setlocal -set MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.impl.HadoopSetup +set MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.HadoopSetup call "%~dp0\ignite.bat" %* http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/bin/setup-hadoop.sh ---------------------------------------------------------------------- diff --git a/bin/setup-hadoop.sh b/bin/setup-hadoop.sh index 3188e4d..8870c75 100755 --- a/bin/setup-hadoop.sh +++ b/bin/setup-hadoop.sh @@ -55,7 +55,7 @@ setIgniteHome # # Set utility environment. # -export MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.impl.HadoopSetup +export MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.HadoopSetup # # Start utility. http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index 0aabdf4..fa5240e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -37,7 +37,7 @@ public enum IgniteComponentType { /** Hadoop. 
*/ HADOOP( "org.apache.ignite.internal.processors.hadoop.HadoopNoopProcessor", - "org.apache.ignite.internal.processors.hadoop.impl.HadoopProcessor", + "org.apache.ignite.internal.processors.hadoop.HadoopProcessor", "ignite-hadoop-impl" ), http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAttributes.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAttributes.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAttributes.java deleted file mode 100644 index 23eaa18..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAttributes.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Arrays; - -/** - * Hadoop attributes. - */ -public class HadoopAttributes implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Attribute name. */ - public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop"; - - /** Map-reduce planner class name. */ - private String plannerCls; - - /** External executor flag. */ - private boolean extExec; - - /** Maximum parallel tasks. */ - private int maxParallelTasks; - - /** Maximum task queue size. */ - private int maxTaskQueueSize; - - /** Library names. */ - @GridToStringExclude - private String[] libNames; - - /** Number of cores. */ - private int cores; - - /** - * Get attributes for node (if any). - * - * @param node Node. - * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node. - */ - @Nullable public static HadoopAttributes forNode(ClusterNode node) { - return node.attribute(NAME); - } - - /** - * {@link Externalizable} support. - */ - public HadoopAttributes() { - // No-op. - } - - /** - * Constructor. - * - * @param cfg Configuration. 
- */ - public HadoopAttributes(HadoopConfiguration cfg) { - assert cfg != null; - assert cfg.getMapReducePlanner() != null; - - plannerCls = cfg.getMapReducePlanner().getClass().getName(); - - // TODO: IGNITE-404: Get from configuration when fixed. - extExec = false; - - maxParallelTasks = cfg.getMaxParallelTasks(); - maxTaskQueueSize = cfg.getMaxTaskQueueSize(); - libNames = cfg.getNativeLibraryNames(); - - // Cores count already passed in other attributes, we add it here for convenience. - cores = Runtime.getRuntime().availableProcessors(); - } - - /** - * @return Map reduce planner class name. - */ - public String plannerClassName() { - return plannerCls; - } - - /** - * @return External execution flag. - */ - public boolean externalExecution() { - return extExec; - } - - /** - * @return Maximum parallel tasks. - */ - public int maxParallelTasks() { - return maxParallelTasks; - } - - /** - * @return Maximum task queue size. - */ - public int maxTaskQueueSize() { - return maxTaskQueueSize; - } - - - /** - * @return Native library names. - */ - public String[] nativeLibraryNames() { - return libNames; - } - - /** - * @return Number of cores on machine. - */ - public int cores() { - return cores; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(plannerCls); - out.writeBoolean(extExec); - out.writeInt(maxParallelTasks); - out.writeInt(maxTaskQueueSize); - out.writeObject(libNames); - out.writeInt(cores); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - plannerCls = (String)in.readObject(); - extExec = in.readBoolean(); - maxParallelTasks = in.readInt(); - maxTaskQueueSize = in.readInt(); - libNames = (String[])in.readObject(); - cores = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames)); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java deleted file mode 100644 index 80309df..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.impl; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.Hadoop; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.jetbrains.annotations.Nullable; - -/** - * Hadoop facade implementation. - */ -public class HadoopImpl implements Hadoop { - /** Hadoop processor. */ - private final HadoopProcessor proc; - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** - * Constructor. - * - * @param proc Hadoop processor. - */ - HadoopImpl(HadoopProcessor proc) { - this.proc = proc; - } - - /** {@inheritDoc} */ - @Override public HadoopConfiguration configuration() { - return proc.config(); - } - - /** {@inheritDoc} */ - @Override public HadoopJobId nextJobId() { - if (busyLock.enterBusy()) { - try { - return proc.nextJobId(); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get next job ID (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { - if (busyLock.enterBusy()) { - try { - return proc.submit(jobId, jobInfo); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to submit job (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.status(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job status (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.counters(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job counters (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteInternalFuture finishFuture(HadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.finishFuture(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job finish future (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.kill(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to kill job (grid is stopping)."); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java deleted file mode 100644 index a77e918..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.Hadoop; -import org.apache.ignite.internal.processors.hadoop.HadoopAttributes; -import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; -import org.apache.ignite.internal.processors.hadoop.HadoopClasspathUtils; -import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; -import org.apache.ignite.internal.processors.hadoop.HadoopComponent; -import org.apache.ignite.internal.processors.hadoop.HadoopContext; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; -import org.apache.ignite.internal.processors.hadoop.HadoopLocations; -import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -import java.io.IOException; -import java.util.List; -import java.util.ListIterator; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Hadoop processor. - */ -public class HadoopProcessor extends HadoopProcessorAdapter { - /** Job ID counter. */ - private final AtomicInteger idCtr = new AtomicInteger(); - - /** Hadoop context. */ - @GridToStringExclude - private HadoopContext hctx; - - /** Hadoop facade for public API. */ - @GridToStringExclude - private Hadoop hadoop; - - /** - * Constructor. - * - * @param ctx Kernal context. 
- */ - public HadoopProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.isDaemon()) - return; - - HadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); - - if (cfg == null) - cfg = new HadoopConfiguration(); - else - cfg = new HadoopConfiguration(cfg); - - initializeDefaults(cfg); - - hctx = new HadoopContext( - ctx, - cfg, - new HadoopJobTracker(), - new HadoopEmbeddedTaskExecutor(), - // TODO: IGNITE-404: Uncomment when fixed. - //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(), - new HadoopShuffle()); - - for (HadoopComponent c : hctx.components()) - c.start(hctx); - - hadoop = new HadoopImpl(this); - - ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg)); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - if (hctx == null) - return; - - for (HadoopComponent c : hctx.components()) - c.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - if (hctx == null) - return; - - List components = hctx.components(); - - for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { - HadoopComponent c = it.previous(); - - c.onKernalStop(cancel); - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - if (hctx == null) - return; - - List components = hctx.components(); - - for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { - HadoopComponent c = it.previous(); - - c.stop(cancel); - } - } - - /** - * Gets Hadoop context. - * - * @return Hadoop context. 
- */ - public HadoopContext context() { - return hctx; - } - - /** {@inheritDoc} */ - @Override public Hadoop hadoop() { - if (hadoop == null) - throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " + - "is HADOOP_HOME environment variable set?)"); - - return hadoop; - } - - /** {@inheritDoc} */ - @Override public HadoopConfiguration config() { - return hctx.configuration(); - } - - /** {@inheritDoc} */ - @Override public HadoopJobId nextJobId() { - return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { - ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader()); - - try { - return hctx.jobTracker().submit(jobId, jobInfo); - } - finally { - HadoopCommonUtils.restoreContextClassLoader(oldLdr); - } - } - - /** {@inheritDoc} */ - @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().status(jobId); - } - - /** {@inheritDoc} */ - @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().jobCounters(jobId); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture finishFuture(HadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().finishFuture(jobId); - } - - /** {@inheritDoc} */ - @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().killJob(jobId); - } - - /** {@inheritDoc} */ - @Override public void validateEnvironment() throws IgniteCheckedException { - // Perform some static checks as early as possible, so that any recoverable exceptions are thrown here. - try { - HadoopLocations loc = HadoopClasspathUtils.locations(); - - if (!F.isEmpty(loc.home())) - U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " + loc.home()); - - U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + loc.common() + ", " + loc.hdfs() + ", " + - loc.mapred()); - } - catch (IOException ioe) { - throw new IgniteCheckedException(ioe.getMessage(), ioe); - } - - HadoopClassLoader.hadoopUrls(); - } - - /** - * Initializes default hadoop configuration. - * - * @param cfg Hadoop configuration. - */ - private void initializeDefaults(HadoopConfiguration cfg) { - if (cfg.getMapReducePlanner() == null) - cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopProcessor.class, this); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java deleted file mode 100644 index f62c999..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java +++ /dev/null @@ -1,542 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileWriter; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.Scanner; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR; -import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT; - -/** - * Setup tool to configure Hadoop client. - */ -public class HadoopSetup { - /** */ - public static final String WINUTILS_EXE = "winutils.exe"; - - /** */ - private static final FilenameFilter IGNITE_JARS = new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("ignite-") && name.endsWith(".jar"); - } - }; - - /** - * The main method. - * @param ignore Params. - */ - public static void main(String[] ignore) { - X.println( - " __________ ________________ ", - " / _/ ___/ |/ / _/_ __/ __/ ", - " _/ // (7 7 // / / / / _/ ", - "/___/\\___/_/|_/___/ /_/ /___/ ", - " for Apache Hadoop ", - " ", - "ver. " + ACK_VER_STR, - COPYRIGHT); - - configureHadoop(); - } - - /** - * This operation prepares the clean unpacked Hadoop distributive to work as client with Ignite-Hadoop. - * It performs these operations: - *
- * <ul>
- *     <li>Check for setting of HADOOP_HOME environment variable.</li>
- *     <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li>
- *     <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li>
- *     <li>In Windows check new line character issues in CMD scripts.</li>
- *     <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist tries to create ones.</li>
- * </ul>
- */ - private static void configureHadoop() { - String igniteHome = U.getIgniteHome(); - - println("IGNITE_HOME is set to '" + igniteHome + "'."); - - checkIgniteHome(igniteHome); - - String homeVar = "HADOOP_HOME"; - String hadoopHome = System.getenv(homeVar); - - if (F.isEmpty(hadoopHome)) { - homeVar = "HADOOP_PREFIX"; - hadoopHome = System.getenv(homeVar); - } - - if (F.isEmpty(hadoopHome)) - exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " + - "valid Hadoop installation directory and run setup tool again.", null); - - hadoopHome = hadoopHome.replaceAll("\"", ""); - - println(homeVar + " is set to '" + hadoopHome + "'."); - - String hiveHome = System.getenv("HIVE_HOME"); - - if (!F.isEmpty(hiveHome)) { - hiveHome = hiveHome.replaceAll("\"", ""); - - println("HIVE_HOME is set to '" + hiveHome + "'."); - } - - File hadoopDir = new File(hadoopHome); - - if (!hadoopDir.exists()) - exit("Hadoop installation folder does not exist.", null); - - if (!hadoopDir.isDirectory()) - exit("HADOOP_HOME must point to a directory.", null); - - if (!hadoopDir.canRead()) - exit("Hadoop installation folder can not be read. Please check permissions.", null); - - final File hadoopCommonDir; - - String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME"); - - if (F.isEmpty(hadoopCommonHome)) { - hadoopCommonDir = new File(hadoopDir, "share/hadoop/common"); - - println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'."); - } - else { - println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'."); - - hadoopCommonDir = new File(hadoopCommonHome); - } - - if (!hadoopCommonDir.canRead()) - exit("Failed to read Hadoop common dir '" + hadoopCommonDir + "'.", null); - - final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib"); - - if (!hadoopCommonLibDir.canRead()) - exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null); - - if (U.isWindows()) { - checkJavaPathSpaces(); - - final File hadoopBinDir = new File(hadoopDir, "bin"); - - if (!hadoopBinDir.canRead()) - exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null); - - File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE); - - if (!winutilsFile.exists()) { - if (ask("File '" + WINUTILS_EXE + "' does not exist. " + - "It may be replaced by a stub. Create it?")) { - println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'."); - - boolean ok = false; - - try { - ok = winutilsFile.createNewFile(); - } - catch (IOException ignore) { - // No-op. - } - - if (!ok) - exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null); - } - else - println("Ok. But Hadoop client probably will not work on Windows this way..."); - } - - processCmdFiles(hadoopDir, "bin", "sbin", "libexec"); - } - - File igniteLibs = new File(new File(igniteHome), "libs"); - - if (!igniteLibs.exists()) - exit("Ignite 'libs' folder is not found.", null); - - Collection jarFiles = new ArrayList<>(); - - addJarsInFolder(jarFiles, igniteLibs); - addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop")); - addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl")); - - boolean jarsLinksCorrect = true; - - for (File file : jarFiles) { - File link = new File(hadoopCommonLibDir, file.getName()); - - jarsLinksCorrect &= isJarLinkCorrect(link, file); - - if (!jarsLinksCorrect) - break; - } - - if (!jarsLinksCorrect) { - if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. 
" + - "Create appropriate symbolic links?")) { - File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS); - - if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " + - "installation. They must be deleted to continue. Continue?")) { - for (File file : oldIgniteJarFiles) { - println("Deleting file '" + file.getAbsolutePath() + "'."); - - if (!file.delete()) - exit("Failed to delete file '" + file.getPath() + "'.", null); - } - } - - for (File file : jarFiles) { - File targetFile = new File(hadoopCommonLibDir, file.getName()); - - try { - println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'."); - - Files.createSymbolicLink(targetFile.toPath(), file.toPath()); - } - catch (IOException e) { - if (U.isWindows()) { - warn("Ability to create symbolic links is required!"); - warn("On Windows platform you have to grant permission 'Create symbolic links'"); - warn("to your user or run the Accelerator as Administrator."); - } - - exit("Creating symbolic link failed! Check permissions.", e); - } - } - } - else - println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath..."); - } - - File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop"); - - File igniteHadoopCfg = igniteHadoopConfig(igniteHome); - - if (!igniteHadoopCfg.canRead()) - exit("Failed to read Ignite Hadoop 'config' folder at '" + igniteHadoopCfg.getAbsolutePath() + "'.", null); - - if (hadoopEtc.canWrite()) { // TODO Bigtop - if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " + - "(existing files will be backed up)?")) { - replaceWithBackup(new File(igniteHadoopCfg, "core-site.ignite.xml"), - new File(hadoopEtc, "core-site.xml")); - - replaceWithBackup(new File(igniteHadoopCfg, "mapred-site.ignite.xml"), - new File(hadoopEtc, "mapred-site.xml")); - } - else - println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory..."); - } - - if (!F.isEmpty(hiveHome)) { - File hiveConfDir = new File(hiveHome + File.separator + "conf"); - - if (!hiveConfDir.canWrite()) - warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " + - "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory."); - else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?")) - replaceWithBackup(new File(igniteHadoopCfg, "hive-site.ignite.xml"), - new File(hiveConfDir, "hive-site.xml")); - else - println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory..."); - } - - println("Apache Hadoop setup is complete."); - } - - /** - * Get Ignite Hadoop config directory. - * - * @param igniteHome Ignite home. - * @return Ignite Hadoop config directory. - */ - private static File igniteHadoopConfig(String igniteHome) { - Path path = Paths.get(igniteHome, "modules", "hadoop", "config"); - - if (!Files.exists(path)) - path = Paths.get(igniteHome, "config", "hadoop"); - - if (Files.exists(path)) - return path.toFile(); - else - return new File(igniteHome, "docs"); - } - - /** - * @param jarFiles Jars. - * @param folder Folder. 
- */ - private static void addJarsInFolder(Collection jarFiles, File folder) { - if (!folder.exists()) - exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null); - - jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS))); - } - - /** - * Checks that JAVA_HOME does not contain space characters. - */ - private static void checkJavaPathSpaces() { - String javaHome = System.getProperty("java.home"); - - if (javaHome.contains(" ")) { - warn("Java installation path contains space characters!"); - warn("Hadoop client will not be able to start using '" + javaHome + "'."); - warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation."); - } - } - - /** - * Checks Ignite home. - * - * @param igniteHome Ignite home. - */ - private static void checkIgniteHome(String igniteHome) { - URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation(); - - try { - Path jar = Paths.get(jarUrl.toURI()); - Path igHome = Paths.get(igniteHome); - - if (!jar.startsWith(igHome)) - exit("Ignite JAR files are not under IGNITE_HOME.", null); - } - catch (Exception e) { - exit(e.getMessage(), e); - } - } - - /** - * Replaces target file with source file. - * - * @param from From. - * @param to To. - */ - private static void replaceWithBackup(File from, File to) { - if (!from.canRead()) - exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null); - - println("Replacing file '" + to.getAbsolutePath() + "'."); - - try { - U.copy(from, renameToBak(to), true); - } - catch (IOException e) { - exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e); - } - } - - /** - * Renames file for backup. - * - * @param file File. - * @return File. - */ - private static File renameToBak(File file) { - DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); - - if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak"))) - exit("Failed to rename file '" + file.getPath() + "'.", null); - - return file; - } - - /** - * Checks if link is correct. - * - * @param link Symbolic link. - * @param correctTarget Correct link target. - * @return {@code true} If link target is correct. - */ - private static boolean isJarLinkCorrect(File link, File correctTarget) { - if (!Files.isSymbolicLink(link.toPath())) - return false; // It is a real file or it does not exist. - - Path target = null; - - try { - target = Files.readSymbolicLink(link.toPath()); - } - catch (IOException e) { - exit("Failed to read symbolic link: " + link.getAbsolutePath(), e); - } - - return Files.exists(target) && target.toFile().equals(correctTarget); - } - - /** - * Writes the question end read the boolean answer from the console. - * - * @param question Question to write. - * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise. - */ - private static boolean ask(String question) { - X.println(); - X.print(" < " + question + " (Y/N): "); - - String answer = null; - - if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES"))) - answer = "Y"; - else { - BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); - - try { - answer = br.readLine(); - } - catch (IOException e) { - exit("Failed to read answer: " + e.getMessage(), e); - } - } - - if (answer != null && "Y".equals(answer.toUpperCase().trim())) { - X.println(" > Yes."); - - return true; - } - else { - X.println(" > No."); - - return false; - } - } - - /** - * Exit with message. - * - * @param msg Exit message. 
- */ - private static void exit(String msg, Exception e) { - X.println(" "); - X.println(" # " + msg); - X.println(" # Setup failed, exiting... "); - - if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG"))) - e.printStackTrace(); - - System.exit(1); - } - - /** - * Prints message. - * - * @param msg Message. - */ - private static void println(String msg) { - X.println(" > " + msg); - } - - /** - * Prints warning. - * - * @param msg Message. - */ - private static void warn(String msg) { - X.println(" ! " + msg); - } - - /** - * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the - * answer. If it's 'Y' then backups original files and corrects invalid new line characters. - * - * @param rootDir Root directory to process. - * @param dirs Directories inside of the root to process. - */ - private static void processCmdFiles(File rootDir, String... dirs) { - boolean answer = false; - - for (String dir : dirs) { - File subDir = new File(rootDir, dir); - - File[] cmdFiles = subDir.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.toLowerCase().endsWith(".cmd"); - } - }); - - for (File file : cmdFiles) { - String content = null; - - try (Scanner scanner = new Scanner(file)) { - content = scanner.useDelimiter("\\Z").next(); - } - catch (FileNotFoundException e) { - exit("Failed to read file '" + file + "'.", e); - } - - boolean invalid = false; - - for (int i = 0; i < content.length(); i++) { - if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) { - invalid = true; - - break; - } - } - - if (invalid) { - answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?"); - - if (!answer) { - println("Ok. But Windows most probably will fail to execute them..."); - - return; - } - - println("Fixing newline characters in file '" + file.getAbsolutePath() + "'."); - - renameToBak(file); - - try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { - for (int i = 0; i < content.length(); i++) { - if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) - writer.write("\r"); - - writer.write(content.charAt(i)); - } - } - catch (IOException e) { - exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e); - } - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java new file mode 100644 index 0000000..23eaa18 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Arrays; + +/** + * Hadoop attributes. + */ +public class HadoopAttributes implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Attribute name. */ + public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop"; + + /** Map-reduce planner class name. */ + private String plannerCls; + + /** External executor flag. */ + private boolean extExec; + + /** Maximum parallel tasks. */ + private int maxParallelTasks; + + /** Maximum task queue size. */ + private int maxTaskQueueSize; + + /** Library names. */ + @GridToStringExclude + private String[] libNames; + + /** Number of cores. */ + private int cores; + + /** + * Get attributes for node (if any). + * + * @param node Node. + * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node. + */ + @Nullable public static HadoopAttributes forNode(ClusterNode node) { + return node.attribute(NAME); + } + + /** + * {@link Externalizable} support. + */ + public HadoopAttributes() { + // No-op. + } + + /** + * Constructor. + * + * @param cfg Configuration. + */ + public HadoopAttributes(HadoopConfiguration cfg) { + assert cfg != null; + assert cfg.getMapReducePlanner() != null; + + plannerCls = cfg.getMapReducePlanner().getClass().getName(); + + // TODO: IGNITE-404: Get from configuration when fixed. + extExec = false; + + maxParallelTasks = cfg.getMaxParallelTasks(); + maxTaskQueueSize = cfg.getMaxTaskQueueSize(); + libNames = cfg.getNativeLibraryNames(); + + // Cores count already passed in other attributes, we add it here for convenience. + cores = Runtime.getRuntime().availableProcessors(); + } + + /** + * @return Map reduce planner class name. + */ + public String plannerClassName() { + return plannerCls; + } + + /** + * @return External execution flag. + */ + public boolean externalExecution() { + return extExec; + } + + /** + * @return Maximum parallel tasks. + */ + public int maxParallelTasks() { + return maxParallelTasks; + } + + /** + * @return Maximum task queue size. + */ + public int maxTaskQueueSize() { + return maxTaskQueueSize; + } + + + /** + * @return Native library names. + */ + public String[] nativeLibraryNames() { + return libNames; + } + + /** + * @return Number of cores on machine. 
+ */ + public int cores() { + return cores; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(plannerCls); + out.writeBoolean(extExec); + out.writeInt(maxParallelTasks); + out.writeInt(maxTaskQueueSize); + out.writeObject(libNames); + out.writeInt(cores); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + plannerCls = (String)in.readObject(); + extExec = in.readBoolean(); + maxParallelTasks = in.readInt(); + maxTaskQueueSize = in.readInt(); + libNames = (String[])in.readObject(); + cores = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java new file mode 100644 index 0000000..ed2657e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop facade implementation. + */ +public class HadoopImpl implements Hadoop { + /** Hadoop processor. */ + private final HadoopProcessor proc; + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** + * Constructor. + * + * @param proc Hadoop processor. 
+ */ + HadoopImpl(HadoopProcessor proc) { + this.proc = proc; + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration configuration() { + return proc.config(); + } + + /** {@inheritDoc} */ + @Override public HadoopJobId nextJobId() { + if (busyLock.enterBusy()) { + try { + return proc.nextJobId(); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get next job ID (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { + if (busyLock.enterBusy()) { + try { + return proc.submit(jobId, jobInfo); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to submit job (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return proc.status(jobId); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get job status (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return proc.counters(jobId); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get job counters (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteInternalFuture finishFuture(HadoopJobId jobId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return proc.finishFuture(jobId); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get job finish future (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return proc.kill(jobId); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to kill job (grid is stopping)."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java new file mode 100644 index 0000000..520f094 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.io.IOException; +import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Hadoop processor. + */ +public class HadoopProcessor extends HadoopProcessorAdapter { + /** Job ID counter. */ + private final AtomicInteger idCtr = new AtomicInteger(); + + /** Hadoop context. */ + @GridToStringExclude + private HadoopContext hctx; + + /** Hadoop facade for public API. */ + @GridToStringExclude + private Hadoop hadoop; + + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public HadoopProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (ctx.isDaemon()) + return; + + HadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); + + if (cfg == null) + cfg = new HadoopConfiguration(); + else + cfg = new HadoopConfiguration(cfg); + + initializeDefaults(cfg); + + hctx = new HadoopContext( + ctx, + cfg, + new HadoopJobTracker(), + new HadoopEmbeddedTaskExecutor(), + // TODO: IGNITE-404: Uncomment when fixed. + //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(), + new HadoopShuffle()); + + for (HadoopComponent c : hctx.components()) + c.start(hctx); + + hadoop = new HadoopImpl(this); + + ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg)); + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + if (hctx == null) + return; + + for (HadoopComponent c : hctx.components()) + c.onKernalStart(); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + if (hctx == null) + return; + + List components = hctx.components(); + + for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { + HadoopComponent c = it.previous(); + + c.onKernalStop(cancel); + } + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + if (hctx == null) + return; + + List components = hctx.components(); + + for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { + HadoopComponent c = it.previous(); + + c.stop(cancel); + } + } + + /** + * Gets Hadoop context. + * + * @return Hadoop context. 
+ */ + public HadoopContext context() { + return hctx; + } + + /** {@inheritDoc} */ + @Override public Hadoop hadoop() { + if (hadoop == null) + throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " + + "is HADOOP_HOME environment variable set?)"); + + return hadoop; + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration config() { + return hctx.configuration(); + } + + /** {@inheritDoc} */ + @Override public HadoopJobId nextJobId() { + return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { + ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + return hctx.jobTracker().submit(jobId, jobInfo); + } + finally { + HadoopCommonUtils.restoreContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().status(jobId); + } + + /** {@inheritDoc} */ + @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().jobCounters(jobId); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture finishFuture(HadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().finishFuture(jobId); + } + + /** {@inheritDoc} */ + @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().killJob(jobId); + } + + /** {@inheritDoc} */ + @Override public void validateEnvironment() throws IgniteCheckedException { + // Perform some static checks as early as possible, so that any recoverable exceptions are thrown here. + try { + HadoopLocations loc = HadoopClasspathUtils.locations(); + + if (!F.isEmpty(loc.home())) + U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " + loc.home()); + + U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + loc.common() + ", " + loc.hdfs() + ", " + + loc.mapred()); + } + catch (IOException ioe) { + throw new IgniteCheckedException(ioe.getMessage(), ioe); + } + + HadoopClassLoader.hadoopUrls(); + } + + /** + * Initializes default hadoop configuration. + * + * @param cfg Hadoop configuration. + */ + private void initializeDefaults(HadoopConfiguration cfg) { + if (cfg.getMapReducePlanner() == null) + cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcessor.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java new file mode 100644 index 0000000..ed39ce5 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
new file mode 100644
index 0000000..ed39ce5
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Scanner;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+
+/**
+ * Setup tool to configure Hadoop client.
+ */
+public class HadoopSetup {
+    /** */
+    public static final String WINUTILS_EXE = "winutils.exe";
+
+    /** */
+    private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
+        @Override public boolean accept(File dir, String name) {
+            return name.startsWith("ignite-") && name.endsWith(".jar");
+        }
+    };
+
+    /**
+     * The main method.
+     * @param ignore Params.
+     */
+    public static void main(String[] ignore) {
+        X.println(
+            "   __________  ________________ ",
+            "  /  _/ ___/ |/ /  _/_  __/ __/ ",
+            " _/ // (7 7    // /  / / / _/   ",
+            "/___/\\___/_/|_/___/ /_/ /___/  ",
+            "                for Apache Hadoop        ",
+            " ",
+            "ver. " + ACK_VER_STR,
+            COPYRIGHT);
+
+        configureHadoop();
+    }
+
+    /**
+     * This operation prepares a clean, unpacked Hadoop distribution to work as a client with Ignite-Hadoop.
+     * It performs these operations:
+     * <ul>
+     *     <li>Checks that the HADOOP_HOME environment variable is set.</li>
+     *     <li>Tries to resolve HADOOP_COMMON_HOME, or evaluates it relative to HADOOP_HOME.</li>
+     *     <li>On Windows, checks that winutils.exe exists and tries to work around known restrictions.</li>
+     *     <li>On Windows, checks CMD scripts for new line character issues.</li>
+     *     <li>Scans the Hadoop lib directory for Ignite JARs and, if they are missing, tries to create
+     *     symbolic links to them.</li>
+     * </ul>
+     */
+    private static void configureHadoop() {
+        String igniteHome = U.getIgniteHome();
+
+        println("IGNITE_HOME is set to '" + igniteHome + "'.");
+
+        checkIgniteHome(igniteHome);
+
+        String homeVar = "HADOOP_HOME";
+        String hadoopHome = System.getenv(homeVar);
+
+        if (F.isEmpty(hadoopHome)) {
+            homeVar = "HADOOP_PREFIX";
+            hadoopHome = System.getenv(homeVar);
+        }
+
+        if (F.isEmpty(hadoopHome))
+            exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " +
+                "valid Hadoop installation directory and run the setup tool again.", null);
+
+        hadoopHome = hadoopHome.replaceAll("\"", "");
+
+        println(homeVar + " is set to '" + hadoopHome + "'.");
+
+        String hiveHome = System.getenv("HIVE_HOME");
+
+        if (!F.isEmpty(hiveHome)) {
+            hiveHome = hiveHome.replaceAll("\"", "");
+
+            println("HIVE_HOME is set to '" + hiveHome + "'.");
+        }
+
+        File hadoopDir = new File(hadoopHome);
+
+        if (!hadoopDir.exists())
+            exit("Hadoop installation folder does not exist.", null);
+
+        if (!hadoopDir.isDirectory())
+            exit("HADOOP_HOME must point to a directory.", null);
+
+        if (!hadoopDir.canRead())
+            exit("Hadoop installation folder cannot be read. Please check permissions.", null);
+
+        final File hadoopCommonDir;
+
+        String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");
+
+        if (F.isEmpty(hadoopCommonHome)) {
+            hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");
+
+            println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'.");
+        }
+        else {
+            println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'.");
+
+            hadoopCommonDir = new File(hadoopCommonHome);
+        }
+
+        if (!hadoopCommonDir.canRead())
+            exit("Failed to read Hadoop common dir '" + hadoopCommonDir + "'.", null);
+
+        final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");
+
+        if (!hadoopCommonLibDir.canRead())
+            exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null);
+
+        if (U.isWindows()) {
+            checkJavaPathSpaces();
+
+            final File hadoopBinDir = new File(hadoopDir, "bin");
+
+            if (!hadoopBinDir.canRead())
+                exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null);
+
+            File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);
+
+            if (!winutilsFile.exists()) {
+                if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
+                    "It may be replaced by a stub. Create it?")) {
+                    println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'.");
+
+                    boolean ok = false;
+
+                    try {
+                        ok = winutilsFile.createNewFile();
+                    }
+                    catch (IOException ignore) {
+                        // No-op.
+                    }
+
+                    if (!ok)
+                        exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null);
+                }
+                else
+                    println("Ok. But Hadoop client probably will not work on Windows this way...");
+            }
+
+            processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
+        }
+
+        File igniteLibs = new File(new File(igniteHome), "libs");
+
+        if (!igniteLibs.exists())
+            exit("Ignite 'libs' folder is not found.", null);
+
+        Collection<File> jarFiles = new ArrayList<>();
+
+        addJarsInFolder(jarFiles, igniteLibs);
+        addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
+        addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl"));
+
+        boolean jarsLinksCorrect = true;
+
+        for (File file : jarFiles) {
+            File link = new File(hadoopCommonLibDir, file.getName());
+
+            jarsLinksCorrect &= isJarLinkCorrect(link, file);
+
+            if (!jarsLinksCorrect)
+                break;
+        }
+
+        if (!jarsLinksCorrect) {
+            if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " +
" + + "Create appropriate symbolic links?")) { + File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS); + + if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " + + "installation. They must be deleted to continue. Continue?")) { + for (File file : oldIgniteJarFiles) { + println("Deleting file '" + file.getAbsolutePath() + "'."); + + if (!file.delete()) + exit("Failed to delete file '" + file.getPath() + "'.", null); + } + } + + for (File file : jarFiles) { + File targetFile = new File(hadoopCommonLibDir, file.getName()); + + try { + println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'."); + + Files.createSymbolicLink(targetFile.toPath(), file.toPath()); + } + catch (IOException e) { + if (U.isWindows()) { + warn("Ability to create symbolic links is required!"); + warn("On Windows platform you have to grant permission 'Create symbolic links'"); + warn("to your user or run the Accelerator as Administrator."); + } + + exit("Creating symbolic link failed! Check permissions.", e); + } + } + } + else + println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath..."); + } + + File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop"); + + File igniteHadoopCfg = igniteHadoopConfig(igniteHome); + + if (!igniteHadoopCfg.canRead()) + exit("Failed to read Ignite Hadoop 'config' folder at '" + igniteHadoopCfg.getAbsolutePath() + "'.", null); + + if (hadoopEtc.canWrite()) { // TODO Bigtop + if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " + + "(existing files will be backed up)?")) { + replaceWithBackup(new File(igniteHadoopCfg, "core-site.ignite.xml"), + new File(hadoopEtc, "core-site.xml")); + + replaceWithBackup(new File(igniteHadoopCfg, "mapred-site.ignite.xml"), + new File(hadoopEtc, "mapred-site.xml")); + } + else + println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory..."); + } + + if (!F.isEmpty(hiveHome)) { + File hiveConfDir = new File(hiveHome + File.separator + "conf"); + + if (!hiveConfDir.canWrite()) + warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " + + "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory."); + else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?")) + replaceWithBackup(new File(igniteHadoopCfg, "hive-site.ignite.xml"), + new File(hiveConfDir, "hive-site.xml")); + else + println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory..."); + } + + println("Apache Hadoop setup is complete."); + } + + /** + * Get Ignite Hadoop config directory. + * + * @param igniteHome Ignite home. + * @return Ignite Hadoop config directory. + */ + private static File igniteHadoopConfig(String igniteHome) { + Path path = Paths.get(igniteHome, "modules", "hadoop", "config"); + + if (!Files.exists(path)) + path = Paths.get(igniteHome, "config", "hadoop"); + + if (Files.exists(path)) + return path.toFile(); + else + return new File(igniteHome, "docs"); + } + + /** + * @param jarFiles Jars. + * @param folder Folder. 
+     */
+    private static void addJarsInFolder(Collection<File> jarFiles, File folder) {
+        if (!folder.exists())
+            exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null);
+
+        jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS)));
+    }
+
+    /**
+     * Checks that JAVA_HOME does not contain space characters.
+     */
+    private static void checkJavaPathSpaces() {
+        String javaHome = System.getProperty("java.home");
+
+        if (javaHome.contains(" ")) {
+            warn("Java installation path contains space characters!");
+            warn("Hadoop client will not be able to start using '" + javaHome + "'.");
+            warn("Please install JRE to a path which does not contain spaces and point JAVA_HOME to that installation.");
+        }
+    }
+
+    /**
+     * Checks Ignite home.
+     *
+     * @param igniteHome Ignite home.
+     */
+    private static void checkIgniteHome(String igniteHome) {
+        URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation();
+
+        try {
+            Path jar = Paths.get(jarUrl.toURI());
+            Path igHome = Paths.get(igniteHome);
+
+            if (!jar.startsWith(igHome))
+                exit("Ignite JAR files are not under IGNITE_HOME.", null);
+        }
+        catch (Exception e) {
+            exit(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Replaces target file with source file.
+     *
+     * @param from From.
+     * @param to To.
+     */
+    private static void replaceWithBackup(File from, File to) {
+        if (!from.canRead())
+            exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null);
+
+        println("Replacing file '" + to.getAbsolutePath() + "'.");
+
+        try {
+            U.copy(from, renameToBak(to), true);
+        }
+        catch (IOException e) {
+            exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e);
+        }
+    }
+
+    /**
+     * Renames file for backup.
+     *
+     * @param file File.
+     * @return File.
+     */
+    private static File renameToBak(File file) {
+        DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
+
+        if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak")))
+            exit("Failed to rename file '" + file.getPath() + "'.", null);
+
+        return file;
+    }
+
+    /**
+     * Checks if link is correct.
+     *
+     * @param link Symbolic link.
+     * @param correctTarget Correct link target.
+     * @return {@code true} If link target is correct.
+     */
+    private static boolean isJarLinkCorrect(File link, File correctTarget) {
+        if (!Files.isSymbolicLink(link.toPath()))
+            return false; // It is a real file or it does not exist.
+
+        Path target = null;
+
+        try {
+            target = Files.readSymbolicLink(link.toPath());
+        }
+        catch (IOException e) {
+            exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
+        }
+
+        return Files.exists(target) && target.toFile().equals(correctTarget);
+    }
+
+    /**
+     * Writes the question and reads the boolean answer from the console.
+     *
+     * @param question Question to write.
+     * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
+     */
+    private static boolean ask(String question) {
+        X.println();
+        X.print(" < " + question + " (Y/N): ");
+
+        String answer = null;
+
+        if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
+            answer = "Y";
+        else {
+            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+
+            try {
+                answer = br.readLine();
+            }
+            catch (IOException e) {
+                exit("Failed to read answer: " + e.getMessage(), e);
+            }
+        }
+
+        if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
+            X.println(" > Yes.");
+
+            return true;
+        }
+        else {
+            X.println(" > No.");
+
+            return false;
+        }
+    }
+
+    /**
+     * Exit with message.
+     *
+     * @param msg Exit message.
+     * @param e Optional exception, printed when IGNITE_HADOOP_SETUP_DEBUG is set.
+     */
+    private static void exit(String msg, Exception e) {
+        X.println(" ");
+        X.println(" # " + msg);
+        X.println(" # Setup failed, exiting... ");
+
+        if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG")))
+            e.printStackTrace();
+
+        System.exit(1);
+    }
+
+    /**
+     * Prints message.
+     *
+     * @param msg Message.
+     */
+    private static void println(String msg) {
+        X.println(" > " + msg);
+    }
+
+    /**
+     * Prints warning.
+     *
+     * @param msg Message.
+     */
+    private static void warn(String msg) {
+        X.println(" ! " + msg);
+    }
+
+    /**
+     * Checks that CMD files have valid MS Windows new line characters. If not, writes a question to the console and
+     * reads the answer. If it is 'Y', backs up the original files and corrects the invalid new line characters.
+     *
+     * @param rootDir Root directory to process.
+     * @param dirs Directories inside of the root to process.
+     */
+    private static void processCmdFiles(File rootDir, String... dirs) {
+        boolean answer = false;
+
+        for (String dir : dirs) {
+            File subDir = new File(rootDir, dir);
+
+            File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
+                @Override public boolean accept(File dir, String name) {
+                    return name.toLowerCase().endsWith(".cmd");
+                }
+            });
+
+            for (File file : cmdFiles) {
+                String content = null;
+
+                try (Scanner scanner = new Scanner(file)) {
+                    content = scanner.useDelimiter("\\Z").next();
+                }
+                catch (FileNotFoundException e) {
+                    exit("Failed to read file '" + file + "'.", e);
+                }
+
+                boolean invalid = false;
+
+                for (int i = 0; i < content.length(); i++) {
+                    if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) {
+                        invalid = true;
+
+                        break;
+                    }
+                }
+
+                if (invalid) {
+                    answer = answer || ask("One or more *.CMD files have invalid new line characters. Replace them?");
+
+                    if (!answer) {
+                        println("Ok. But Windows most probably will fail to execute them...");
+
+                        return;
+                    }
+
+                    println("Fixing newline characters in file '" + file.getAbsolutePath() + "'.");
+
+                    renameToBak(file);
+
+                    try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+                        for (int i = 0; i < content.length(); i++) {
+                            if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r'))
+                                writer.write("\r");
+
+                            writer.write(content.charAt(i));
+                        }
+                    }
+                    catch (IOException e) {
+                        exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
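
Because ask() auto-answers 'Y' whenever the IGNITE_HADOOP_SETUP_YES environment variable is non-empty, the relocated tool can also run unattended. A minimal sketch of one way to script that; the launcher class is hypothetical and the classpath handling is an assumption, not part of this commit:

    import java.util.Map;

    /** Hypothetical launcher: runs HadoopSetup without console prompts. */
    public class UnattendedHadoopSetup {
        public static void main(String[] args) throws Exception {
            ProcessBuilder pb = new ProcessBuilder(
                System.getProperty("java.home") + "/bin/java",
                "-cp", System.getProperty("java.class.path"), // Assumes the Ignite JARs are on this classpath.
                "org.apache.ignite.internal.processors.hadoop.HadoopSetup");

            Map<String, String> env = pb.environment();

            env.put("IGNITE_HADOOP_SETUP_YES", "1");   // Every (Y/N) prompt is answered 'Y' by ask().
            env.put("IGNITE_HADOOP_SETUP_DEBUG", "1"); // exit() prints stack traces on failure.

            pb.inheritIO(); // Forward the tool's console output to this process.

            int code = pb.start().waitFor();

            System.out.println("HadoopSetup finished with exit code " + code);
        }
    }

HADOOP_HOME (or HADOOP_PREFIX) still has to be present in the inherited environment, exactly as configureHadoop() requires.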