ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [15/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (2).
Date Tue, 03 Mar 2015 13:08:31 GMT
# IGNITE-386: WIP on internal namings (2).


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/288709a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/288709a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/288709a1

Branch: refs/heads/ignite-386
Commit: 288709a1b48260e665278037e1beb050ab8ecdb4
Parents: ace354c
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Mar 3 15:55:58 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Mar 3 15:55:58 2015 +0300

----------------------------------------------------------------------
 docs/core-site.ignite.xml                       |    2 +-
 .../ignite/internal/IgniteComponentType.java    |    2 +-
 .../fs/IgniteHadoopFileSystemCounterWriter.java |   92 ++
 .../processors/hadoop/HadoopContext.java        |    2 +-
 .../processors/hadoop/HadoopCounters.java       |   14 +-
 .../internal/processors/hadoop/HadoopImpl.java  |    4 +-
 .../processors/hadoop/HadoopProcessor.java      |  225 ++++
 .../hadoop/IgniteHadoopProcessor.java           |  225 ----
 .../counter/GridHadoopCounterAdapter.java       |  128 ---
 .../hadoop/counter/GridHadoopCountersImpl.java  |  198 ----
 .../counter/GridHadoopFSCounterWriter.java      |   91 --
 .../hadoop/counter/GridHadoopLongCounter.java   |   92 --
 .../counter/GridHadoopPerformanceCounter.java   |  279 -----
 .../hadoop/counter/HadoopCounterAdapter.java    |  128 +++
 .../hadoop/counter/HadoopCountersImpl.java      |  198 ++++
 .../hadoop/counter/HadoopLongCounter.java       |   92 ++
 .../counter/HadoopPerformanceCounter.java       |  279 +++++
 .../fs/GridHadoopDistributedFileSystem.java     |   91 --
 .../hadoop/fs/GridHadoopFileSystemsUtils.java   |   57 -
 .../hadoop/fs/GridHadoopLocalFileSystemV1.java  |   39 -
 .../hadoop/fs/GridHadoopLocalFileSystemV2.java  |   86 --
 .../hadoop/fs/GridHadoopRawLocalFileSystem.java |  304 ------
 .../hadoop/fs/HadoopDistributedFileSystem.java  |   91 ++
 .../hadoop/fs/HadoopFileSystemsUtils.java       |   57 +
 .../hadoop/fs/HadoopLocalFileSystemV1.java      |   39 +
 .../hadoop/fs/HadoopLocalFileSystemV2.java      |   86 ++
 .../hadoop/fs/HadoopRawLocalFileSystem.java     |  304 ++++++
 .../jobtracker/GridHadoopJobMetadata.java       |  305 ------
 .../hadoop/jobtracker/HadoopJobMetadata.java    |  305 ++++++
 .../hadoop/jobtracker/HadoopJobTracker.java     |  104 +-
 .../hadoop/message/GridHadoopMessage.java       |   27 -
 .../hadoop/message/HadoopMessage.java           |   27 +
 .../planner/GridHadoopDefaultMapReducePlan.java |  107 --
 .../GridHadoopDefaultMapReducePlanner.java      |  434 --------
 .../planner/HadoopDefaultMapReducePlan.java     |  107 ++
 .../planner/HadoopDefaultMapReducePlanner.java  |  434 ++++++++
 .../GridHadoopProtocolJobCountersTask.java      |   45 -
 .../proto/GridHadoopProtocolJobStatusTask.java  |   81 --
 .../proto/GridHadoopProtocolKillJobTask.java    |   46 -
 .../proto/GridHadoopProtocolNextTaskIdTask.java |   35 -
 .../proto/GridHadoopProtocolSubmitJobTask.java  |   57 -
 .../proto/GridHadoopProtocolTaskAdapter.java    |  113 --
 .../proto/GridHadoopProtocolTaskArguments.java  |   81 --
 .../hadoop/proto/HadoopClientProtocol.java      |   22 +-
 .../proto/HadoopProtocolJobCountersTask.java    |   45 +
 .../proto/HadoopProtocolJobStatusTask.java      |   81 ++
 .../hadoop/proto/HadoopProtocolKillJobTask.java |   46 +
 .../proto/HadoopProtocolNextTaskIdTask.java     |   35 +
 .../proto/HadoopProtocolSubmitJobTask.java      |   57 +
 .../hadoop/proto/HadoopProtocolTaskAdapter.java |  113 ++
 .../proto/HadoopProtocolTaskArguments.java      |   81 ++
 .../hadoop/shuffle/GridHadoopShuffleAck.java    |   91 --
 .../hadoop/shuffle/GridHadoopShuffleJob.java    |  593 -----------
 .../shuffle/GridHadoopShuffleMessage.java       |  242 -----
 .../hadoop/shuffle/HadoopShuffle.java           |   38 +-
 .../hadoop/shuffle/HadoopShuffleAck.java        |   91 ++
 .../hadoop/shuffle/HadoopShuffleJob.java        |  593 +++++++++++
 .../hadoop/shuffle/HadoopShuffleMessage.java    |  241 +++++
 .../GridHadoopConcurrentHashMultimap.java       |  611 -----------
 .../collections/GridHadoopHashMultimap.java     |  174 ---
 .../collections/GridHadoopHashMultimapBase.java |  208 ----
 .../shuffle/collections/GridHadoopMultimap.java |  112 --
 .../collections/GridHadoopMultimapBase.java     |  368 -------
 .../shuffle/collections/GridHadoopSkipList.java |  726 -------------
 .../HadoopConcurrentHashMultimap.java           |  611 +++++++++++
 .../shuffle/collections/HadoopHashMultimap.java |  174 +++
 .../collections/HadoopHashMultimapBase.java     |  208 ++++
 .../shuffle/collections/HadoopMultimap.java     |  112 ++
 .../shuffle/collections/HadoopMultimapBase.java |  368 +++++++
 .../shuffle/collections/HadoopSkipList.java     |  726 +++++++++++++
 .../shuffle/streams/GridHadoopDataInStream.java |  170 ---
 .../streams/GridHadoopDataOutStream.java        |  131 ---
 .../streams/GridHadoopOffheapBuffer.java        |  122 ---
 .../shuffle/streams/HadoopDataInStream.java     |  170 +++
 .../shuffle/streams/HadoopDataOutStream.java    |  131 +++
 .../shuffle/streams/HadoopOffheapBuffer.java    |  122 +++
 .../taskexecutor/GridHadoopExecutorService.java |  232 ----
 .../taskexecutor/GridHadoopRunnableTask.java    |   22 +-
 .../taskexecutor/GridHadoopTaskState.java       |   38 -
 .../taskexecutor/GridHadoopTaskStatus.java      |  114 --
 .../HadoopEmbeddedTaskExecutor.java             |    8 +-
 .../taskexecutor/HadoopExecutorService.java     |  231 ++++
 .../taskexecutor/HadoopTaskExecutorAdapter.java |    2 +-
 .../hadoop/taskexecutor/HadoopTaskState.java    |   38 +
 .../hadoop/taskexecutor/HadoopTaskStatus.java   |  114 ++
 .../GridHadoopExternalTaskMetadata.java         |   68 --
 .../GridHadoopJobInfoUpdateRequest.java         |  109 --
 .../GridHadoopPrepareForJobRequest.java         |  126 ---
 .../external/GridHadoopProcessDescriptor.java   |  150 ---
 .../external/GridHadoopProcessStartedAck.java   |   46 -
 .../GridHadoopTaskExecutionRequest.java         |  110 --
 .../external/GridHadoopTaskFinishedMessage.java |   92 --
 .../external/HadoopExternalTaskExecutor.java    |   78 +-
 .../external/HadoopExternalTaskMetadata.java    |   68 ++
 .../external/HadoopJobInfoUpdateRequest.java    |  109 ++
 .../external/HadoopPrepareForJobRequest.java    |  126 +++
 .../external/HadoopProcessDescriptor.java       |  150 +++
 .../external/HadoopProcessStartedAck.java       |   46 +
 .../external/HadoopTaskExecutionRequest.java    |  110 ++
 .../external/HadoopTaskFinishedMessage.java     |   92 ++
 .../child/GridHadoopChildProcessRunner.java     |   72 +-
 .../child/GridHadoopExternalProcessStarter.java |    2 +-
 .../GridHadoopCommunicationClient.java          |    2 +-
 .../GridHadoopExternalCommunication.java        |   52 +-
 .../GridHadoopMarshallerFilter.java             |    2 +-
 .../GridHadoopMessageListener.java              |    4 +-
 .../GridHadoopTcpNioCommunicationClient.java    |    2 +-
 .../hadoop/v1/GridHadoopV1Counter.java          |    4 +-
 .../hadoop/v1/GridHadoopV1Reporter.java         |    2 +-
 .../hadoop/v2/GridHadoopV2Context.java          |    2 +-
 .../hadoop/v2/GridHadoopV2Counter.java          |    4 +-
 .../processors/hadoop/v2/GridHadoopV2Job.java   |    2 +-
 .../v2/GridHadoopV2JobResourceManager.java      |    2 +-
 .../hadoop/v2/GridHadoopV2TaskContext.java      |    4 +-
 .../hadoop/GridHadoopAbstractSelfTest.java      |    2 +-
 .../hadoop/GridHadoopCommandLineTest.java       |    4 +-
 ...idHadoopDefaultMapReducePlannerSelfTest.java | 1005 ------------------
 .../hadoop/GridHadoopFileSystemsTest.java       |    6 +-
 .../hadoop/GridHadoopMapReduceTest.java         |    7 +-
 .../GridHadoopTestRoundRobinMrPlanner.java      |    2 +-
 .../HadoopDefaultMapReducePlannerSelfTest.java  | 1005 ++++++++++++++++++
 ...ridHadoopConcurrentHashMultimapSelftest.java |   12 +-
 .../collections/GridHadoopHashMapSelfTest.java  |    6 +-
 .../collections/GridHadoopSkipListSelfTest.java |   14 +-
 .../streams/GridHadoopDataStreamSelfTest.java   |    4 +-
 .../GridHadoopExecutorServiceTest.java          |  119 ---
 .../taskexecutor/HadoopExecutorServiceTest.java |  119 +++
 ...GridHadoopExternalCommunicationSelfTest.java |    6 +-
 .../testsuites/IgniteHadoopTestSuite.java       |    2 +-
 129 files changed, 8937 insertions(+), 8937 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/docs/core-site.ignite.xml
----------------------------------------------------------------------
diff --git a/docs/core-site.ignite.xml b/docs/core-site.ignite.xml
index 1146576..8b8e634 100644
--- a/docs/core-site.ignite.xml
+++ b/docs/core-site.ignite.xml
@@ -73,7 +73,7 @@
     <!--
     <property>
         <name>ignite.counters.writer</name>
-        <value>org.apache.ignite.internal.processors.hadoop.counter.GridHadoopFSCounterWriter</value>
+        <value>org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter</value>
     </property>
     -->
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
index d0e487a..a51800e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
@@ -36,7 +36,7 @@ public enum IgniteComponentType {
     /** Hadoop. */
     HADOOP(
         "org.apache.ignite.internal.processors.hadoop.IgniteHadoopNoopProcessor",
-        "org.apache.ignite.internal.processors.hadoop.IgniteHadoopProcessor",
+        "org.apache.ignite.internal.processors.hadoop.HadoopProcessor",
         "ignite-hadoop"
     ),
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
new file mode 100644
index 0000000..449cff2
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Statistic writer implementation that writes info into any Hadoop file system.
+ */
+public class IgniteHadoopFileSystemCounterWriter implements GridHadoopCounterWriter {
+    /** */
+    public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
+
+    /** */
+    private static final String DEFAULT_USER_NAME = "anonymous";
+
+    /** */
+    public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
+
+    /** */
+    private static final String USER_MACRO = "${USER}";
+
+    /** */
+    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
+
+    /** {@inheritDoc} */
+    @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs)
+        throws IgniteCheckedException {
+
+        Configuration hadoopCfg = new Configuration();
+
+        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
+            hadoopCfg.set(e.getKey(), e.getValue());
+
+        String user = jobInfo.user();
+
+        if (F.isEmpty(user))
+            user = DEFAULT_USER_NAME;
+
+        String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
+
+        if (dir == null)
+            dir = DEFAULT_COUNTER_WRITER_DIR;
+
+        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+
+        try {
+            FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
+
+            fs.mkdirs(jobStatPath);
+
+            try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
+                for (T2<String, Long> evt : perfCntr.evts()) {
+                    out.print(evt.get1());
+                    out.print(':');
+                    out.println(evt.get2().toString());
+                }
+
+                out.flush();
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
index bb707c8..d897b6c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -143,7 +143,7 @@ public class HadoopContext {
      * @param meta Job metadata.
      * @return {@code true} If local node is participating in job execution.
      */
-    public boolean isParticipating(GridHadoopJobMetadata meta) {
+    public boolean isParticipating(HadoopJobMetadata meta) {
         UUID locNodeId = localNodeId();
 
         if (locNodeId.equals(meta.submitNodeId()))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
index c7f0157..ad699ec 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
@@ -31,7 +31,7 @@ import java.util.*;
  */
 public class HadoopCounters extends Counters {
     /** */
-    private final Map<T2<String,String>,GridHadoopLongCounter> cntrs = new HashMap<>();
+    private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
 
     /**
      * Creates new instance based on given counters.
@@ -40,8 +40,8 @@ public class HadoopCounters extends Counters {
      */
     public HadoopCounters(GridHadoopCounters cntrs) {
         for (GridHadoopCounter cntr : cntrs.all())
-            if (cntr instanceof GridHadoopLongCounter)
-                this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr);
+            if (cntr instanceof HadoopLongCounter)
+                this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
     }
 
     /** {@inheritDoc} */
@@ -184,7 +184,7 @@ public class HadoopCounters extends Counters {
     public Iterator<Counter> iterateGroup(String grpName) {
         Collection<Counter> grpCounters = new ArrayList<>();
 
-        for (GridHadoopLongCounter counter : cntrs.values()) {
+        for (HadoopLongCounter counter : cntrs.values()) {
             if (grpName.equals(counter.group()))
                 grpCounters.add(new GridHadoopV2Counter(counter));
         }
@@ -203,12 +203,12 @@ public class HadoopCounters extends Counters {
     public Counter findCounter(String grpName, String cntrName, boolean create) {
         T2<String, String> key = new T2<>(grpName, cntrName);
 
-        GridHadoopLongCounter internalCntr = cntrs.get(key);
+        HadoopLongCounter internalCntr = cntrs.get(key);
 
         if (internalCntr == null & create) {
-            internalCntr = new GridHadoopLongCounter(grpName,cntrName);
+            internalCntr = new HadoopLongCounter(grpName,cntrName);
 
-            cntrs.put(key, new GridHadoopLongCounter(grpName,cntrName));
+            cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
         }
 
         return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
index 80fd995..b87e7f8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.*;
  */
 public class HadoopImpl implements GridHadoop {
     /** Hadoop processor. */
-    private final IgniteHadoopProcessor proc;
+    private final HadoopProcessor proc;
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -37,7 +37,7 @@ public class HadoopImpl implements GridHadoop {
      *
      * @param proc Hadoop processor.
      */
-    HadoopImpl(IgniteHadoopProcessor proc) {
+    HadoopImpl(HadoopProcessor proc) {
         this.proc = proc;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
new file mode 100644
index 0000000..1f50b0c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.hadoop.planner.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*;
+
+/**
+ * Hadoop processor.
+ */
+public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
+    /** Job ID counter. */
+    private final AtomicInteger idCtr = new AtomicInteger();
+
+    /** Hadoop context. */
+    @GridToStringExclude
+    private HadoopContext hctx;
+
+    /** Hadoop facade for public API. */
+    @GridToStringExclude
+    private GridHadoop hadoop;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public HadoopProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return;
+
+        GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
+
+        if (cfg == null)
+            cfg = new GridHadoopConfiguration();
+        else
+            cfg = new GridHadoopConfiguration(cfg);
+
+        initializeDefaults(cfg);
+
+        validate(cfg);
+
+        if (hadoopHome() != null)
+            U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome());
+
+        boolean ok = false;
+
+        try { // Check for Hadoop installation.
+            hadoopUrls();
+
+            ok = true;
+        }
+        catch (IgniteCheckedException e) {
+            U.quietAndWarn(log, e.getMessage());
+        }
+
+        if (ok) {
+            hctx = new HadoopContext(
+                ctx,
+                cfg,
+                new HadoopJobTracker(),
+                cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
+                new HadoopShuffle());
+
+
+            for (HadoopComponent c : hctx.components())
+                c.start(hctx);
+
+            hadoop = new HadoopImpl(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopProcessor.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
+
+        if (hctx == null)
+            return;
+
+        List<HadoopComponent> components = hctx.components();
+
+        for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+            HadoopComponent c = it.previous();
+
+            c.stop(cancel);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        if (hctx == null)
+            return;
+
+        for (HadoopComponent c : hctx.components())
+            c.onKernalStart();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        if (hctx == null)
+            return;
+
+        List<HadoopComponent> components = hctx.components();
+
+        for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+            HadoopComponent c = it.previous();
+
+            c.onKernalStop(cancel);
+        }
+    }
+
+    /**
+     * Gets Hadoop context.
+     *
+     * @return Hadoop context.
+     */
+    public HadoopContext context() {
+        return hctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoop hadoop() {
+        if (hadoop == null)
+            throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
+                "is HADOOP_HOME environment variable set?)");
+
+        return hadoop;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopConfiguration config() {
+        return hctx.configuration();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopJobId nextJobId() {
+        return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
+        return hctx.jobTracker().submit(jobId, jobInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
+        return hctx.jobTracker().status(jobId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException {
+        return hctx.jobTracker().jobCounters(jobId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
+        return hctx.jobTracker().finishFuture(jobId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
+        return hctx.jobTracker().killJob(jobId);
+    }
+
+    /**
+     * Initializes default hadoop configuration.
+     *
+     * @param cfg Hadoop configuration.
+     */
+    private void initializeDefaults(GridHadoopConfiguration cfg) {
+        if (cfg.getMapReducePlanner() == null)
+            cfg.setMapReducePlanner(new HadoopDefaultMapReducePlanner());
+    }
+
+    /**
+     * Validates Grid and Hadoop configuration for correctness.
+     *
+     * @param hadoopCfg Hadoop configuration.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException {
+        if (ctx.config().isPeerClassLoadingEnabled())
+            throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " +
+                "GridConfiguration.setPeerClassLoadingEnabled()).");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
deleted file mode 100644
index 63e4854..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.processors.hadoop.planner.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*;
-
-/**
- * Hadoop processor.
- */
-public class IgniteHadoopProcessor extends IgniteHadoopProcessorAdapter {
-    /** Job ID counter. */
-    private final AtomicInteger idCtr = new AtomicInteger();
-
-    /** Hadoop context. */
-    @GridToStringExclude
-    private HadoopContext hctx;
-
-    /** Hadoop facade for public API. */
-    @GridToStringExclude
-    private GridHadoop hadoop;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public IgniteHadoopProcessor(GridKernalContext ctx) {
-        super(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        if (ctx.isDaemon())
-            return;
-
-        GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
-
-        if (cfg == null)
-            cfg = new GridHadoopConfiguration();
-        else
-            cfg = new GridHadoopConfiguration(cfg);
-
-        initializeDefaults(cfg);
-
-        validate(cfg);
-
-        if (hadoopHome() != null)
-            U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome());
-
-        boolean ok = false;
-
-        try { // Check for Hadoop installation.
-            hadoopUrls();
-
-            ok = true;
-        }
-        catch (IgniteCheckedException e) {
-            U.quietAndWarn(log, e.getMessage());
-        }
-
-        if (ok) {
-            hctx = new HadoopContext(
-                ctx,
-                cfg,
-                new HadoopJobTracker(),
-                cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
-                new HadoopShuffle());
-
-
-            for (HadoopComponent c : hctx.components())
-                c.start(hctx);
-
-            hadoop = new HadoopImpl(this);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteHadoopProcessor.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        super.stop(cancel);
-
-        if (hctx == null)
-            return;
-
-        List<HadoopComponent> components = hctx.components();
-
-        for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
-            HadoopComponent c = it.previous();
-
-            c.stop(cancel);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        if (hctx == null)
-            return;
-
-        for (HadoopComponent c : hctx.components())
-            c.onKernalStart();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        if (hctx == null)
-            return;
-
-        List<HadoopComponent> components = hctx.components();
-
-        for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
-            HadoopComponent c = it.previous();
-
-            c.onKernalStop(cancel);
-        }
-    }
-
-    /**
-     * Gets Hadoop context.
-     *
-     * @return Hadoop context.
-     */
-    public HadoopContext context() {
-        return hctx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoop hadoop() {
-        if (hadoop == null)
-            throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
-                "is HADOOP_HOME environment variable set?)");
-
-        return hadoop;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopConfiguration config() {
-        return hctx.configuration();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopJobId nextJobId() {
-        return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
-        return hctx.jobTracker().submit(jobId, jobInfo);
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
-        return hctx.jobTracker().status(jobId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException {
-        return hctx.jobTracker().jobCounters(jobId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
-        return hctx.jobTracker().finishFuture(jobId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
-        return hctx.jobTracker().killJob(jobId);
-    }
-
-    /**
-     * Initializes default hadoop configuration.
-     *
-     * @param cfg Hadoop configuration.
-     */
-    private void initializeDefaults(GridHadoopConfiguration cfg) {
-        if (cfg.getMapReducePlanner() == null)
-            cfg.setMapReducePlanner(new GridHadoopDefaultMapReducePlanner());
-    }
-
-    /**
-     * Validates Grid and Hadoop configuration for correctness.
-     *
-     * @param hadoopCfg Hadoop configuration.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException {
-        if (ctx.config().isPeerClassLoadingEnabled())
-            throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " +
-                "GridConfiguration.setPeerClassLoadingEnabled()).");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
deleted file mode 100644
index 9e46846..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Default Hadoop counter implementation.
- */
-public abstract class GridHadoopCounterAdapter implements GridHadoopCounter, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Counter group name. */
-    private String grp;
-
-    /** Counter name. */
-    private String name;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    protected GridHadoopCounterAdapter() {
-        // No-op.
-    }
-
-    /**
-     * Creates new counter with given group and name.
-     *
-     * @param grp Counter group name.
-     * @param name Counter name.
-     */
-    protected GridHadoopCounterAdapter(String grp, String name) {
-        assert grp != null : "counter must have group";
-        assert name != null : "counter must have name";
-
-        this.grp = grp;
-        this.name = name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public String group() {
-        return grp;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeUTF(grp);
-        out.writeUTF(name);
-        writeValue(out);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        grp = in.readUTF();
-        name = in.readUTF();
-        readValue(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridHadoopCounterAdapter cntr = (GridHadoopCounterAdapter)o;
-
-        if (!grp.equals(cntr.grp))
-            return false;
-        if (!name.equals(cntr.name))
-            return false;
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int res = grp.hashCode();
-        res = 31 * res + name.hashCode();
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopCounterAdapter.class, this);
-    }
-
-    /**
-     * Writes value of this counter to output.
-     *
-     * @param out Output.
-     * @throws IOException If failed.
-     */
-    protected abstract void writeValue(ObjectOutput out) throws IOException;
-
-    /**
-     * Read value of this counter from input.
-     *
-     * @param in Input.
-     * @throws IOException If failed.
-     */
-    protected abstract void readValue(ObjectInput in) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
deleted file mode 100644
index 92d54af..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-
-import java.io.*;
-import java.lang.reflect.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Default in-memory counters store.
- */
-public class GridHadoopCountersImpl implements GridHadoopCounters, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private final ConcurrentMap<CounterKey, GridHadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
-
-    /**
-     * Default constructor. Creates new instance without counters.
-     */
-    public GridHadoopCountersImpl() {
-        // No-op.
-    }
-
-    /**
-     * Creates new instance that contain given counters.
-     *
-     * @param cntrs Counters to store.
-     */
-    public GridHadoopCountersImpl(Iterable<GridHadoopCounter> cntrs) {
-        addCounters(cntrs, true);
-    }
-
-    /**
-     * Copy constructor.
-     *
-     * @param cntrs Counters to copy.
-     */
-    public GridHadoopCountersImpl(GridHadoopCounters cntrs) {
-        this(cntrs.all());
-    }
-
-    /**
-     * Creates counter instance.
-     *
-     * @param cls Class of the counter.
-     * @param grp Group name.
-     * @param name Counter name.
-     * @return Counter.
-     */
-    private <T extends GridHadoopCounter> T createCounter(Class<? extends GridHadoopCounter> cls, String grp,
-        String name) {
-        try {
-            Constructor constructor = cls.getConstructor(String.class, String.class);
-
-            return (T)constructor.newInstance(grp, name);
-        }
-        catch (Exception e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /**
-     * Adds counters collection in addition to existing counters.
-     *
-     * @param cntrs Counters to add.
-     * @param cp Whether to copy counters or not.
-     */
-    private void addCounters(Iterable<GridHadoopCounter> cntrs, boolean cp) {
-        assert cntrs != null;
-
-        for (GridHadoopCounter cntr : cntrs) {
-            if (cp) {
-                GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
-
-                cntrCp.merge(cntr);
-
-                cntr = cntrCp;
-            }
-
-            cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
-        assert cls != null;
-
-        CounterKey mapKey = new CounterKey(cls, grp, name);
-
-        T cntr = (T)cntrsMap.get(mapKey);
-
-        if (cntr == null) {
-            cntr = createCounter(cls, grp, name);
-
-            T old = (T)cntrsMap.putIfAbsent(mapKey, cntr);
-
-            if (old != null)
-                return old;
-        }
-
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridHadoopCounter> all() {
-        return cntrsMap.values();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void merge(GridHadoopCounters other) {
-        for (GridHadoopCounter counter : other.all())
-            counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeCollection(out, cntrsMap.values());
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        addCounters(U.<GridHadoopCounter>readCollection(in), false);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridHadoopCountersImpl counters = (GridHadoopCountersImpl)o;
-
-        return cntrsMap.equals(counters.cntrsMap);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return cntrsMap.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopCountersImpl.class, this, "counters", cntrsMap.values());
-    }
-
-    /**
-     * The tuple of counter identifier components for more readable code.
-     */
-    private static class CounterKey extends GridTuple3<Class<? extends GridHadoopCounter>, String, String> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * Constructor.
-         *
-         * @param cls Class of the counter.
-         * @param grp Group name.
-         * @param name Counter name.
-         */
-        private CounterKey(Class<? extends GridHadoopCounter> cls, String grp, String name) {
-            super(cls, grp, name);
-        }
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public CounterKey() {
-            // No-op.
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
deleted file mode 100644
index d603d76..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Statistic writer implementation that writes info into any Hadoop file system.
- */
-public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter {
-    /** */
-    public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
-
-    /** */
-    private static final String DEFAULT_USER_NAME = "anonymous";
-
-    /** */
-    public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
-
-    /** */
-    private static final String USER_MACRO = "${USER}";
-
-    /** */
-    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
-
-    /** {@inheritDoc} */
-    @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs)
-        throws IgniteCheckedException {
-
-        Configuration hadoopCfg = new Configuration();
-
-        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
-            hadoopCfg.set(e.getKey(), e.getValue());
-
-        String user = jobInfo.user();
-
-        if (F.isEmpty(user))
-            user = DEFAULT_USER_NAME;
-
-        String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
-
-        if (dir == null)
-            dir = DEFAULT_COUNTER_WRITER_DIR;
-
-        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
-
-        GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(cntrs, null);
-
-        try {
-            FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
-
-            fs.mkdirs(jobStatPath);
-
-            try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
-                for (T2<String, Long> evt : perfCntr.evts()) {
-                    out.print(evt.get1());
-                    out.print(':');
-                    out.println(evt.get2().toString());
-                }
-
-                out.flush();
-            }
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
deleted file mode 100644
index 67af49f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.io.*;
-
-/**
- * Standard hadoop counter to use via original Hadoop API in Hadoop jobs.
- */
-public class GridHadoopLongCounter extends GridHadoopCounterAdapter {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** The counter value. */
-    private long val;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    public GridHadoopLongCounter() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param grp Group name.
-     * @param name Counter name.
-     */
-    public GridHadoopLongCounter(String grp, String name) {
-        super(grp, name);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeValue(ObjectOutput out) throws IOException {
-        out.writeLong(val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void readValue(ObjectInput in) throws IOException {
-        val = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void merge(GridHadoopCounter cntr) {
-        val += ((GridHadoopLongCounter)cntr).val;
-    }
-
-    /**
-     * Gets current value of this counter.
-     *
-     * @return Current value.
-     */
-    public long value() {
-        return val;
-    }
-
-    /**
-     * Sets current value by the given value.
-     *
-     * @param val Value to set.
-     */
-    public void value(long val) {
-        this.val = val;
-    }
-
-    /**
-     * Increment this counter by the given value.
-     *
-     * @param i Value to increase this counter by.
-     */
-    public void increment(long i) {
-        val += i;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
deleted file mode 100644
index 263a075..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-
-/**
- * Counter for the job statistics accumulation.
- */
-public class GridHadoopPerformanceCounter extends GridHadoopCounterAdapter {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** The group name for this counter. */
-    private static final String GROUP_NAME = "SYSTEM";
-
-    /** The counter name for this counter. */
-    private static final String COUNTER_NAME = "PERFORMANCE";
-
-    /** Events collections. */
-    private Collection<T2<String,Long>> evts = new ArrayList<>();
-
-    /** Node id to insert into the event info. */
-    private UUID nodeId;
-
-    /** */
-    private int reducerNum;
-
-    /** */
-    private volatile Long firstShuffleMsg;
-
-    /** */
-    private volatile Long lastShuffleMsg;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    public GridHadoopPerformanceCounter() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param grp Group name.
-     * @param name Counter name.
-     */
-    public GridHadoopPerformanceCounter(String grp, String name) {
-        super(grp, name);
-    }
-
-    /**
-     * Constructor to create instance to use this as helper.
-     *
-     * @param nodeId Id of the work node.
-     */
-    public GridHadoopPerformanceCounter(UUID nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeValue(ObjectOutput out) throws IOException {
-        U.writeCollection(out, evts);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void readValue(ObjectInput in) throws IOException {
-        try {
-            evts = U.readCollection(in);
-        }
-        catch (ClassNotFoundException e) {
-            throw new IOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void merge(GridHadoopCounter cntr) {
-        evts.addAll(((GridHadoopPerformanceCounter)cntr).evts);
-    }
-
-    /**
-     * Gets the events collection.
-     *
-     * @return Collection of event.
-     */
-    public Collection<T2<String, Long>> evts() {
-        return evts;
-    }
-
-    /**
-     * Generate name that consists of some event information.
-     *
-     * @param info Task info.
-     * @param evtType The type of the event.
-     * @return String contains necessary event information.
-     */
-    private String eventName(GridHadoopTaskInfo info, String evtType) {
-        return eventName(info.type().toString(), info.taskNumber(), evtType);
-    }
-
-    /**
-     * Generate name that consists of some event information.
-     *
-     * @param taskType Task type.
-     * @param taskNum Number of the task.
-     * @param evtType The type of the event.
-     * @return String contains necessary event information.
-     */
-    private String eventName(String taskType, int taskNum, String evtType) {
-        assert nodeId != null;
-
-        return taskType + " " + taskNum + " " + evtType + " " + nodeId;
-    }
-
-    /**
-     * Adds event of the task submission (task instance creation).
-     *
-     * @param info Task info.
-     * @param ts Timestamp of the event.
-     */
-    public void onTaskSubmit(GridHadoopTaskInfo info, long ts) {
-        evts.add(new T2<>(eventName(info, "submit"), ts));
-    }
-
-    /**
-     * Adds event of the task preparation.
-     *
-     * @param info Task info.
-     * @param ts Timestamp of the event.
-     */
-    public void onTaskPrepare(GridHadoopTaskInfo info, long ts) {
-        evts.add(new T2<>(eventName(info, "prepare"), ts));
-    }
-
-    /**
-     * Adds event of the task finish.
-     *
-     * @param info Task info.
-     * @param ts Timestamp of the event.
-     */
-    public void onTaskFinish(GridHadoopTaskInfo info, long ts) {
-        if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) {
-            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
-            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
-
-            lastShuffleMsg = null;
-        }
-
-        evts.add(new T2<>(eventName(info, "finish"), ts));
-    }
-
-    /**
-     * Adds event of the task run.
-     *
-     * @param info Task info.
-     * @param ts Timestamp of the event.
-     */
-    public void onTaskStart(GridHadoopTaskInfo info, long ts) {
-        evts.add(new T2<>(eventName(info, "start"), ts));
-    }
-
-    /**
-     * Adds event of the job preparation.
-     *
-     * @param ts Timestamp of the event.
-     */
-    public void onJobPrepare(long ts) {
-        assert nodeId != null;
-
-        evts.add(new T2<>("JOB prepare " + nodeId, ts));
-    }
-
-    /**
-     * Adds event of the job start.
-     *
-     * @param ts Timestamp of the event.
-     */
-    public void onJobStart(long ts) {
-        assert nodeId != null;
-
-        evts.add(new T2<>("JOB start " + nodeId, ts));
-    }
-
-    /**
-     * Adds client submission events from job info.
-     *
-     * @param info Job info.
-     */
-    public void clientSubmissionEvents(GridHadoopJobInfo info) {
-        assert nodeId != null;
-
-        addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
-        addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
-        addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
-    }
-
-    /**
-     * Adds event with timestamp from some property in job info.
-     *
-     * @param evt Event type and phase.
-     * @param info Job info.
-     * @param propName Property name to get timestamp.
-     */
-    private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) {
-        String val = info.property(propName);
-
-        if (!F.isEmpty(val)) {
-            try {
-                evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val)));
-            }
-            catch (NumberFormatException e) {
-                throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
-            }
-        }
-    }
-
-    /**
-     * Registers shuffle message event.
-     *
-     * @param reducerNum Number of reducer that receives the data.
-     * @param ts Timestamp of the event.
-     */
-    public void onShuffleMessage(int reducerNum, long ts) {
-        this.reducerNum = reducerNum;
-
-        if (firstShuffleMsg == null)
-            firstShuffleMsg = ts;
-
-        lastShuffleMsg = ts;
-    }
-
-    /**
-     * Gets system predefined performance counter from the GridHadoopCounters object.
-     *
-     * @param cntrs GridHadoopCounters object.
-     * @param nodeId Node id for methods that adds events. It may be null if you don't use ones.
-     * @return Predefined performance counter.
-     */
-    public static GridHadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) {
-        GridHadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class);
-
-        if (nodeId != null)
-            cntr.nodeId(nodeId);
-
-        return cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class);
-    }
-
-    /**
-     * Sets the nodeId field.
-     *
-     * @param nodeId Node id.
-     */
-    private void nodeId(UUID nodeId) {
-        this.nodeId = nodeId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
new file mode 100644
index 0000000..3fdce14
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Default Hadoop counter implementation.
+ */
+public abstract class HadoopCounterAdapter implements GridHadoopCounter, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Counter group name. */
+    private String grp;
+
+    /** Counter name. */
+    private String name;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    protected HadoopCounterAdapter() {
+        // No-op.
+    }
+
+    /**
+     * Creates new counter with given group and name.
+     *
+     * @param grp Counter group name.
+     * @param name Counter name.
+     */
+    protected HadoopCounterAdapter(String grp, String name) {
+        assert grp != null : "counter must have group";
+        assert name != null : "counter must have name";
+
+        this.grp = grp;
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public String group() {
+        return grp;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeUTF(grp);
+        out.writeUTF(name);
+        writeValue(out);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        grp = in.readUTF();
+        name = in.readUTF();
+        readValue(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        HadoopCounterAdapter cntr = (HadoopCounterAdapter)o;
+
+        if (!grp.equals(cntr.grp))
+            return false;
+        if (!name.equals(cntr.name))
+            return false;
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = grp.hashCode();
+        res = 31 * res + name.hashCode();
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopCounterAdapter.class, this);
+    }
+
+    /**
+     * Writes value of this counter to output.
+     *
+     * @param out Output.
+     * @throws IOException If failed.
+     */
+    protected abstract void writeValue(ObjectOutput out) throws IOException;
+
+    /**
+     * Read value of this counter from input.
+     *
+     * @param in Input.
+     * @throws IOException If failed.
+     */
+    protected abstract void readValue(ObjectInput in) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
new file mode 100644
index 0000000..01b1473
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Default in-memory counters store.
+ */
+public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final ConcurrentMap<CounterKey, GridHadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
+
+    /**
+     * Default constructor. Creates new instance without counters.
+     */
+    public HadoopCountersImpl() {
+        // No-op.
+    }
+
+    /**
+     * Creates new instance that contain given counters.
+     *
+     * @param cntrs Counters to store.
+     */
+    public HadoopCountersImpl(Iterable<GridHadoopCounter> cntrs) {
+        addCounters(cntrs, true);
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param cntrs Counters to copy.
+     */
+    public HadoopCountersImpl(GridHadoopCounters cntrs) {
+        this(cntrs.all());
+    }
+
+    /**
+     * Creates counter instance.
+     *
+     * @param cls Class of the counter.
+     * @param grp Group name.
+     * @param name Counter name.
+     * @return Counter.
+     */
+    private <T extends GridHadoopCounter> T createCounter(Class<? extends GridHadoopCounter> cls, String grp,
+        String name) {
+        try {
+            Constructor constructor = cls.getConstructor(String.class, String.class);
+
+            return (T)constructor.newInstance(grp, name);
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Adds counters collection in addition to existing counters.
+     *
+     * @param cntrs Counters to add.
+     * @param cp Whether to copy counters or not.
+     */
+    private void addCounters(Iterable<GridHadoopCounter> cntrs, boolean cp) {
+        assert cntrs != null;
+
+        for (GridHadoopCounter cntr : cntrs) {
+            if (cp) {
+                GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
+
+                cntrCp.merge(cntr);
+
+                cntr = cntrCp;
+            }
+
+            cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
+        assert cls != null;
+
+        CounterKey mapKey = new CounterKey(cls, grp, name);
+
+        T cntr = (T)cntrsMap.get(mapKey);
+
+        if (cntr == null) {
+            cntr = createCounter(cls, grp, name);
+
+            T old = (T)cntrsMap.putIfAbsent(mapKey, cntr);
+
+            if (old != null)
+                return old;
+        }
+
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridHadoopCounter> all() {
+        return cntrsMap.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void merge(GridHadoopCounters other) {
+        for (GridHadoopCounter counter : other.all())
+            counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeCollection(out, cntrsMap.values());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        addCounters(U.<GridHadoopCounter>readCollection(in), false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        HadoopCountersImpl counters = (HadoopCountersImpl)o;
+
+        return cntrsMap.equals(counters.cntrsMap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return cntrsMap.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values());
+    }
+
+    /**
+     * The tuple of counter identifier components for more readable code.
+     */
+    private static class CounterKey extends GridTuple3<Class<? extends GridHadoopCounter>, String, String> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Constructor.
+         *
+         * @param cls Class of the counter.
+         * @param grp Group name.
+         * @param name Counter name.
+         */
+        private CounterKey(Class<? extends GridHadoopCounter> cls, String grp, String name) {
+            super(cls, grp, name);
+        }
+
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public CounterKey() {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
new file mode 100644
index 0000000..1aa1e0e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+
+import java.io.*;
+
+/**
+ * Standard hadoop counter to use via original Hadoop API in Hadoop jobs.
+ */
+public class HadoopLongCounter extends HadoopCounterAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The counter value. */
+    private long val;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopLongCounter() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param grp Group name.
+     * @param name Counter name.
+     */
+    public HadoopLongCounter(String grp, String name) {
+        super(grp, name);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeValue(ObjectOutput out) throws IOException {
+        out.writeLong(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readValue(ObjectInput in) throws IOException {
+        val = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void merge(GridHadoopCounter cntr) {
+        val += ((HadoopLongCounter)cntr).val;
+    }
+
+    /**
+     * Gets current value of this counter.
+     *
+     * @return Current value.
+     */
+    public long value() {
+        return val;
+    }
+
+    /**
+     * Sets current value by the given value.
+     *
+     * @param val Value to set.
+     */
+    public void value(long val) {
+        this.val = val;
+    }
+
+    /**
+     * Increment this counter by the given value.
+     *
+     * @param i Value to increase this counter by.
+     */
+    public void increment(long i) {
+        val += i;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
new file mode 100644
index 0000000..f22d0cd
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Counter for the job statistics accumulation.
+ */
+public class HadoopPerformanceCounter extends HadoopCounterAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The group name for this counter. */
+    private static final String GROUP_NAME = "SYSTEM";
+
+    /** The counter name for this counter. */
+    private static final String COUNTER_NAME = "PERFORMANCE";
+
+    /** Events collections. */
+    private Collection<T2<String,Long>> evts = new ArrayList<>();
+
+    /** Node id to insert into the event info. */
+    private UUID nodeId;
+
+    /** */
+    private int reducerNum;
+
+    /** */
+    private volatile Long firstShuffleMsg;
+
+    /** */
+    private volatile Long lastShuffleMsg;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopPerformanceCounter() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param grp Group name.
+     * @param name Counter name.
+     */
+    public HadoopPerformanceCounter(String grp, String name) {
+        super(grp, name);
+    }
+
+    /**
+     * Constructor to create instance to use this as helper.
+     *
+     * @param nodeId Id of the work node.
+     */
+    public HadoopPerformanceCounter(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeValue(ObjectOutput out) throws IOException {
+        U.writeCollection(out, evts);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readValue(ObjectInput in) throws IOException {
+        try {
+            evts = U.readCollection(in);
+        }
+        catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void merge(GridHadoopCounter cntr) {
+        evts.addAll(((HadoopPerformanceCounter)cntr).evts);
+    }
+
+    /**
+     * Gets the events collection.
+     *
+     * @return Collection of event.
+     */
+    public Collection<T2<String, Long>> evts() {
+        return evts;
+    }
+
+    /**
+     * Generate name that consists of some event information.
+     *
+     * @param info Task info.
+     * @param evtType The type of the event.
+     * @return String contains necessary event information.
+     */
+    private String eventName(GridHadoopTaskInfo info, String evtType) {
+        return eventName(info.type().toString(), info.taskNumber(), evtType);
+    }
+
+    /**
+     * Generate name that consists of some event information.
+     *
+     * @param taskType Task type.
+     * @param taskNum Number of the task.
+     * @param evtType The type of the event.
+     * @return String contains necessary event information.
+     */
+    private String eventName(String taskType, int taskNum, String evtType) {
+        assert nodeId != null;
+
+        return taskType + " " + taskNum + " " + evtType + " " + nodeId;
+    }
+
+    /**
+     * Adds event of the task submission (task instance creation).
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskSubmit(GridHadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "submit"), ts));
+    }
+
+    /**
+     * Adds event of the task preparation.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskPrepare(GridHadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "prepare"), ts));
+    }
+
+    /**
+     * Adds event of the task finish.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskFinish(GridHadoopTaskInfo info, long ts) {
+        if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) {
+            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
+            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
+
+            lastShuffleMsg = null;
+        }
+
+        evts.add(new T2<>(eventName(info, "finish"), ts));
+    }
+
+    /**
+     * Adds event of the task run.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskStart(GridHadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "start"), ts));
+    }
+
+    /**
+     * Adds event of the job preparation.
+     *
+     * @param ts Timestamp of the event.
+     */
+    public void onJobPrepare(long ts) {
+        assert nodeId != null;
+
+        evts.add(new T2<>("JOB prepare " + nodeId, ts));
+    }
+
+    /**
+     * Adds event of the job start.
+     *
+     * @param ts Timestamp of the event.
+     */
+    public void onJobStart(long ts) {
+        assert nodeId != null;
+
+        evts.add(new T2<>("JOB start " + nodeId, ts));
+    }
+
+    /**
+     * Adds client submission events from job info.
+     *
+     * @param info Job info.
+     */
+    public void clientSubmissionEvents(GridHadoopJobInfo info) {
+        assert nodeId != null;
+
+        addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
+        addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
+        addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
+    }
+
+    /**
+     * Adds event with timestamp from some property in job info.
+     *
+     * @param evt Event type and phase.
+     * @param info Job info.
+     * @param propName Property name to get timestamp.
+     */
+    private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) {
+        String val = info.property(propName);
+
+        if (!F.isEmpty(val)) {
+            try {
+                evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val)));
+            }
+            catch (NumberFormatException e) {
+                throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
+            }
+        }
+    }
+
+    /**
+     * Registers shuffle message event.
+     *
+     * @param reducerNum Number of reducer that receives the data.
+     * @param ts Timestamp of the event.
+     */
+    public void onShuffleMessage(int reducerNum, long ts) {
+        this.reducerNum = reducerNum;
+
+        if (firstShuffleMsg == null)
+            firstShuffleMsg = ts;
+
+        lastShuffleMsg = ts;
+    }
+
+    /**
+     * Gets system predefined performance counter from the GridHadoopCounters object.
+     *
+     * @param cntrs GridHadoopCounters object.
+     * @param nodeId Node id for methods that adds events. It may be null if you don't use ones.
+     * @return Predefined performance counter.
+     */
+    public static HadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) {
+        HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
+
+        if (nodeId != null)
+            cntr.nodeId(nodeId);
+
+        return cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
+    }
+
+    /**
+     * Sets the nodeId field.
+     *
+     * @param nodeId Node id.
+     */
+    private void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
deleted file mode 100644
index e9461e2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class GridHadoopDistributedFileSystem extends DistributedFileSystem {
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(getHomeDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
-
-        return path.makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        Path fixedDir = fixRelativePart(dir);
-
-        String res = fixedDir.toUri().getPath();
-
-        if (!DFSUtil.isValidName(res))
-            throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
-        workingDir.set(fixedDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workingDir.get();
-    }
-}


Mime
View raw message