Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2E5F917762 for ; Wed, 4 Mar 2015 15:36:06 +0000 (UTC) Received: (qmail 48804 invoked by uid 500); 4 Mar 2015 15:35:31 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 48774 invoked by uid 500); 4 Mar 2015 15:35:31 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 48764 invoked by uid 99); 4 Mar 2015 15:35:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 15:35:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 04 Mar 2015 15:35:24 +0000 Received: (qmail 47250 invoked by uid 99); 4 Mar 2015 15:35:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 15:35:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D9CBFE0F58; Wed, 4 Mar 2015 15:35:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 04 Mar 2015 15:35:34 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [32/45] incubator-ignite git commit: IGNITE-386: Squashed changes. 
X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java new file mode 100644 index 0000000..e0c5916 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.hadoop.mapreduce.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*; + +/** + * Hadoop processor. + */ +public class HadoopProcessor extends HadoopProcessorAdapter { + /** Job ID counter. */ + private final AtomicInteger idCtr = new AtomicInteger(); + + /** Hadoop context. */ + @GridToStringExclude + private HadoopContext hctx; + + /** Hadoop facade for public API. */ + @GridToStringExclude + private Hadoop hadoop; + + /** + * @param ctx Kernal context. + */ + public HadoopProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (ctx.isDaemon()) + return; + + HadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); + + if (cfg == null) + cfg = new HadoopConfiguration(); + else + cfg = new HadoopConfiguration(cfg); + + initializeDefaults(cfg); + + validate(cfg); + + if (hadoopHome() != null) + U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome()); + + boolean ok = false; + + try { // Check for Hadoop installation. 
+ hadoopUrls(); + + ok = true; + } + catch (IgniteCheckedException e) { + U.quietAndWarn(log, e.getMessage()); + } + + if (ok) { + hctx = new HadoopContext( + ctx, + cfg, + new HadoopJobTracker(), + cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(), + new HadoopShuffle()); + + + for (HadoopComponent c : hctx.components()) + c.start(hctx); + + hadoop = new HadoopImpl(this); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcessor.class, this); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + if (hctx == null) + return; + + List components = hctx.components(); + + for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { + HadoopComponent c = it.previous(); + + c.stop(cancel); + } + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + if (hctx == null) + return; + + for (HadoopComponent c : hctx.components()) + c.onKernalStart(); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + if (hctx == null) + return; + + List components = hctx.components(); + + for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { + HadoopComponent c = it.previous(); + + c.onKernalStop(cancel); + } + } + + /** + * Gets Hadoop context. + * + * @return Hadoop context. 
+ */ + public HadoopContext context() { + return hctx; + } + + /** {@inheritDoc} */ + @Override public Hadoop hadoop() { + if (hadoop == null) + throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " + + "is HADOOP_HOME environment variable set?)"); + + return hadoop; + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration config() { + return hctx.configuration(); + } + + /** {@inheritDoc} */ + @Override public HadoopJobId nextJobId() { + return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { + return hctx.jobTracker().submit(jobId, jobInfo); + } + + /** {@inheritDoc} */ + @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().status(jobId); + } + + /** {@inheritDoc} */ + @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().jobCounters(jobId); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture finishFuture(HadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().finishFuture(jobId); + } + + /** {@inheritDoc} */ + @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().killJob(jobId); + } + + /** + * Initializes default hadoop configuration. + * + * @param cfg Hadoop configuration. + */ + private void initializeDefaults(HadoopConfiguration cfg) { + if (cfg.getMapReducePlanner() == null) + cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner()); + } + + /** + * Validates Grid and Hadoop configuration for correctness. + * + * @param hadoopCfg Hadoop configuration. + * @throws IgniteCheckedException If failed. 
+ */ + private void validate(HadoopConfiguration hadoopCfg) throws IgniteCheckedException { + if (ctx.config().isPeerClassLoadingEnabled()) + throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " + + "GridConfiguration.setPeerClassLoadingEnabled())."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java new file mode 100644 index 0000000..35df5da --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.nio.file.*; +import java.text.*; +import java.util.*; + +import static org.apache.ignite.internal.IgniteVersionUtils.*; + +/** + * Setup tool to configure Hadoop client. + */ +public class HadoopSetup { + /** */ + public static final String WINUTILS_EXE = "winutils.exe"; + + /** */ + private static final FilenameFilter IGNITE_JARS = new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("ignite-") && name.endsWith(".jar"); + } + }; + + /** + * The main method. + * @param ignore Params. + */ + public static void main(String[] ignore) { + X.println( + " __________ ________________ ", + " / _/ ___/ |/ / _/_ __/ __/ ", + " _/ // (_ / // / / / / _/ ", + "/___/\\___/_/|_/___/ /_/ /___/ ", + " for Apache Hadoop ", + " "); + + println("Version " + ACK_VER_STR); + + configureHadoop(); + } + + /** + * This operation prepares the clean unpacked Hadoop distributive to work as client with Ignite-Hadoop. + * It performs these operations: + *
<ul>
+ * <li>Check for setting of HADOOP_HOME environment variable.</li>
+ * <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li>
+ * <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li>
+ * <li>In Windows check new line character issues in CMD scripts.</li>
+ * <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist tries to create ones.</li>
+ * </ul>
+ */ + private static void configureHadoop() { + String igniteHome = U.getIgniteHome(); + + println("IGNITE_HOME is set to '" + igniteHome + "'."); + + checkIgniteHome(igniteHome); + + String homeVar = "HADOOP_HOME"; + String hadoopHome = System.getenv(homeVar); + + if (F.isEmpty(hadoopHome)) { + homeVar = "HADOOP_PREFIX"; + hadoopHome = System.getenv(homeVar); + } + + if (F.isEmpty(hadoopHome)) + exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " + + "valid Hadoop installation directory and run setup tool again.", null); + + hadoopHome = hadoopHome.replaceAll("\"", ""); + + println(homeVar + " is set to '" + hadoopHome + "'."); + + String hiveHome = System.getenv("HIVE_HOME"); + + if (!F.isEmpty(hiveHome)) { + hiveHome = hiveHome.replaceAll("\"", ""); + + println("HIVE_HOME is set to '" + hiveHome + "'."); + } + + File hadoopDir = new File(hadoopHome); + + if (!hadoopDir.exists()) + exit("Hadoop installation folder does not exist.", null); + + if (!hadoopDir.isDirectory()) + exit("HADOOP_HOME must point to a directory.", null); + + if (!hadoopDir.canRead()) + exit("Hadoop installation folder can not be read. 
Please check permissions.", null); + + File hadoopCommonDir; + + String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME"); + + if (F.isEmpty(hadoopCommonHome)) { + hadoopCommonDir = new File(hadoopDir, "share/hadoop/common"); + + println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'."); + } + else { + println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'."); + + hadoopCommonDir = new File(hadoopCommonHome); + } + + if (!hadoopCommonDir.canRead()) + exit("Failed to read Hadoop common dir in '" + hadoopCommonHome + "'.", null); + + File hadoopCommonLibDir = new File(hadoopCommonDir, "lib"); + + if (!hadoopCommonLibDir.canRead()) + exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null); + + if (U.isWindows()) { + checkJavaPathSpaces(); + + File hadoopBinDir = new File(hadoopDir, "bin"); + + if (!hadoopBinDir.canRead()) + exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null); + + File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE); + + if (!winutilsFile.exists()) { + if (ask("File '" + WINUTILS_EXE + "' does not exist. " + + "It may be replaced by a stub. Create it?")) { + println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'."); + + boolean ok = false; + + try { + ok = winutilsFile.createNewFile(); + } + catch (IOException ignore) { + // No-op. + } + + if (!ok) + exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null); + } + else + println("Ok. 
But Hadoop client probably will not work on Windows this way..."); + } + + processCmdFiles(hadoopDir, "bin", "sbin", "libexec"); + } + + File igniteLibs = new File(new File(igniteHome), "libs"); + + if (!igniteLibs.exists()) + exit("Ignite 'libs' folder is not found.", null); + + Collection jarFiles = new ArrayList<>(); + + addJarsInFolder(jarFiles, igniteLibs); + addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop")); + + boolean jarsLinksCorrect = true; + + for (File file : jarFiles) { + File link = new File(hadoopCommonLibDir, file.getName()); + + jarsLinksCorrect &= isJarLinkCorrect(link, file); + + if (!jarsLinksCorrect) + break; + } + + if (!jarsLinksCorrect) { + if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " + + "Create appropriate symbolic links?")) { + File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS); + + if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " + + "installation. They must be deleted to continue. Continue?")) { + for (File file : oldIgniteJarFiles) { + println("Deleting file '" + file.getAbsolutePath() + "'."); + + if (!file.delete()) + exit("Failed to delete file '" + file.getPath() + "'.", null); + } + } + + for (File file : jarFiles) { + File targetFile = new File(hadoopCommonLibDir, file.getName()); + + try { + println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'."); + + Files.createSymbolicLink(targetFile.toPath(), file.toPath()); + } + catch (IOException e) { + if (U.isWindows()) { + warn("Ability to create symbolic links is required!"); + warn("On Windows platform you have to grant permission 'Create symbolic links'"); + warn("to your user or run the Accelerator as Administrator."); + } + + exit("Creating symbolic link failed! Check permissions.", e); + } + } + } + else + println("Ok. 
But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath..."); + } + + File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop"); + + File igniteDocs = new File(igniteHome, "docs"); + + if (!igniteDocs.canRead()) + exit("Failed to read Ignite 'docs' folder at '" + igniteDocs.getAbsolutePath() + "'.", null); + + if (hadoopEtc.canWrite()) { // TODO Bigtop + if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " + + "(existing files will be backed up)?")) { + replaceWithBackup(new File(igniteDocs, "core-site.ignite.xml"), new File(hadoopEtc, "core-site.xml")); + + replaceWithBackup(new File(igniteDocs, "mapred-site.ignite.xml"), new File(hadoopEtc, "mapred-site.xml")); + } + else + println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory..."); + } + + if (!F.isEmpty(hiveHome)) { + File hiveConfDir = new File(hiveHome + File.separator + "conf"); + + if (!hiveConfDir.canWrite()) + warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " + + "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory."); + else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?")) + replaceWithBackup(new File(igniteDocs, "hive-site.ignite.xml"), new File(hiveConfDir, "hive-site.xml")); + else + println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory..."); + } + + println("Apache Hadoop setup is complete."); + } + + /** + * @param jarFiles Jars. + * @param folder Folder. + */ + private static void addJarsInFolder(Collection jarFiles, File folder) { + if (!folder.exists()) + exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null); + + jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS))); + } + + /** + * Checks that JAVA_HOME does not contain space characters. 
+ */ + private static void checkJavaPathSpaces() { + String javaHome = System.getProperty("java.home"); + + if (javaHome.contains(" ")) { + warn("Java installation path contains space characters!"); + warn("Hadoop client will not be able to start using '" + javaHome + "'."); + warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation."); + } + } + + /** + * Checks Ignite home. + * + * @param igniteHome Ignite home. + */ + private static void checkIgniteHome(String igniteHome) { + URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation(); + + try { + Path jar = Paths.get(jarUrl.toURI()); + Path igHome = Paths.get(igniteHome); + + if (!jar.startsWith(igHome)) + exit("Ignite JAR files are not under IGNITE_HOME.", null); + } + catch (Exception e) { + exit(e.getMessage(), e); + } + } + + /** + * Replaces target file with source file. + * + * @param from From. + * @param to To. + */ + private static void replaceWithBackup(File from, File to) { + if (!from.canRead()) + exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null); + + println("Replacing file '" + to.getAbsolutePath() + "'."); + + try { + U.copy(from, renameToBak(to), true); + } + catch (IOException e) { + exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e); + } + } + + /** + * Renames file for backup. + * + * @param file File. + * @return File. + */ + private static File renameToBak(File file) { + DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); + + if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak"))) + exit("Failed to rename file '" + file.getPath() + "'.", null); + + return file; + } + + /** + * Checks if link is correct. + * + * @param link Symbolic link. + * @param correctTarget Correct link target. + * @return {@code true} If link target is correct. 
+ */ + private static boolean isJarLinkCorrect(File link, File correctTarget) { + if (!Files.isSymbolicLink(link.toPath())) + return false; // It is a real file or it does not exist. + + Path target = null; + + try { + target = Files.readSymbolicLink(link.toPath()); + } + catch (IOException e) { + exit("Failed to read symbolic link: " + link.getAbsolutePath(), e); + } + + return Files.exists(target) && target.toFile().equals(correctTarget); + } + + /** + * Writes the question end read the boolean answer from the console. + * + * @param question Question to write. + * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise. + */ + private static boolean ask(String question) { + X.println(); + X.print(" < " + question + " (Y/N): "); + + String answer = null; + + if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES"))) + answer = "Y"; + else { + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + + try { + answer = br.readLine(); + } + catch (IOException e) { + exit("Failed to read answer: " + e.getMessage(), e); + } + } + + if (answer != null && "Y".equals(answer.toUpperCase().trim())) { + X.println(" > Yes."); + + return true; + } + else { + X.println(" > No."); + + return false; + } + } + + /** + * Exit with message. + * + * @param msg Exit message. + */ + private static void exit(String msg, Exception e) { + X.println(" "); + X.println(" # " + msg); + X.println(" # Setup failed, exiting... "); + + if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG"))) + e.printStackTrace(); + + System.exit(1); + } + + /** + * Prints message. + * + * @param msg Message. + */ + private static void println(String msg) { + X.println(" > " + msg); + } + + /** + * Prints warning. + * + * @param msg Message. + */ + private static void warn(String msg) { + X.println(" ! " + msg); + } + + /** + * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the + * answer. 
If it's 'Y' then backups original files and corrects invalid new line characters. + * + * @param rootDir Root directory to process. + * @param dirs Directories inside of the root to process. + */ + private static void processCmdFiles(File rootDir, String... dirs) { + boolean answer = false; + + for (String dir : dirs) { + File subDir = new File(rootDir, dir); + + File[] cmdFiles = subDir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(".cmd"); + } + }); + + for (File file : cmdFiles) { + String content = null; + + try (Scanner scanner = new Scanner(file)) { + content = scanner.useDelimiter("\\Z").next(); + } + catch (FileNotFoundException e) { + exit("Failed to read file '" + file + "'.", e); + } + + boolean invalid = false; + + for (int i = 0; i < content.length(); i++) { + if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) { + invalid = true; + + break; + } + } + + if (invalid) { + answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?"); + + if (!answer) { + println("Ok. 
But Windows most probably will fail to execute them..."); + + return; + } + + println("Fixing newline characters in file '" + file.getAbsolutePath() + "'."); + + renameToBak(file); + + try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { + for (int i = 0; i < content.length(); i++) { + if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) + writer.write("\r"); + + writer.write(content.charAt(i)); + } + } + catch (IOException e) { + exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java new file mode 100644 index 0000000..bb3d1cc --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +/** + * Exception that throws when the task is cancelling. + */ +public class HadoopTaskCancelledException extends IgniteException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param msg Exception message. + */ + public HadoopTaskCancelledException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java new file mode 100644 index 0000000..00be422 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobPriority; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop utility methods. + */ +public class HadoopUtils { + /** Property to store timestamp of new job id request. */ + public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs"; + + /** Property to store timestamp of response of new job id request. */ + public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs"; + + /** Property to store timestamp of job submission. */ + public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs"; + + /** Property to set custom writer of job statistics. */ + public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer"; + + /** Staging constant. */ + private static final String STAGING_CONSTANT = ".staging"; + + /** Old mapper class attribute. */ + private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class"; + + /** Old reducer class attribute. */ + private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; + + /** + * Wraps native split. + * + * @param id Split ID. + * @param split Split. + * @param hosts Hosts. + * @throws IOException If failed. 
+ */ + public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException { + ByteArrayOutputStream arr = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(arr); + + assert split instanceof Writable; + + ((Writable)split).write(out); + + out.flush(); + + return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts); + } + + /** + * Unwraps native split. + * + * @param o Wrapper. + * @return Split. + */ + public static Object unwrapSplit(HadoopSplitWrapper o) { + try { + Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance(); + + w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes()))); + + return w; + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + /** + * Convert Ignite job status to Hadoop job status. + * + * @param status Ignite job status. + * @return Hadoop job status. + */ + public static JobStatus status(HadoopJobStatus status, Configuration conf) { + JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId()); + + float setupProgress = 0; + float mapProgress = 0; + float reduceProgress = 0; + float cleanupProgress = 0; + + JobStatus.State state = JobStatus.State.RUNNING; + + switch (status.jobPhase()) { + case PHASE_SETUP: + setupProgress = 0.42f; + + break; + + case PHASE_MAP: + setupProgress = 1; + mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt(); + + break; + + case PHASE_REDUCE: + assert status.totalReducerCnt() > 0; + + setupProgress = 1; + mapProgress = 1; + reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + + break; + + case PHASE_CANCELLING: + case PHASE_COMPLETE: + if (!status.isFailed()) { + setupProgress = 1; + mapProgress = 1; + reduceProgress = 1; + cleanupProgress = 1; + + state = JobStatus.State.SUCCEEDED; + } + else + state = JobStatus.State.FAILED; + + break; + + default: + assert 
false; + } + + return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state, + JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A"); + } + + /** + * Gets staging area directory. + * + * @param conf Configuration. + * @param usr User. + * @return Staging area directory. + */ + public static Path stagingAreaDir(Configuration conf, String usr) { + return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) + + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT); + } + + /** + * Gets job file. + * + * @param conf Configuration. + * @param usr User. + * @param jobId Job ID. + * @return Job file. + */ + public static Path jobFile(Configuration conf, String usr, JobID jobId) { + return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE); + } + + /** + * Checks the attribute in configuration is not set. + * + * @param attr Attribute name. + * @param msg Message for creation of exception. + * @throws IgniteCheckedException If attribute is set. + */ + public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException { + if (cfg.get(attr) != null) + throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode."); + } + + /** + * Creates JobInfo from hadoop configuration. + * + * @param cfg Hadoop configuration. + * @return Job info. + * @throws IgniteCheckedException If failed. 
+ */ + public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException { + JobConf jobConf = new JobConf(cfg); + + boolean hasCombiner = jobConf.get("mapred.combiner.class") != null + || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null; + + int numReduces = jobConf.getNumReduceTasks(); + + jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null); + + if (jobConf.getUseNewMapper()) { + String mode = "new map API"; + + ensureNotSet(jobConf, "mapred.input.format.class", mode); + ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode); + + if (numReduces != 0) + ensureNotSet(jobConf, "mapred.partitioner.class", mode); + else + ensureNotSet(jobConf, "mapred.output.format.class", mode); + } + else { + String mode = "map compatibility"; + + ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode); + ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode); + + if (numReduces != 0) + ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode); + else + ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); + } + + if (numReduces != 0) { + jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null); + + if (jobConf.getUseNewReducer()) { + String mode = "new reduce API"; + + ensureNotSet(jobConf, "mapred.output.format.class", mode); + ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode); + } + else { + String mode = "reduce compatibility"; + + ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); + ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode); + } + } + + Map props = new HashMap<>(); + + for (Map.Entry entry : jobConf) + props.put(entry.getKey(), entry.getValue()); + + return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props); + } + + /** + * Throws new {@link IgniteCheckedException} with original exception is serialized into string. 
+ * This is needed to transfer error outside the current class loader. + * + * @param e Original exception. + * @return IgniteCheckedException New exception. + */ + public static IgniteCheckedException transformException(Throwable e) { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + + e.printStackTrace(new PrintStream(os, true)); + + return new IgniteCheckedException(os.toString()); + } + + /** + * Returns work directory for job execution. + * + * @param locNodeId Local node ID. + * @param jobId Job ID. + * @return Working directory for job. + * @throws IgniteCheckedException If Failed. + */ + public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException { + return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId); + } + + /** + * Returns subdirectory of job working directory for task execution. + * + * @param locNodeId Local node ID. + * @param info Task info. + * @return Working directory for task. + * @throws IgniteCheckedException If Failed. + */ + public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException { + File jobLocDir = jobLocalDir(locNodeId, info.jobId()); + + return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt()); + } + + /** + * Constructor. + */ + private HadoopUtils() { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java deleted file mode 100644 index 4ef9e35..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; -import org.apache.ignite.internal.processors.hadoop.planner.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopClassLoader.*; - -/** - * Hadoop processor. - */ -public class IgniteHadoopProcessor extends IgniteHadoopProcessorAdapter { - /** Job ID counter. */ - private final AtomicInteger idCtr = new AtomicInteger(); - - /** Hadoop context. */ - @GridToStringExclude - private GridHadoopContext hctx; - - /** Hadoop facade for public API. */ - @GridToStringExclude - private GridHadoop hadoop; - - /** - * @param ctx Kernal context. - */ - public IgniteHadoopProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.isDaemon()) - return; - - GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); - - if (cfg == null) - cfg = new GridHadoopConfiguration(); - else - cfg = new GridHadoopConfiguration(cfg); - - initializeDefaults(cfg); - - validate(cfg); - - if (hadoopHome() != null) - U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome()); - - boolean ok = false; - - try { // Check for Hadoop installation. - hadoopUrls(); - - ok = true; - } - catch (IgniteCheckedException e) { - U.quietAndWarn(log, e.getMessage()); - } - - if (ok) { - hctx = new GridHadoopContext( - ctx, - cfg, - new GridHadoopJobTracker(), - cfg.isExternalExecution() ? 
new GridHadoopExternalTaskExecutor() : new GridHadoopEmbeddedTaskExecutor(), - new GridHadoopShuffle()); - - - for (GridHadoopComponent c : hctx.components()) - c.start(hctx); - - hadoop = new GridHadoopImpl(this); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteHadoopProcessor.class, this); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - if (hctx == null) - return; - - List components = hctx.components(); - - for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { - GridHadoopComponent c = it.previous(); - - c.stop(cancel); - } - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - if (hctx == null) - return; - - for (GridHadoopComponent c : hctx.components()) - c.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - if (hctx == null) - return; - - List components = hctx.components(); - - for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { - GridHadoopComponent c = it.previous(); - - c.onKernalStop(cancel); - } - } - - /** - * Gets Hadoop context. - * - * @return Hadoop context. 
- */ - public GridHadoopContext context() { - return hctx; - } - - /** {@inheritDoc} */ - @Override public GridHadoop hadoop() { - if (hadoop == null) - throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " + - "is HADOOP_HOME environment variable set?)"); - - return hadoop; - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration config() { - return hctx.configuration(); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobId nextJobId() { - return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { - return hctx.jobTracker().submit(jobId, jobInfo); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().status(jobId); - } - - /** {@inheritDoc} */ - @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().jobCounters(jobId); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().finishFuture(jobId); - } - - /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().killJob(jobId); - } - - /** - * Initializes default hadoop configuration. - * - * @param cfg Hadoop configuration. - */ - private void initializeDefaults(GridHadoopConfiguration cfg) { - if (cfg.getMapReducePlanner() == null) - cfg.setMapReducePlanner(new GridHadoopDefaultMapReducePlanner()); - } - - /** - * Validates Grid and Hadoop configuration for correctness. - * - * @param hadoopCfg Hadoop configuration. - * @throws IgniteCheckedException If failed. 
- */ - private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException { - if (ctx.config().isPeerClassLoadingEnabled()) - throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " + - "GridConfiguration.setPeerClassLoadingEnabled())."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java deleted file mode 100644 index 9e46846..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Default Hadoop counter implementation. - */ -public abstract class GridHadoopCounterAdapter implements GridHadoopCounter, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Counter group name. */ - private String grp; - - /** Counter name. */ - private String name; - - /** - * Default constructor required by {@link Externalizable}. - */ - protected GridHadoopCounterAdapter() { - // No-op. - } - - /** - * Creates new counter with given group and name. - * - * @param grp Counter group name. - * @param name Counter name. - */ - protected GridHadoopCounterAdapter(String grp, String name) { - assert grp != null : "counter must have group"; - assert name != null : "counter must have name"; - - this.grp = grp; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override @Nullable public String group() { - return grp; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(grp); - out.writeUTF(name); - writeValue(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - grp = in.readUTF(); - name = in.readUTF(); - readValue(in); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - GridHadoopCounterAdapter cntr = (GridHadoopCounterAdapter)o; - - if (!grp.equals(cntr.grp)) - return false; - if (!name.equals(cntr.name)) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = grp.hashCode(); - res = 31 
* res + name.hashCode(); - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopCounterAdapter.class, this); - } - - /** - * Writes value of this counter to output. - * - * @param out Output. - * @throws IOException If failed. - */ - protected abstract void writeValue(ObjectOutput out) throws IOException; - - /** - * Read value of this counter from input. - * - * @param in Input. - * @throws IOException If failed. - */ - protected abstract void readValue(ObjectInput in) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java deleted file mode 100644 index 92d54af..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; - -import java.io.*; -import java.lang.reflect.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Default in-memory counters store. - */ -public class GridHadoopCountersImpl implements GridHadoopCounters, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final ConcurrentMap cntrsMap = new ConcurrentHashMap8<>(); - - /** - * Default constructor. Creates new instance without counters. - */ - public GridHadoopCountersImpl() { - // No-op. - } - - /** - * Creates new instance that contain given counters. - * - * @param cntrs Counters to store. - */ - public GridHadoopCountersImpl(Iterable cntrs) { - addCounters(cntrs, true); - } - - /** - * Copy constructor. - * - * @param cntrs Counters to copy. - */ - public GridHadoopCountersImpl(GridHadoopCounters cntrs) { - this(cntrs.all()); - } - - /** - * Creates counter instance. - * - * @param cls Class of the counter. - * @param grp Group name. - * @param name Counter name. - * @return Counter. - */ - private T createCounter(Class cls, String grp, - String name) { - try { - Constructor constructor = cls.getConstructor(String.class, String.class); - - return (T)constructor.newInstance(grp, name); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - /** - * Adds counters collection in addition to existing counters. - * - * @param cntrs Counters to add. - * @param cp Whether to copy counters or not. 
- */ - private void addCounters(Iterable cntrs, boolean cp) { - assert cntrs != null; - - for (GridHadoopCounter cntr : cntrs) { - if (cp) { - GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name()); - - cntrCp.merge(cntr); - - cntr = cntrCp; - } - - cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr); - } - } - - /** {@inheritDoc} */ - @Override public T counter(String grp, String name, Class cls) { - assert cls != null; - - CounterKey mapKey = new CounterKey(cls, grp, name); - - T cntr = (T)cntrsMap.get(mapKey); - - if (cntr == null) { - cntr = createCounter(cls, grp, name); - - T old = (T)cntrsMap.putIfAbsent(mapKey, cntr); - - if (old != null) - return old; - } - - return cntr; - } - - /** {@inheritDoc} */ - @Override public Collection all() { - return cntrsMap.values(); - } - - /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounters other) { - for (GridHadoopCounter counter : other.all()) - counter(counter.group(), counter.name(), counter.getClass()).merge(counter); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeCollection(out, cntrsMap.values()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - addCounters(U.readCollection(in), false); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridHadoopCountersImpl counters = (GridHadoopCountersImpl)o; - - return cntrsMap.equals(counters.cntrsMap); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return cntrsMap.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopCountersImpl.class, this, "counters", cntrsMap.values()); - } - - /** - * The tuple of counter identifier 
components for more readable code. - */ - private static class CounterKey extends GridTuple3, String, String> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Constructor. - * - * @param cls Class of the counter. - * @param grp Group name. - * @param name Counter name. - */ - private CounterKey(Class cls, String grp, String name) { - super(cls, grp, name); - } - - /** - * Empty constructor required by {@link Externalizable}. - */ - public CounterKey() { - // No-op. - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java deleted file mode 100644 index 55dcc4c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; - -/** - * Statistic writer implementation that writes info into any Hadoop file system. - */ -public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter { - /** */ - public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; - - /** */ - private static final String DEFAULT_USER_NAME = "anonymous"; - - /** */ - public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; - - /** */ - private static final String USER_MACRO = "${USER}"; - - /** */ - private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; - - /** {@inheritDoc} */ - @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) - throws IgniteCheckedException { - - Configuration hadoopCfg = new Configuration(); - - for (Map.Entry e : ((GridHadoopDefaultJobInfo)jobInfo).properties().entrySet()) - hadoopCfg.set(e.getKey(), e.getValue()); - - String user = jobInfo.user(); - - if (F.isEmpty(user)) - user = DEFAULT_USER_NAME; - - String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY); - - if (dir == null) - dir = DEFAULT_COUNTER_WRITER_DIR; - - Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); - - GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(cntrs, null); - - try { - FileSystem fs = jobStatPath.getFileSystem(hadoopCfg); - - fs.mkdirs(jobStatPath); - - try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, 
PERFORMANCE_COUNTER_FILE_NAME)))) { - for (T2 evt : perfCntr.evts()) { - out.print(evt.get1()); - out.print(':'); - out.println(evt.get2().toString()); - } - - out.flush(); - } - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java deleted file mode 100644 index 67af49f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.ignite.internal.processors.hadoop.*; - -import java.io.*; - -/** - * Standard hadoop counter to use via original Hadoop API in Hadoop jobs. 
- */ -public class GridHadoopLongCounter extends GridHadoopCounterAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** The counter value. */ - private long val; - - /** - * Default constructor required by {@link Externalizable}. - */ - public GridHadoopLongCounter() { - // No-op. - } - - /** - * Constructor. - * - * @param grp Group name. - * @param name Counter name. - */ - public GridHadoopLongCounter(String grp, String name) { - super(grp, name); - } - - /** {@inheritDoc} */ - @Override protected void writeValue(ObjectOutput out) throws IOException { - out.writeLong(val); - } - - /** {@inheritDoc} */ - @Override protected void readValue(ObjectInput in) throws IOException { - val = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounter cntr) { - val += ((GridHadoopLongCounter)cntr).val; - } - - /** - * Gets current value of this counter. - * - * @return Current value. - */ - public long value() { - return val; - } - - /** - * Sets current value by the given value. - * - * @param val Value to set. - */ - public void value(long val) { - this.val = val; - } - - /** - * Increment this counter by the given value. - * - * @param i Value to increase this counter by. 
- */ - public void increment(long i) { - val += i; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java deleted file mode 100644 index d5ceebf..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Counter for the job statistics accumulation. - */ -public class GridHadoopPerformanceCounter extends GridHadoopCounterAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** The group name for this counter. */ - private static final String GROUP_NAME = "SYSTEM"; - - /** The counter name for this counter. */ - private static final String COUNTER_NAME = "PERFORMANCE"; - - /** Events collections. */ - private Collection> evts = new ArrayList<>(); - - /** Node id to insert into the event info. */ - private UUID nodeId; - - /** */ - private int reducerNum; - - /** */ - private volatile Long firstShuffleMsg; - - /** */ - private volatile Long lastShuffleMsg; - - /** - * Default constructor required by {@link Externalizable}. - */ - public GridHadoopPerformanceCounter() { - // No-op. - } - - /** - * Constructor. - * - * @param grp Group name. - * @param name Counter name. - */ - public GridHadoopPerformanceCounter(String grp, String name) { - super(grp, name); - } - - /** - * Constructor to create instance to use this as helper. - * - * @param nodeId Id of the work node. 
- */ - public GridHadoopPerformanceCounter(UUID nodeId) { - this.nodeId = nodeId; - } - - /** {@inheritDoc} */ - @Override protected void writeValue(ObjectOutput out) throws IOException { - U.writeCollection(out, evts); - } - - /** {@inheritDoc} */ - @Override protected void readValue(ObjectInput in) throws IOException { - try { - evts = U.readCollection(in); - } - catch (ClassNotFoundException e) { - throw new IOException(e); - } - } - - /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounter cntr) { - evts.addAll(((GridHadoopPerformanceCounter)cntr).evts); - } - - /** - * Gets the events collection. - * - * @return Collection of event. - */ - public Collection> evts() { - return evts; - } - - /** - * Generate name that consists of some event information. - * - * @param info Task info. - * @param evtType The type of the event. - * @return String contains necessary event information. - */ - private String eventName(GridHadoopTaskInfo info, String evtType) { - return eventName(info.type().toString(), info.taskNumber(), evtType); - } - - /** - * Generate name that consists of some event information. - * - * @param taskType Task type. - * @param taskNum Number of the task. - * @param evtType The type of the event. - * @return String contains necessary event information. - */ - private String eventName(String taskType, int taskNum, String evtType) { - assert nodeId != null; - - return taskType + " " + taskNum + " " + evtType + " " + nodeId; - } - - /** - * Adds event of the task submission (task instance creation). - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskSubmit(GridHadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "submit"), ts)); - } - - /** - * Adds event of the task preparation. - * - * @param info Task info. - * @param ts Timestamp of the event. 
- */ - public void onTaskPrepare(GridHadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "prepare"), ts)); - } - - /** - * Adds event of the task finish. - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskFinish(GridHadoopTaskInfo info, long ts) { - if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) { - evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); - evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); - - lastShuffleMsg = null; - } - - evts.add(new T2<>(eventName(info, "finish"), ts)); - } - - /** - * Adds event of the task run. - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskStart(GridHadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "start"), ts)); - } - - /** - * Adds event of the job preparation. - * - * @param ts Timestamp of the event. - */ - public void onJobPrepare(long ts) { - assert nodeId != null; - - evts.add(new T2<>("JOB prepare " + nodeId, ts)); - } - - /** - * Adds event of the job start. - * - * @param ts Timestamp of the event. - */ - public void onJobStart(long ts) { - assert nodeId != null; - - evts.add(new T2<>("JOB start " + nodeId, ts)); - } - - /** - * Adds client submission events from job info. - * - * @param info Job info. - */ - public void clientSubmissionEvents(GridHadoopJobInfo info) { - assert nodeId != null; - - addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); - addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY); - addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY); - } - - /** - * Adds event with timestamp from some property in job info. - * - * @param evt Event type and phase. - * @param info Job info. - * @param propName Property name to get timestamp. 
- */ - private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) { - String val = info.property(propName); - - if (!F.isEmpty(val)) { - try { - evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val))); - } - catch (NumberFormatException e) { - throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e); - } - } - } - - /** - * Registers shuffle message event. - * - * @param reducerNum Number of reducer that receives the data. - * @param ts Timestamp of the event. - */ - public void onShuffleMessage(int reducerNum, long ts) { - this.reducerNum = reducerNum; - - if (firstShuffleMsg == null) - firstShuffleMsg = ts; - - lastShuffleMsg = ts; - } - - /** - * Gets system predefined performance counter from the GridHadoopCounters object. - * - * @param cntrs GridHadoopCounters object. - * @param nodeId Node id for methods that adds events. It may be null if you don't use ones. - * @return Predefined performance counter. - */ - public static GridHadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) { - GridHadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class); - - if (nodeId != null) - cntr.nodeId(nodeId); - - return cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class); - } - - /** - * Sets the nodeId field. - * - * @param nodeId Node id. 
- */ - private void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java new file mode 100644 index 0000000..c2ed5bb --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Default Hadoop counter implementation. + */ +public abstract class HadoopCounterAdapter implements HadoopCounter, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Counter group name. 
*/ + private String grp; + + /** Counter name. */ + private String name; + + /** + * Default constructor required by {@link Externalizable}. + */ + protected HadoopCounterAdapter() { + // No-op. + } + + /** + * Creates new counter with given group and name. + * + * @param grp Counter group name. + * @param name Counter name. + */ + protected HadoopCounterAdapter(String grp, String name) { + assert grp != null : "counter must have group"; + assert name != null : "counter must have name"; + + this.grp = grp; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override @Nullable public String group() { + return grp; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(grp); + out.writeUTF(name); + writeValue(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + grp = in.readUTF(); + name = in.readUTF(); + readValue(in); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + HadoopCounterAdapter cntr = (HadoopCounterAdapter)o; + + if (!grp.equals(cntr.grp)) + return false; + if (!name.equals(cntr.name)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = grp.hashCode(); + res = 31 * res + name.hashCode(); + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopCounterAdapter.class, this); + } + + /** + * Writes value of this counter to output. + * + * @param out Output. + * @throws IOException If failed. + */ + protected abstract void writeValue(ObjectOutput out) throws IOException; + + /** + * Read value of this counter from input. + * + * @param in Input. + * @throws IOException If failed. 
+ */ + protected abstract void readValue(ObjectInput in) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java new file mode 100644 index 0000000..78e1c26 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; + +import java.io.*; +import java.lang.reflect.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Default in-memory counters store. 
+ */ +public class HadoopCountersImpl implements HadoopCounters, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final ConcurrentMap cntrsMap = new ConcurrentHashMap8<>(); + + /** + * Default constructor. Creates new instance without counters. + */ + public HadoopCountersImpl() { + // No-op. + } + + /** + * Creates new instance that contain given counters. + * + * @param cntrs Counters to store. + */ + public HadoopCountersImpl(Iterable cntrs) { + addCounters(cntrs, true); + } + + /** + * Copy constructor. + * + * @param cntrs Counters to copy. + */ + public HadoopCountersImpl(HadoopCounters cntrs) { + this(cntrs.all()); + } + + /** + * Creates counter instance. + * + * @param cls Class of the counter. + * @param grp Group name. + * @param name Counter name. + * @return Counter. + */ + private T createCounter(Class cls, String grp, + String name) { + try { + Constructor constructor = cls.getConstructor(String.class, String.class); + + return (T)constructor.newInstance(grp, name); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** + * Adds counters collection in addition to existing counters. + * + * @param cntrs Counters to add. + * @param cp Whether to copy counters or not. 
+ */ + private void addCounters(Iterable cntrs, boolean cp) { + assert cntrs != null; + + for (HadoopCounter cntr : cntrs) { + if (cp) { + HadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name()); + + cntrCp.merge(cntr); + + cntr = cntrCp; + } + + cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr); + } + } + + /** {@inheritDoc} */ + @Override public T counter(String grp, String name, Class cls) { + assert cls != null; + + CounterKey mapKey = new CounterKey(cls, grp, name); + + T cntr = (T)cntrsMap.get(mapKey); + + if (cntr == null) { + cntr = createCounter(cls, grp, name); + + T old = (T)cntrsMap.putIfAbsent(mapKey, cntr); + + if (old != null) + return old; + } + + return cntr; + } + + /** {@inheritDoc} */ + @Override public Collection all() { + return cntrsMap.values(); + } + + /** {@inheritDoc} */ + @Override public void merge(HadoopCounters other) { + for (HadoopCounter counter : other.all()) + counter(counter.group(), counter.name(), counter.getClass()).merge(counter); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeCollection(out, cntrsMap.values()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + addCounters(U.readCollection(in), false); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopCountersImpl counters = (HadoopCountersImpl)o; + + return cntrsMap.equals(counters.cntrsMap); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return cntrsMap.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values()); + } + + /** + * The tuple of counter identifier components for more readable 
code. + */ + private static class CounterKey extends GridTuple3, String, String> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + * + * @param cls Class of the counter. + * @param grp Group name. + * @param name Counter name. + */ + private CounterKey(Class cls, String grp, String name) { + super(cls, grp, name); + } + + /** + * Empty constructor required by {@link Externalizable}. + */ + public CounterKey() { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java new file mode 100644 index 0000000..ce86edb --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.ignite.internal.processors.hadoop.counter;

import java.io.*;

/**
 * Counter that holds a single {@code long} value. Standard hadoop counter to use via original Hadoop API
 * in Hadoop jobs.
 */
public class HadoopLongCounter extends HadoopCounterAdapter {
    /** */
    private static final long serialVersionUID = 0L;

    /** Value held by this counter. */
    private long val;

    /**
     * No-arg constructor needed for {@link Externalizable}.
     */
    public HadoopLongCounter() {
        // No-op.
    }

    /**
     * Creates a counter with the given group and name.
     *
     * @param grp Group name.
     * @param name Counter name.
     */
    public HadoopLongCounter(String grp, String name) {
        super(grp, name);
    }

    /**
     * Writes the counter value as a single {@code long}.
     *
     * @param out Output.
     * @throws IOException If failed.
     */
    @Override protected void writeValue(ObjectOutput out) throws IOException {
        out.writeLong(this.val);
    }

    /**
     * Reads the counter value as a single {@code long}.
     *
     * @param in Input.
     * @throws IOException If failed.
     */
    @Override protected void readValue(ObjectInput in) throws IOException {
        this.val = in.readLong();
    }

    /**
     * Adds the value of the given counter to this counter.
     *
     * @param cntr Counter to merge in; it is cast to {@code HadoopLongCounter}.
     */
    @Override public void merge(HadoopCounter cntr) {
        HadoopLongCounter other = (HadoopLongCounter)cntr;

        this.val = this.val + other.val;
    }

    /**
     * Returns the value currently held by this counter.
     *
     * @return Current value.
     */
    public long value() {
        return this.val;
    }

    /**
     * Replaces the current value with the given one.
     *
     * @param val Value to set.
     */
    public void value(long val) {
        this.val = val;
    }

    /**
     * Adds the given amount to the current value.
     *
     * @param i Value to increase this counter by.
     */
    public void increment(long i) {
        this.val = this.val + i;
    }
}