From: vozerov@apache.org
To: commits@ignite.incubator.apache.org
Reply-To: dev@ignite.incubator.apache.org
Date: Tue, 03 Mar 2015 13:08:31 -0000
Message-Id: <9b53eb3747a547c195ae92c571c9ca50@git.apache.org>
Subject: [15/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (2).

# IGNITE-386: WIP on internal namings (2).
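Editor's note: this commit is one step of the IGNITE-386 renaming pass, dropping the legacy "Grid"/"IgniteHadoop" prefixes from the internal Hadoop classes (for example IgniteHadoopProcessor becomes HadoopProcessor, and GridHadoopFSCounterWriter is replaced by the public IgniteHadoopFileSystemCounterWriter). The following is a minimal, hedged sketch of how the counter writer's output location could be configured; the property name, the ${USER} macro, the "/user/${USER}" default and the "performance" file name are taken from the class in the diff below, while the example class itself and the use of a plain Hadoop Configuration are assumptions, not part of this commit.

    // Minimal sketch (not from this commit): pointing the renamed
    // IgniteHadoopFileSystemCounterWriter at a custom statistics directory.
    import org.apache.hadoop.conf.Configuration;

    public class CounterWriterDirExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Overrides the default "/user/${USER}" directory; per the writer's code,
            // "${USER}" is replaced with the submitting user ("anonymous" if unset).
            conf.set("ignite.counters.fswriter.directory", "/stats/${USER}");

            // With this setting the writer creates <dir>/<jobId>/performance and writes
            // one "<event>:<timestamp>" line per recorded performance event.
            System.out.println(conf.get("ignite.counters.fswriter.directory"));
        }
    }

The quoted commit metadata, diffstat and diff below are reproduced from the repository as archived.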
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/288709a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/288709a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/288709a1 Branch: refs/heads/ignite-386 Commit: 288709a1b48260e665278037e1beb050ab8ecdb4 Parents: ace354c Author: vozerov-gridgain Authored: Tue Mar 3 15:55:58 2015 +0300 Committer: vozerov-gridgain Committed: Tue Mar 3 15:55:58 2015 +0300 ---------------------------------------------------------------------- docs/core-site.ignite.xml | 2 +- .../ignite/internal/IgniteComponentType.java | 2 +- .../fs/IgniteHadoopFileSystemCounterWriter.java | 92 ++ .../processors/hadoop/HadoopContext.java | 2 +- .../processors/hadoop/HadoopCounters.java | 14 +- .../internal/processors/hadoop/HadoopImpl.java | 4 +- .../processors/hadoop/HadoopProcessor.java | 225 ++++ .../hadoop/IgniteHadoopProcessor.java | 225 ---- .../counter/GridHadoopCounterAdapter.java | 128 --- .../hadoop/counter/GridHadoopCountersImpl.java | 198 ---- .../counter/GridHadoopFSCounterWriter.java | 91 -- .../hadoop/counter/GridHadoopLongCounter.java | 92 -- .../counter/GridHadoopPerformanceCounter.java | 279 ----- .../hadoop/counter/HadoopCounterAdapter.java | 128 +++ .../hadoop/counter/HadoopCountersImpl.java | 198 ++++ .../hadoop/counter/HadoopLongCounter.java | 92 ++ .../counter/HadoopPerformanceCounter.java | 279 +++++ .../fs/GridHadoopDistributedFileSystem.java | 91 -- .../hadoop/fs/GridHadoopFileSystemsUtils.java | 57 - .../hadoop/fs/GridHadoopLocalFileSystemV1.java | 39 - .../hadoop/fs/GridHadoopLocalFileSystemV2.java | 86 -- .../hadoop/fs/GridHadoopRawLocalFileSystem.java | 304 ------ .../hadoop/fs/HadoopDistributedFileSystem.java | 91 ++ .../hadoop/fs/HadoopFileSystemsUtils.java | 57 + .../hadoop/fs/HadoopLocalFileSystemV1.java | 39 + .../hadoop/fs/HadoopLocalFileSystemV2.java | 86 ++ .../hadoop/fs/HadoopRawLocalFileSystem.java | 304 ++++++ .../jobtracker/GridHadoopJobMetadata.java | 305 ------ .../hadoop/jobtracker/HadoopJobMetadata.java | 305 ++++++ .../hadoop/jobtracker/HadoopJobTracker.java | 104 +- .../hadoop/message/GridHadoopMessage.java | 27 - .../hadoop/message/HadoopMessage.java | 27 + .../planner/GridHadoopDefaultMapReducePlan.java | 107 -- .../GridHadoopDefaultMapReducePlanner.java | 434 -------- .../planner/HadoopDefaultMapReducePlan.java | 107 ++ .../planner/HadoopDefaultMapReducePlanner.java | 434 ++++++++ .../GridHadoopProtocolJobCountersTask.java | 45 - .../proto/GridHadoopProtocolJobStatusTask.java | 81 -- .../proto/GridHadoopProtocolKillJobTask.java | 46 - .../proto/GridHadoopProtocolNextTaskIdTask.java | 35 - .../proto/GridHadoopProtocolSubmitJobTask.java | 57 - .../proto/GridHadoopProtocolTaskAdapter.java | 113 -- .../proto/GridHadoopProtocolTaskArguments.java | 81 -- .../hadoop/proto/HadoopClientProtocol.java | 22 +- .../proto/HadoopProtocolJobCountersTask.java | 45 + .../proto/HadoopProtocolJobStatusTask.java | 81 ++ .../hadoop/proto/HadoopProtocolKillJobTask.java | 46 + .../proto/HadoopProtocolNextTaskIdTask.java | 35 + .../proto/HadoopProtocolSubmitJobTask.java | 57 + .../hadoop/proto/HadoopProtocolTaskAdapter.java | 113 ++ .../proto/HadoopProtocolTaskArguments.java | 81 ++ .../hadoop/shuffle/GridHadoopShuffleAck.java | 91 -- .../hadoop/shuffle/GridHadoopShuffleJob.java | 593 ----------- .../shuffle/GridHadoopShuffleMessage.java | 242 ----- .../hadoop/shuffle/HadoopShuffle.java | 38 +- 
.../hadoop/shuffle/HadoopShuffleAck.java | 91 ++ .../hadoop/shuffle/HadoopShuffleJob.java | 593 +++++++++++ .../hadoop/shuffle/HadoopShuffleMessage.java | 241 +++++ .../GridHadoopConcurrentHashMultimap.java | 611 ----------- .../collections/GridHadoopHashMultimap.java | 174 --- .../collections/GridHadoopHashMultimapBase.java | 208 ---- .../shuffle/collections/GridHadoopMultimap.java | 112 -- .../collections/GridHadoopMultimapBase.java | 368 ------- .../shuffle/collections/GridHadoopSkipList.java | 726 ------------- .../HadoopConcurrentHashMultimap.java | 611 +++++++++++ .../shuffle/collections/HadoopHashMultimap.java | 174 +++ .../collections/HadoopHashMultimapBase.java | 208 ++++ .../shuffle/collections/HadoopMultimap.java | 112 ++ .../shuffle/collections/HadoopMultimapBase.java | 368 +++++++ .../shuffle/collections/HadoopSkipList.java | 726 +++++++++++++ .../shuffle/streams/GridHadoopDataInStream.java | 170 --- .../streams/GridHadoopDataOutStream.java | 131 --- .../streams/GridHadoopOffheapBuffer.java | 122 --- .../shuffle/streams/HadoopDataInStream.java | 170 +++ .../shuffle/streams/HadoopDataOutStream.java | 131 +++ .../shuffle/streams/HadoopOffheapBuffer.java | 122 +++ .../taskexecutor/GridHadoopExecutorService.java | 232 ---- .../taskexecutor/GridHadoopRunnableTask.java | 22 +- .../taskexecutor/GridHadoopTaskState.java | 38 - .../taskexecutor/GridHadoopTaskStatus.java | 114 -- .../HadoopEmbeddedTaskExecutor.java | 8 +- .../taskexecutor/HadoopExecutorService.java | 231 ++++ .../taskexecutor/HadoopTaskExecutorAdapter.java | 2 +- .../hadoop/taskexecutor/HadoopTaskState.java | 38 + .../hadoop/taskexecutor/HadoopTaskStatus.java | 114 ++ .../GridHadoopExternalTaskMetadata.java | 68 -- .../GridHadoopJobInfoUpdateRequest.java | 109 -- .../GridHadoopPrepareForJobRequest.java | 126 --- .../external/GridHadoopProcessDescriptor.java | 150 --- .../external/GridHadoopProcessStartedAck.java | 46 - .../GridHadoopTaskExecutionRequest.java | 110 -- .../external/GridHadoopTaskFinishedMessage.java | 92 -- .../external/HadoopExternalTaskExecutor.java | 78 +- .../external/HadoopExternalTaskMetadata.java | 68 ++ .../external/HadoopJobInfoUpdateRequest.java | 109 ++ .../external/HadoopPrepareForJobRequest.java | 126 +++ .../external/HadoopProcessDescriptor.java | 150 +++ .../external/HadoopProcessStartedAck.java | 46 + .../external/HadoopTaskExecutionRequest.java | 110 ++ .../external/HadoopTaskFinishedMessage.java | 92 ++ .../child/GridHadoopChildProcessRunner.java | 72 +- .../child/GridHadoopExternalProcessStarter.java | 2 +- .../GridHadoopCommunicationClient.java | 2 +- .../GridHadoopExternalCommunication.java | 52 +- .../GridHadoopMarshallerFilter.java | 2 +- .../GridHadoopMessageListener.java | 4 +- .../GridHadoopTcpNioCommunicationClient.java | 2 +- .../hadoop/v1/GridHadoopV1Counter.java | 4 +- .../hadoop/v1/GridHadoopV1Reporter.java | 2 +- .../hadoop/v2/GridHadoopV2Context.java | 2 +- .../hadoop/v2/GridHadoopV2Counter.java | 4 +- .../processors/hadoop/v2/GridHadoopV2Job.java | 2 +- .../v2/GridHadoopV2JobResourceManager.java | 2 +- .../hadoop/v2/GridHadoopV2TaskContext.java | 4 +- .../hadoop/GridHadoopAbstractSelfTest.java | 2 +- .../hadoop/GridHadoopCommandLineTest.java | 4 +- ...idHadoopDefaultMapReducePlannerSelfTest.java | 1005 ------------------ .../hadoop/GridHadoopFileSystemsTest.java | 6 +- .../hadoop/GridHadoopMapReduceTest.java | 7 +- .../GridHadoopTestRoundRobinMrPlanner.java | 2 +- .../HadoopDefaultMapReducePlannerSelfTest.java | 1005 ++++++++++++++++++ 
...ridHadoopConcurrentHashMultimapSelftest.java | 12 +- .../collections/GridHadoopHashMapSelfTest.java | 6 +- .../collections/GridHadoopSkipListSelfTest.java | 14 +- .../streams/GridHadoopDataStreamSelfTest.java | 4 +- .../GridHadoopExecutorServiceTest.java | 119 --- .../taskexecutor/HadoopExecutorServiceTest.java | 119 +++ ...GridHadoopExternalCommunicationSelfTest.java | 6 +- .../testsuites/IgniteHadoopTestSuite.java | 2 +- 129 files changed, 8937 insertions(+), 8937 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/docs/core-site.ignite.xml ---------------------------------------------------------------------- diff --git a/docs/core-site.ignite.xml b/docs/core-site.ignite.xml index 1146576..8b8e634 100644 --- a/docs/core-site.ignite.xml +++ b/docs/core-site.ignite.xml @@ -73,7 +73,7 @@ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index d0e487a..a51800e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -36,7 +36,7 @@ public enum IgniteComponentType { /** Hadoop. */ HADOOP( "org.apache.ignite.internal.processors.hadoop.IgniteHadoopNoopProcessor", - "org.apache.ignite.internal.processors.hadoop.IgniteHadoopProcessor", + "org.apache.ignite.internal.processors.hadoop.HadoopProcessor", "ignite-hadoop" ), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java new file mode 100644 index 0000000..449cff2 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +/** + * Statistic writer implementation that writes info into any Hadoop file system. + */ +public class IgniteHadoopFileSystemCounterWriter implements GridHadoopCounterWriter { + /** */ + public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; + + /** */ + private static final String DEFAULT_USER_NAME = "anonymous"; + + /** */ + public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; + + /** */ + private static final String USER_MACRO = "${USER}"; + + /** */ + private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; + + /** {@inheritDoc} */ + @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) + throws IgniteCheckedException { + + Configuration hadoopCfg = new Configuration(); + + for (Map.Entry e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) + hadoopCfg.set(e.getKey(), e.getValue()); + + String user = jobInfo.user(); + + if (F.isEmpty(user)) + user = DEFAULT_USER_NAME; + + String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY); + + if (dir == null) + dir = DEFAULT_COUNTER_WRITER_DIR; + + Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); + + try { + FileSystem fs = jobStatPath.getFileSystem(hadoopCfg); + + fs.mkdirs(jobStatPath); + + try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { + for (T2 evt : perfCntr.evts()) { + out.print(evt.get1()); + out.print(':'); + out.println(evt.get2().toString()); + } + + out.flush(); + } + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java index bb707c8..d897b6c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java @@ -143,7 +143,7 @@ public class HadoopContext { * @param meta Job metadata. * @return {@code true} If local node is participating in job execution. 
*/ - public boolean isParticipating(GridHadoopJobMetadata meta) { + public boolean isParticipating(HadoopJobMetadata meta) { UUID locNodeId = localNodeId(); if (locNodeId.equals(meta.submitNodeId())) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java index c7f0157..ad699ec 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java @@ -31,7 +31,7 @@ import java.util.*; */ public class HadoopCounters extends Counters { /** */ - private final Map,GridHadoopLongCounter> cntrs = new HashMap<>(); + private final Map,HadoopLongCounter> cntrs = new HashMap<>(); /** * Creates new instance based on given counters. @@ -40,8 +40,8 @@ public class HadoopCounters extends Counters { */ public HadoopCounters(GridHadoopCounters cntrs) { for (GridHadoopCounter cntr : cntrs.all()) - if (cntr instanceof GridHadoopLongCounter) - this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr); + if (cntr instanceof HadoopLongCounter) + this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr); } /** {@inheritDoc} */ @@ -184,7 +184,7 @@ public class HadoopCounters extends Counters { public Iterator iterateGroup(String grpName) { Collection grpCounters = new ArrayList<>(); - for (GridHadoopLongCounter counter : cntrs.values()) { + for (HadoopLongCounter counter : cntrs.values()) { if (grpName.equals(counter.group())) grpCounters.add(new GridHadoopV2Counter(counter)); } @@ -203,12 +203,12 @@ public class HadoopCounters extends Counters { public Counter findCounter(String grpName, String cntrName, boolean create) { T2 key = new T2<>(grpName, cntrName); - GridHadoopLongCounter internalCntr = cntrs.get(key); + HadoopLongCounter internalCntr = cntrs.get(key); if (internalCntr == null & create) { - internalCntr = new GridHadoopLongCounter(grpName,cntrName); + internalCntr = new HadoopLongCounter(grpName,cntrName); - cntrs.put(key, new GridHadoopLongCounter(grpName,cntrName)); + cntrs.put(key, new HadoopLongCounter(grpName,cntrName)); } return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java index 80fd995..b87e7f8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java @@ -27,7 +27,7 @@ import org.jetbrains.annotations.*; */ public class HadoopImpl implements GridHadoop { /** Hadoop processor. */ - private final IgniteHadoopProcessor proc; + private final HadoopProcessor proc; /** Busy lock. 
*/ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); @@ -37,7 +37,7 @@ public class HadoopImpl implements GridHadoop { * * @param proc Hadoop processor. */ - HadoopImpl(IgniteHadoopProcessor proc) { + HadoopImpl(HadoopProcessor proc) { this.proc = proc; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java new file mode 100644 index 0000000..1f50b0c --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.processors.hadoop.planner.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*; + +/** + * Hadoop processor. + */ +public class HadoopProcessor extends IgniteHadoopProcessorAdapter { + /** Job ID counter. */ + private final AtomicInteger idCtr = new AtomicInteger(); + + /** Hadoop context. */ + @GridToStringExclude + private HadoopContext hctx; + + /** Hadoop facade for public API. */ + @GridToStringExclude + private GridHadoop hadoop; + + /** + * @param ctx Kernal context. + */ + public HadoopProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (ctx.isDaemon()) + return; + + GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); + + if (cfg == null) + cfg = new GridHadoopConfiguration(); + else + cfg = new GridHadoopConfiguration(cfg); + + initializeDefaults(cfg); + + validate(cfg); + + if (hadoopHome() != null) + U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome()); + + boolean ok = false; + + try { // Check for Hadoop installation. 
+ hadoopUrls(); + + ok = true; + } + catch (IgniteCheckedException e) { + U.quietAndWarn(log, e.getMessage()); + } + + if (ok) { + hctx = new HadoopContext( + ctx, + cfg, + new HadoopJobTracker(), + cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(), + new HadoopShuffle()); + + + for (HadoopComponent c : hctx.components()) + c.start(hctx); + + hadoop = new HadoopImpl(this); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcessor.class, this); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + if (hctx == null) + return; + + List components = hctx.components(); + + for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { + HadoopComponent c = it.previous(); + + c.stop(cancel); + } + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + if (hctx == null) + return; + + for (HadoopComponent c : hctx.components()) + c.onKernalStart(); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + if (hctx == null) + return; + + List components = hctx.components(); + + for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { + HadoopComponent c = it.previous(); + + c.onKernalStop(cancel); + } + } + + /** + * Gets Hadoop context. + * + * @return Hadoop context. + */ + public HadoopContext context() { + return hctx; + } + + /** {@inheritDoc} */ + @Override public GridHadoop hadoop() { + if (hadoop == null) + throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " + + "is HADOOP_HOME environment variable set?)"); + + return hadoop; + } + + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration config() { + return hctx.configuration(); + } + + /** {@inheritDoc} */ + @Override public GridHadoopJobId nextJobId() { + return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { + return hctx.jobTracker().submit(jobId, jobInfo); + } + + /** {@inheritDoc} */ + @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().status(jobId); + } + + /** {@inheritDoc} */ + @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().jobCounters(jobId); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().finishFuture(jobId); + } + + /** {@inheritDoc} */ + @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { + return hctx.jobTracker().killJob(jobId); + } + + /** + * Initializes default hadoop configuration. + * + * @param cfg Hadoop configuration. + */ + private void initializeDefaults(GridHadoopConfiguration cfg) { + if (cfg.getMapReducePlanner() == null) + cfg.setMapReducePlanner(new HadoopDefaultMapReducePlanner()); + } + + /** + * Validates Grid and Hadoop configuration for correctness. + * + * @param hadoopCfg Hadoop configuration. + * @throws IgniteCheckedException If failed. 
+ */ + private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException { + if (ctx.config().isPeerClassLoadingEnabled()) + throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " + + "GridConfiguration.setPeerClassLoadingEnabled())."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java deleted file mode 100644 index 63e4854..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; -import org.apache.ignite.internal.processors.hadoop.planner.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*; - -/** - * Hadoop processor. - */ -public class IgniteHadoopProcessor extends IgniteHadoopProcessorAdapter { - /** Job ID counter. */ - private final AtomicInteger idCtr = new AtomicInteger(); - - /** Hadoop context. */ - @GridToStringExclude - private HadoopContext hctx; - - /** Hadoop facade for public API. */ - @GridToStringExclude - private GridHadoop hadoop; - - /** - * @param ctx Kernal context. - */ - public IgniteHadoopProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.isDaemon()) - return; - - GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); - - if (cfg == null) - cfg = new GridHadoopConfiguration(); - else - cfg = new GridHadoopConfiguration(cfg); - - initializeDefaults(cfg); - - validate(cfg); - - if (hadoopHome() != null) - U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome()); - - boolean ok = false; - - try { // Check for Hadoop installation. 
- hadoopUrls(); - - ok = true; - } - catch (IgniteCheckedException e) { - U.quietAndWarn(log, e.getMessage()); - } - - if (ok) { - hctx = new HadoopContext( - ctx, - cfg, - new HadoopJobTracker(), - cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(), - new HadoopShuffle()); - - - for (HadoopComponent c : hctx.components()) - c.start(hctx); - - hadoop = new HadoopImpl(this); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteHadoopProcessor.class, this); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - if (hctx == null) - return; - - List components = hctx.components(); - - for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { - HadoopComponent c = it.previous(); - - c.stop(cancel); - } - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - if (hctx == null) - return; - - for (HadoopComponent c : hctx.components()) - c.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - if (hctx == null) - return; - - List components = hctx.components(); - - for (ListIterator it = components.listIterator(components.size()); it.hasPrevious();) { - HadoopComponent c = it.previous(); - - c.onKernalStop(cancel); - } - } - - /** - * Gets Hadoop context. - * - * @return Hadoop context. - */ - public HadoopContext context() { - return hctx; - } - - /** {@inheritDoc} */ - @Override public GridHadoop hadoop() { - if (hadoop == null) - throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " + - "is HADOOP_HOME environment variable set?)"); - - return hadoop; - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration config() { - return hctx.configuration(); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobId nextJobId() { - return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { - return hctx.jobTracker().submit(jobId, jobInfo); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().status(jobId); - } - - /** {@inheritDoc} */ - @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().jobCounters(jobId); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().finishFuture(jobId); - } - - /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { - return hctx.jobTracker().killJob(jobId); - } - - /** - * Initializes default hadoop configuration. - * - * @param cfg Hadoop configuration. - */ - private void initializeDefaults(GridHadoopConfiguration cfg) { - if (cfg.getMapReducePlanner() == null) - cfg.setMapReducePlanner(new GridHadoopDefaultMapReducePlanner()); - } - - /** - * Validates Grid and Hadoop configuration for correctness. - * - * @param hadoopCfg Hadoop configuration. - * @throws IgniteCheckedException If failed. 
- */ - private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException { - if (ctx.config().isPeerClassLoadingEnabled()) - throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " + - "GridConfiguration.setPeerClassLoadingEnabled())."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java deleted file mode 100644 index 9e46846..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Default Hadoop counter implementation. - */ -public abstract class GridHadoopCounterAdapter implements GridHadoopCounter, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Counter group name. */ - private String grp; - - /** Counter name. */ - private String name; - - /** - * Default constructor required by {@link Externalizable}. - */ - protected GridHadoopCounterAdapter() { - // No-op. - } - - /** - * Creates new counter with given group and name. - * - * @param grp Counter group name. - * @param name Counter name. 
- */ - protected GridHadoopCounterAdapter(String grp, String name) { - assert grp != null : "counter must have group"; - assert name != null : "counter must have name"; - - this.grp = grp; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override @Nullable public String group() { - return grp; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(grp); - out.writeUTF(name); - writeValue(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - grp = in.readUTF(); - name = in.readUTF(); - readValue(in); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - GridHadoopCounterAdapter cntr = (GridHadoopCounterAdapter)o; - - if (!grp.equals(cntr.grp)) - return false; - if (!name.equals(cntr.name)) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = grp.hashCode(); - res = 31 * res + name.hashCode(); - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopCounterAdapter.class, this); - } - - /** - * Writes value of this counter to output. - * - * @param out Output. - * @throws IOException If failed. - */ - protected abstract void writeValue(ObjectOutput out) throws IOException; - - /** - * Read value of this counter from input. - * - * @param in Input. - * @throws IOException If failed. - */ - protected abstract void readValue(ObjectInput in) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java deleted file mode 100644 index 92d54af..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; - -import java.io.*; -import java.lang.reflect.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Default in-memory counters store. - */ -public class GridHadoopCountersImpl implements GridHadoopCounters, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final ConcurrentMap cntrsMap = new ConcurrentHashMap8<>(); - - /** - * Default constructor. Creates new instance without counters. - */ - public GridHadoopCountersImpl() { - // No-op. - } - - /** - * Creates new instance that contain given counters. - * - * @param cntrs Counters to store. - */ - public GridHadoopCountersImpl(Iterable cntrs) { - addCounters(cntrs, true); - } - - /** - * Copy constructor. - * - * @param cntrs Counters to copy. - */ - public GridHadoopCountersImpl(GridHadoopCounters cntrs) { - this(cntrs.all()); - } - - /** - * Creates counter instance. - * - * @param cls Class of the counter. - * @param grp Group name. - * @param name Counter name. - * @return Counter. - */ - private T createCounter(Class cls, String grp, - String name) { - try { - Constructor constructor = cls.getConstructor(String.class, String.class); - - return (T)constructor.newInstance(grp, name); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - /** - * Adds counters collection in addition to existing counters. - * - * @param cntrs Counters to add. - * @param cp Whether to copy counters or not. - */ - private void addCounters(Iterable cntrs, boolean cp) { - assert cntrs != null; - - for (GridHadoopCounter cntr : cntrs) { - if (cp) { - GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name()); - - cntrCp.merge(cntr); - - cntr = cntrCp; - } - - cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr); - } - } - - /** {@inheritDoc} */ - @Override public T counter(String grp, String name, Class cls) { - assert cls != null; - - CounterKey mapKey = new CounterKey(cls, grp, name); - - T cntr = (T)cntrsMap.get(mapKey); - - if (cntr == null) { - cntr = createCounter(cls, grp, name); - - T old = (T)cntrsMap.putIfAbsent(mapKey, cntr); - - if (old != null) - return old; - } - - return cntr; - } - - /** {@inheritDoc} */ - @Override public Collection all() { - return cntrsMap.values(); - } - - /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounters other) { - for (GridHadoopCounter counter : other.all()) - counter(counter.group(), counter.name(), counter.getClass()).merge(counter); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeCollection(out, cntrsMap.values()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - addCounters(U.readCollection(in), false); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridHadoopCountersImpl counters = (GridHadoopCountersImpl)o; - - return cntrsMap.equals(counters.cntrsMap); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return cntrsMap.hashCode(); - } - - /** {@inheritDoc} */ 
- @Override public String toString() { - return S.toString(GridHadoopCountersImpl.class, this, "counters", cntrsMap.values()); - } - - /** - * The tuple of counter identifier components for more readable code. - */ - private static class CounterKey extends GridTuple3, String, String> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Constructor. - * - * @param cls Class of the counter. - * @param grp Group name. - * @param name Counter name. - */ - private CounterKey(Class cls, String grp, String name) { - super(cls, grp, name); - } - - /** - * Empty constructor required by {@link Externalizable}. - */ - public CounterKey() { - // No-op. - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java deleted file mode 100644 index d603d76..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; - -/** - * Statistic writer implementation that writes info into any Hadoop file system. 
- */ -public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter { - /** */ - public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; - - /** */ - private static final String DEFAULT_USER_NAME = "anonymous"; - - /** */ - public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; - - /** */ - private static final String USER_MACRO = "${USER}"; - - /** */ - private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; - - /** {@inheritDoc} */ - @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) - throws IgniteCheckedException { - - Configuration hadoopCfg = new Configuration(); - - for (Map.Entry e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) - hadoopCfg.set(e.getKey(), e.getValue()); - - String user = jobInfo.user(); - - if (F.isEmpty(user)) - user = DEFAULT_USER_NAME; - - String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY); - - if (dir == null) - dir = DEFAULT_COUNTER_WRITER_DIR; - - Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); - - GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(cntrs, null); - - try { - FileSystem fs = jobStatPath.getFileSystem(hadoopCfg); - - fs.mkdirs(jobStatPath); - - try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { - for (T2 evt : perfCntr.evts()) { - out.print(evt.get1()); - out.print(':'); - out.println(evt.get2().toString()); - } - - out.flush(); - } - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java deleted file mode 100644 index 67af49f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.ignite.internal.processors.hadoop.*; - -import java.io.*; - -/** - * Standard hadoop counter to use via original Hadoop API in Hadoop jobs. - */ -public class GridHadoopLongCounter extends GridHadoopCounterAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** The counter value. 
*/ - private long val; - - /** - * Default constructor required by {@link Externalizable}. - */ - public GridHadoopLongCounter() { - // No-op. - } - - /** - * Constructor. - * - * @param grp Group name. - * @param name Counter name. - */ - public GridHadoopLongCounter(String grp, String name) { - super(grp, name); - } - - /** {@inheritDoc} */ - @Override protected void writeValue(ObjectOutput out) throws IOException { - out.writeLong(val); - } - - /** {@inheritDoc} */ - @Override protected void readValue(ObjectInput in) throws IOException { - val = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounter cntr) { - val += ((GridHadoopLongCounter)cntr).val; - } - - /** - * Gets current value of this counter. - * - * @return Current value. - */ - public long value() { - return val; - } - - /** - * Sets current value by the given value. - * - * @param val Value to set. - */ - public void value(long val) { - this.val = val; - } - - /** - * Increment this counter by the given value. - * - * @param i Value to increase this counter by. - */ - public void increment(long i) { - val += i; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java deleted file mode 100644 index 263a075..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.counter; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Counter for the job statistics accumulation. - */ -public class GridHadoopPerformanceCounter extends GridHadoopCounterAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** The group name for this counter. */ - private static final String GROUP_NAME = "SYSTEM"; - - /** The counter name for this counter. */ - private static final String COUNTER_NAME = "PERFORMANCE"; - - /** Events collections. 
*/ - private Collection> evts = new ArrayList<>(); - - /** Node id to insert into the event info. */ - private UUID nodeId; - - /** */ - private int reducerNum; - - /** */ - private volatile Long firstShuffleMsg; - - /** */ - private volatile Long lastShuffleMsg; - - /** - * Default constructor required by {@link Externalizable}. - */ - public GridHadoopPerformanceCounter() { - // No-op. - } - - /** - * Constructor. - * - * @param grp Group name. - * @param name Counter name. - */ - public GridHadoopPerformanceCounter(String grp, String name) { - super(grp, name); - } - - /** - * Constructor to create instance to use this as helper. - * - * @param nodeId Id of the work node. - */ - public GridHadoopPerformanceCounter(UUID nodeId) { - this.nodeId = nodeId; - } - - /** {@inheritDoc} */ - @Override protected void writeValue(ObjectOutput out) throws IOException { - U.writeCollection(out, evts); - } - - /** {@inheritDoc} */ - @Override protected void readValue(ObjectInput in) throws IOException { - try { - evts = U.readCollection(in); - } - catch (ClassNotFoundException e) { - throw new IOException(e); - } - } - - /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounter cntr) { - evts.addAll(((GridHadoopPerformanceCounter)cntr).evts); - } - - /** - * Gets the events collection. - * - * @return Collection of event. - */ - public Collection> evts() { - return evts; - } - - /** - * Generate name that consists of some event information. - * - * @param info Task info. - * @param evtType The type of the event. - * @return String contains necessary event information. - */ - private String eventName(GridHadoopTaskInfo info, String evtType) { - return eventName(info.type().toString(), info.taskNumber(), evtType); - } - - /** - * Generate name that consists of some event information. - * - * @param taskType Task type. - * @param taskNum Number of the task. - * @param evtType The type of the event. - * @return String contains necessary event information. - */ - private String eventName(String taskType, int taskNum, String evtType) { - assert nodeId != null; - - return taskType + " " + taskNum + " " + evtType + " " + nodeId; - } - - /** - * Adds event of the task submission (task instance creation). - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskSubmit(GridHadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "submit"), ts)); - } - - /** - * Adds event of the task preparation. - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskPrepare(GridHadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "prepare"), ts)); - } - - /** - * Adds event of the task finish. - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskFinish(GridHadoopTaskInfo info, long ts) { - if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) { - evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); - evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); - - lastShuffleMsg = null; - } - - evts.add(new T2<>(eventName(info, "finish"), ts)); - } - - /** - * Adds event of the task run. - * - * @param info Task info. - * @param ts Timestamp of the event. - */ - public void onTaskStart(GridHadoopTaskInfo info, long ts) { - evts.add(new T2<>(eventName(info, "start"), ts)); - } - - /** - * Adds event of the job preparation. - * - * @param ts Timestamp of the event. 
- */ - public void onJobPrepare(long ts) { - assert nodeId != null; - - evts.add(new T2<>("JOB prepare " + nodeId, ts)); - } - - /** - * Adds event of the job start. - * - * @param ts Timestamp of the event. - */ - public void onJobStart(long ts) { - assert nodeId != null; - - evts.add(new T2<>("JOB start " + nodeId, ts)); - } - - /** - * Adds client submission events from job info. - * - * @param info Job info. - */ - public void clientSubmissionEvents(GridHadoopJobInfo info) { - assert nodeId != null; - - addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); - addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY); - addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY); - } - - /** - * Adds event with timestamp from some property in job info. - * - * @param evt Event type and phase. - * @param info Job info. - * @param propName Property name to get timestamp. - */ - private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) { - String val = info.property(propName); - - if (!F.isEmpty(val)) { - try { - evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val))); - } - catch (NumberFormatException e) { - throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e); - } - } - } - - /** - * Registers shuffle message event. - * - * @param reducerNum Number of reducer that receives the data. - * @param ts Timestamp of the event. - */ - public void onShuffleMessage(int reducerNum, long ts) { - this.reducerNum = reducerNum; - - if (firstShuffleMsg == null) - firstShuffleMsg = ts; - - lastShuffleMsg = ts; - } - - /** - * Gets system predefined performance counter from the GridHadoopCounters object. - * - * @param cntrs GridHadoopCounters object. - * @param nodeId Node id for methods that adds events. It may be null if you don't use ones. - * @return Predefined performance counter. - */ - public static GridHadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) { - GridHadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class); - - if (nodeId != null) - cntr.nodeId(nodeId); - - return cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class); - } - - /** - * Sets the nodeId field. - * - * @param nodeId Node id. - */ - private void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java new file mode 100644 index 0000000..3fdce14 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Default Hadoop counter implementation. + */ +public abstract class HadoopCounterAdapter implements GridHadoopCounter, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Counter group name. */ + private String grp; + + /** Counter name. */ + private String name; + + /** + * Default constructor required by {@link Externalizable}. + */ + protected HadoopCounterAdapter() { + // No-op. + } + + /** + * Creates new counter with given group and name. + * + * @param grp Counter group name. + * @param name Counter name. + */ + protected HadoopCounterAdapter(String grp, String name) { + assert grp != null : "counter must have group"; + assert name != null : "counter must have name"; + + this.grp = grp; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override @Nullable public String group() { + return grp; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(grp); + out.writeUTF(name); + writeValue(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + grp = in.readUTF(); + name = in.readUTF(); + readValue(in); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + HadoopCounterAdapter cntr = (HadoopCounterAdapter)o; + + if (!grp.equals(cntr.grp)) + return false; + if (!name.equals(cntr.name)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = grp.hashCode(); + res = 31 * res + name.hashCode(); + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopCounterAdapter.class, this); + } + + /** + * Writes value of this counter to output. + * + * @param out Output. + * @throws IOException If failed. + */ + protected abstract void writeValue(ObjectOutput out) throws IOException; + + /** + * Read value of this counter from input. + * + * @param in Input. + * @throws IOException If failed. 
+ */ + protected abstract void readValue(ObjectInput in) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java new file mode 100644 index 0000000..01b1473 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; + +import java.io.*; +import java.lang.reflect.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Default in-memory counters store. + */ +public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final ConcurrentMap<CounterKey, GridHadoopCounter> cntrsMap = new ConcurrentHashMap8<>(); + + /** + * Default constructor. Creates new instance without counters. + */ + public HadoopCountersImpl() { + // No-op. + } + + /** + * Creates new instance that contain given counters. + * + * @param cntrs Counters to store. + */ + public HadoopCountersImpl(Iterable<GridHadoopCounter> cntrs) { + addCounters(cntrs, true); + } + + /** + * Copy constructor. + * + * @param cntrs Counters to copy. + */ + public HadoopCountersImpl(GridHadoopCounters cntrs) { + this(cntrs.all()); + } + + /** + * Creates counter instance. + * + * @param cls Class of the counter. + * @param grp Group name. + * @param name Counter name. + * @return Counter. + */ + private <T extends GridHadoopCounter> T createCounter(Class<T> cls, String grp, + String name) { + try { + Constructor<T> constructor = cls.getConstructor(String.class, String.class); + + return (T)constructor.newInstance(grp, name); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** + * Adds counters collection in addition to existing counters. + * + * @param cntrs Counters to add. + * @param cp Whether to copy counters or not.
+ */ + private void addCounters(Iterable<GridHadoopCounter> cntrs, boolean cp) { + assert cntrs != null; + + for (GridHadoopCounter cntr : cntrs) { + if (cp) { + GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name()); + + cntrCp.merge(cntr); + + cntr = cntrCp; + } + + cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr); + } + } + + /** {@inheritDoc} */ + @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) { + assert cls != null; + + CounterKey mapKey = new CounterKey(cls, grp, name); + + T cntr = (T)cntrsMap.get(mapKey); + + if (cntr == null) { + cntr = createCounter(cls, grp, name); + + T old = (T)cntrsMap.putIfAbsent(mapKey, cntr); + + if (old != null) + return old; + } + + return cntr; + } + + /** {@inheritDoc} */ + @Override public Collection<GridHadoopCounter> all() { + return cntrsMap.values(); + } + + /** {@inheritDoc} */ + @Override public void merge(GridHadoopCounters other) { + for (GridHadoopCounter counter : other.all()) + counter(counter.group(), counter.name(), counter.getClass()).merge(counter); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeCollection(out, cntrsMap.values()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + addCounters(U.readCollection(in), false); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopCountersImpl counters = (HadoopCountersImpl)o; + + return cntrsMap.equals(counters.cntrsMap); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return cntrsMap.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values()); + } + + /** + * The tuple of counter identifier components for more readable code. + */ + private static class CounterKey extends GridTuple3<Class<? extends GridHadoopCounter>, String, String> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + * + * @param cls Class of the counter. + * @param grp Group name. + * @param name Counter name. + */ + private CounterKey(Class<? extends GridHadoopCounter> cls, String grp, String name) { + super(cls, grp, name); + } + + /** + * Empty constructor required by {@link Externalizable}. + */ + public CounterKey() { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java new file mode 100644 index 0000000..1aa1e0e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import org.apache.ignite.internal.processors.hadoop.*; + +import java.io.*; + +/** + * Standard hadoop counter to use via original Hadoop API in Hadoop jobs. + */ +public class HadoopLongCounter extends HadoopCounterAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** The counter value. */ + private long val; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopLongCounter() { + // No-op. + } + + /** + * Constructor. + * + * @param grp Group name. + * @param name Counter name. + */ + public HadoopLongCounter(String grp, String name) { + super(grp, name); + } + + /** {@inheritDoc} */ + @Override protected void writeValue(ObjectOutput out) throws IOException { + out.writeLong(val); + } + + /** {@inheritDoc} */ + @Override protected void readValue(ObjectInput in) throws IOException { + val = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public void merge(GridHadoopCounter cntr) { + val += ((HadoopLongCounter)cntr).val; + } + + /** + * Gets current value of this counter. + * + * @return Current value. + */ + public long value() { + return val; + } + + /** + * Sets current value by the given value. + * + * @param val Value to set. + */ + public void value(long val) { + this.val = val; + } + + /** + * Increment this counter by the given value. + * + * @param i Value to increase this counter by. + */ + public void increment(long i) { + val += i; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java new file mode 100644 index 0000000..f22d0cd --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Counter for the job statistics accumulation. + */ +public class HadoopPerformanceCounter extends HadoopCounterAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** The group name for this counter. */ + private static final String GROUP_NAME = "SYSTEM"; + + /** The counter name for this counter. */ + private static final String COUNTER_NAME = "PERFORMANCE"; + + /** Events collections. */ + private Collection<T2<String, Long>> evts = new ArrayList<>(); + + /** Node id to insert into the event info. */ + private UUID nodeId; + + /** */ + private int reducerNum; + + /** */ + private volatile Long firstShuffleMsg; + + /** */ + private volatile Long lastShuffleMsg; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopPerformanceCounter() { + // No-op. + } + + /** + * Constructor. + * + * @param grp Group name. + * @param name Counter name. + */ + public HadoopPerformanceCounter(String grp, String name) { + super(grp, name); + } + + /** + * Constructor to create instance to use this as helper. + * + * @param nodeId Id of the work node. + */ + public HadoopPerformanceCounter(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override protected void writeValue(ObjectOutput out) throws IOException { + U.writeCollection(out, evts); + } + + /** {@inheritDoc} */ + @Override protected void readValue(ObjectInput in) throws IOException { + try { + evts = U.readCollection(in); + } + catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public void merge(GridHadoopCounter cntr) { + evts.addAll(((HadoopPerformanceCounter)cntr).evts); + } + + /** + * Gets the events collection. + * + * @return Collection of event. + */ + public Collection<T2<String, Long>> evts() { + return evts; + } + + /** + * Generate name that consists of some event information. + * + * @param info Task info. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(GridHadoopTaskInfo info, String evtType) { + return eventName(info.type().toString(), info.taskNumber(), evtType); + } + + /** + * Generate name that consists of some event information. + * + * @param taskType Task type. + * @param taskNum Number of the task. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(String taskType, int taskNum, String evtType) { + assert nodeId != null; + + return taskType + " " + taskNum + " " + evtType + " " + nodeId; + } + + /** + * Adds event of the task submission (task instance creation). + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskSubmit(GridHadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "submit"), ts)); + } + + /** + * Adds event of the task preparation. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskPrepare(GridHadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "prepare"), ts)); + } + + /** + * Adds event of the task finish. + * + * @param info Task info.
+ * @param ts Timestamp of the event. + */ + public void onTaskFinish(GridHadoopTaskInfo info, long ts) { + if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) { + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); + + lastShuffleMsg = null; + } + + evts.add(new T2<>(eventName(info, "finish"), ts)); + } + + /** + * Adds event of the task run. + * + * @param info Task info. + * @param ts Timestamp of the event. + */ + public void onTaskStart(GridHadoopTaskInfo info, long ts) { + evts.add(new T2<>(eventName(info, "start"), ts)); + } + + /** + * Adds event of the job preparation. + * + * @param ts Timestamp of the event. + */ + public void onJobPrepare(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB prepare " + nodeId, ts)); + } + + /** + * Adds event of the job start. + * + * @param ts Timestamp of the event. + */ + public void onJobStart(long ts) { + assert nodeId != null; + + evts.add(new T2<>("JOB start " + nodeId, ts)); + } + + /** + * Adds client submission events from job info. + * + * @param info Job info. + */ + public void clientSubmissionEvents(GridHadoopJobInfo info) { + assert nodeId != null; + + addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY); + addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY); + } + + /** + * Adds event with timestamp from some property in job info. + * + * @param evt Event type and phase. + * @param info Job info. + * @param propName Property name to get timestamp. + */ + private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) { + String val = info.property(propName); + + if (!F.isEmpty(val)) { + try { + evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val))); + } + catch (NumberFormatException e) { + throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e); + } + } + } + + /** + * Registers shuffle message event. + * + * @param reducerNum Number of reducer that receives the data. + * @param ts Timestamp of the event. + */ + public void onShuffleMessage(int reducerNum, long ts) { + this.reducerNum = reducerNum; + + if (firstShuffleMsg == null) + firstShuffleMsg = ts; + + lastShuffleMsg = ts; + } + + /** + * Gets system predefined performance counter from the GridHadoopCounters object. + * + * @param cntrs GridHadoopCounters object. + * @param nodeId Node id for methods that adds events. It may be null if you don't use ones. + * @return Predefined performance counter. + */ + public static HadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) { + HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + + if (nodeId != null) + cntr.nodeId(nodeId); + + return cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); + } + + /** + * Sets the nodeId field. + * + * @param nodeId Node id. 
+ */ + private void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java deleted file mode 100644 index e9461e2..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.IgfsConfiguration.*; - -/** - * Wrapper of HDFS for support of separated working directory. - */ -public class GridHadoopDistributedFileSystem extends DistributedFileSystem { - /** User name for each thread. */ - private final ThreadLocal<String> userName = new ThreadLocal<String>() { - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() { - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - } - - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. - */ - public void setUser(String userName) { - this.userName.set(userName); - - setWorkingDirectory(getHomeDirectory()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); - - return path.makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - Path fixedDir = fixRelativePart(dir); - - String res = fixedDir.toUri().getPath(); - - if (!DFSUtil.isValidName(res)) - throw new IllegalArgumentException("Invalid DFS directory name " + res); - - workingDir.set(fixedDir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workingDir.get(); - } -}
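For reference, a minimal usage sketch of the renamed counter classes added above. HadoopCountersImpl and HadoopLongCounter and their counter()/merge()/increment()/value() methods are taken from this commit's diff; the surrounding harness class and the "fs"/"bytesWritten" group and counter names are illustrative only, not part of the commit.

import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;

/** Illustrative sketch (not part of the commit): lazy counter lookup and merge semantics of the store above. */
public class HadoopCountersSketch {
    public static void main(String[] args) {
        HadoopCountersImpl cntrs = new HadoopCountersImpl();

        // counter() lazily creates the counter on first access via its (group, name) constructor.
        HadoopLongCounter written = cntrs.counter("fs", "bytesWritten", HadoopLongCounter.class);
        written.increment(1024);

        // A second store, e.g. filled by another task, is merged counter-by-counter.
        HadoopCountersImpl other = new HadoopCountersImpl();
        other.counter("fs", "bytesWritten", HadoopLongCounter.class).increment(512);

        cntrs.merge(other);

        // Prints 1536: HadoopLongCounter.merge() sums the long values of matching counters.
        System.out.println(cntrs.counter("fs", "bytesWritten", HadoopLongCounter.class).value());
    }
}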