ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [45/50] [abbrv] incubator-ignite git commit: # IGNITE-386: Reworked API in Hadoop module.
Date Tue, 03 Mar 2015 11:07:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounters.java
deleted file mode 100644
index dd6b2ed..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/IgniteHadoopCounters.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite_new.hadoop.mapreduce;
-
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.counters.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop Client API Counters adapter.
- */
-public class IgniteHadoopCounters extends Counters {
-    /** */
-    private final Map<T2<String,String>,GridHadoopLongCounter> cntrs = new HashMap<>();
-
-    /**
-     * Creates new instance based on given counters.
-     *
-     * @param cntrs Counters to adapt.
-     */
-    public IgniteHadoopCounters(GridHadoopCounters cntrs) {
-        for (GridHadoopCounter cntr : cntrs.all())
-            if (cntr instanceof GridHadoopLongCounter)
-                this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
-        return addGroup(grp.getName(), grp.getDisplayName());
-    }
-
-    /** {@inheritDoc} */
-    @Override public CounterGroup addGroup(String name, String displayName) {
-        return new IgniteHadoopCounterGroup(this, name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String grpName, String cntrName) {
-        return findCounter(grpName, cntrName, true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Counter findCounter(Enum<?> key) {
-        return findCounter(key.getDeclaringClass().getName(), key.name(), true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
-        return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Iterable<String> getGroupNames() {
-        Collection<String> res = new HashSet<>();
-
-        for (GridHadoopCounter counter : cntrs.values())
-            res.add(counter.group());
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<CounterGroup> iterator() {
-        final Iterator<String> iter = getGroupNames().iterator();
-
-        return new Iterator<CounterGroup>() {
-            @Override public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override public CounterGroup next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                return new IgniteHadoopCounterGroup(IgniteHadoopCounters.this, iter.next());
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException("not implemented");
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized CounterGroup getGroup(String grpName) {
-        return new IgniteHadoopCounterGroup(this, grpName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int countCounters() {
-        return cntrs.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFields(DataInput in) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
-        for (CounterGroup group : other) {
-            for (Counter counter : group) {
-                findCounter(group.getName(), counter.getName()).increment(counter.getValue());
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object genericRight) {
-        if (!(genericRight instanceof IgniteHadoopCounters))
-            return false;
-
-        return cntrs.equals(((IgniteHadoopCounters) genericRight).cntrs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return cntrs.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWriteAllCounters(boolean snd) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean getWriteAllCounters() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Limits limits() {
-        return null;
-    }
-
-    /**
-     * Returns size of a group.
-     *
-     * @param grpName Name of the group.
-     * @return amount of counters in the given group.
-     */
-    public int groupSize(String grpName) {
-        int res = 0;
-
-        for (GridHadoopCounter counter : cntrs.values()) {
-            if (grpName.equals(counter.group()))
-                res++;
-        }
-
-        return res;
-    }
-
-    /**
-     * Returns counters iterator for specified group.
-     *
-     * @param grpName Name of the group to iterate.
-     * @return Counters iterator.
-     */
-    public Iterator<Counter> iterateGroup(String grpName) {
-        Collection<Counter> grpCounters = new ArrayList<>();
-
-        for (GridHadoopLongCounter counter : cntrs.values()) {
-            if (grpName.equals(counter.group()))
-                grpCounters.add(new GridHadoopV2Counter(counter));
-        }
-
-        return grpCounters.iterator();
-    }
-
-    /**
-     * Find a counter in the group.
-     *
-     * @param grpName The name of the counter group.
-     * @param cntrName The name of the counter.
-     * @param create Create the counter if not found if true.
-     * @return The counter that was found or added or {@code null} if create is false.
-     */
-    public Counter findCounter(String grpName, String cntrName, boolean create) {
-        T2<String, String> key = new T2<>(grpName, cntrName);
-
-        GridHadoopLongCounter internalCntr = cntrs.get(key);
-
-        if (internalCntr == null & create) {
-            internalCntr = new GridHadoopLongCounter(grpName,cntrName);
-
-            cntrs.put(key, new GridHadoopLongCounter(grpName,cntrName));
-        }
-
-        return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocol.java
deleted file mode 100644
index 7244da4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocol.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite_new.hadoop.mapreduce.protocol;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.protocol.*;
-import org.apache.hadoop.mapreduce.security.token.delegation.*;
-import org.apache.hadoop.mapreduce.v2.*;
-import org.apache.hadoop.mapreduce.v2.jobhistory.*;
-import org.apache.hadoop.security.*;
-import org.apache.hadoop.security.authorize.*;
-import org.apache.hadoop.security.token.*;
-import org.apache.ignite.*;
-import org.apache.ignite.client.hadoop.counter.*;
-import org.apache.ignite.internal.client.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Hadoop client protocol.
- */
-public class IgniteHadoopClientProtocol implements ClientProtocol {
-    /** Ignite framework name property. */
-    public static final String FRAMEWORK_NAME = "ignite";
-
-    /** Protocol version. */
-    private static final long PROTO_VER = 1L;
-
-    /** Default Ignite system directory. */
-    private static final String SYS_DIR = ".ignite/system";
-
-    /** Configuration. */
-    private final Configuration conf;
-
-    /** Ignite client. */
-    private volatile GridClient cli;
-
-    /** Last received version. */
-    private long lastVer = -1;
-
-    /** Last received status. */
-    private GridHadoopJobStatus lastStatus;
-
-    /**
-     * Constructor.
-     *
-     * @param conf Configuration.
-     * @param cli Ignite client.
-     */
-    IgniteHadoopClientProtocol(Configuration conf, GridClient cli) {
-        assert cli != null;
-
-        this.conf = conf;
-        this.cli = cli;
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobID getNewJobID() throws IOException, InterruptedException {
-        try {
-            conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
-
-            GridHadoopJobId jobID = cli.compute().execute(GridHadoopProtocolNextTaskIdTask.class.getName(), null);
-
-            conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
-
-            return new JobID(jobID.globalId().toString(), jobID.localId());
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get new job ID.", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException,
-        InterruptedException {
-        try {
-            conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
-
-            GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolSubmitJobTask.class.getName(),
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
-
-            if (status == null)
-                throw new IOException("Failed to submit job (null status obtained): " + jobId);
-
-            return processStatus(status);
-        }
-        catch (GridClientException | IgniteCheckedException e) {
-            throw new IOException("Failed to submit job.", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
-        return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
-        return Cluster.JobTrackerStatus.RUNNING;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public AccessControlList getQueueAdmins(String queueName) throws IOException {
-        return new AccessControlList("*");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void killJob(JobID jobId) throws IOException, InterruptedException {
-        try {
-            cli.compute().execute(GridHadoopProtocolKillJobTask.class.getName(),
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to kill job: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException,
-        InterruptedException {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException {
-        try {
-            Long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
-
-            GridHadoopProtocolTaskArguments args = delay >= 0 ?
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
-
-            GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolJobStatusTask.class.getName(), args);
-
-            if (status == null)
-                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
-
-            return processStatus(status);
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get job status: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
-        try {
-            final GridHadoopCounters counters = cli.compute().execute(GridHadoopProtocolJobCountersTask.class.getName(),
-                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
-
-            if (counters == null)
-                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
-
-            return new GridHadoopClientCounters(counters);
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get job counters: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
-        return new TaskReport[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getFilesystemName() throws IOException, InterruptedException {
-        return FileSystem.get(conf).getUri().toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-        return new JobStatus[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
-        throws IOException, InterruptedException {
-        return new TaskCompletionEvent[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
-        return new String[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
-        return new TaskTrackerInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
-        return new TaskTrackerInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getSystemDir() throws IOException, InterruptedException {
-        Path sysDir = new Path(SYS_DIR);
-
-        return sysDir.toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getStagingAreaDir() throws IOException, InterruptedException {
-        String usr = UserGroupInformation.getCurrentUser().getShortUserName();
-
-        return GridHadoopUtils.stagingAreaDir(conf, usr).toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getJobHistoryDir() throws IOException, InterruptedException {
-        return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getQueues() throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
-        return new QueueAclsInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException,
-        InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
-        InterruptedException {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
-        InterruptedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException,
-        InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-        return PROTO_VER;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
-        throws IOException {
-        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
-    }
-
-    /**
-     * Process received status update.
-     *
-     * @param status Ignite status.
-     * @return Hadoop status.
-     */
-    private JobStatus processStatus(GridHadoopJobStatus status) {
-        // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because
-        // GridHadoopClientProtocolProvider creates new instance of this class for every new job and Job class
-        // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will
-        // change in future and either protocol will serve statuses for several jobs or status update will not be
-        // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap).
-        // (vozerov)
-        if (lastVer < status.version()) {
-            lastVer = status.version();
-
-            lastStatus = status;
-        }
-        else
-            assert lastStatus != null;
-
-        return GridHadoopUtils.status(lastStatus, conf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocolProvider.java
deleted file mode 100644
index d2fe28e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/mapreduce/protocol/IgniteHadoopClientProtocolProvider.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite_new.hadoop.mapreduce.protocol;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.protocol.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.client.*;
-import org.apache.ignite.internal.client.marshaller.optimized.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.client.hadoop.GridHadoopClientProtocol.*;
-import static org.apache.ignite.internal.client.GridClientProtocol.*;
-
-
-/**
- * Grid Hadoop client protocol provider.
- */
-public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
-    /** Clients. */
-    private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>();
-
-    /** {@inheritDoc} */
-    @Override public ClientProtocol create(Configuration conf) throws IOException {
-        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
-            String addr = conf.get(MRConfig.MASTER_ADDRESS);
-
-            if (F.isEmpty(addr))
-                throw new IOException("Failed to create client protocol because server address is not specified (is " +
-                    MRConfig.MASTER_ADDRESS + " property set?).");
-
-            if (F.eq(addr, "local"))
-                throw new IOException("Local execution mode is not supported, please point " +
-                    MRConfig.MASTER_ADDRESS + " to real Ignite node.");
-
-            return createProtocol(addr, conf);
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
-        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME)))
-            return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(ClientProtocol cliProto) throws IOException {
-        // No-op.
-    }
-
-    /**
-     * Internal protocol creation routine.
-     *
-     * @param addr Address.
-     * @param conf Configuration.
-     * @return Client protocol.
-     * @throws java.io.IOException If failed.
-     */
-    private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
-        return new IgniteHadoopClientProtocol(conf, client(addr));
-    }
-
-    /**
-     * Create client.
-     *
-     * @param addr Endpoint address.
-     * @return Client.
-     * @throws java.io.IOException If failed.
-     */
-    private static GridClient client(String addr) throws IOException {
-        try {
-            IgniteInternalFuture<GridClient> fut = cliMap.get(addr);
-
-            if (fut == null) {
-                GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
-
-                IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0);
-
-                if (oldFut != null)
-                    return oldFut.get();
-                else {
-                    GridClientConfiguration cliCfg = new GridClientConfiguration();
-
-                    cliCfg.setProtocol(TCP);
-                    cliCfg.setServers(Collections.singletonList(addr));
-                    cliCfg.setMarshaller(new GridClientOptimizedMarshaller());
-                    cliCfg.setDaemon(true);
-
-                    try {
-                        GridClient cli = GridClientFactory.start(cliCfg);
-
-                        fut0.onDone(cli);
-
-                        return cli;
-                    }
-                    catch (GridClientException e) {
-                        fut0.onDone(e);
-
-                        throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
-                    }
-                }
-            }
-            else
-                return fut.get();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
index fe35d5e..8d5957b 100644
--- a/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
+++ b/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -1 +1 @@
-org.apache.ignite.client.hadoop.GridHadoopClientProtocolProvider
+org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java
deleted file mode 100644
index 780ce67..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-/**
- * Hadoop client protocol tests in embedded process mode.
- */
-public class GridHadoopClientProtocolEmbeddedSelfTest extends GridHadoopClientProtocolSelfTest {
-    /** {@inheritDoc} */
-    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
-        GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName);
-
-        cfg.setExternalExecution(false);
-
-        return cfg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java
deleted file mode 100644
index ff8798b..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.mapreduce.lib.output.*;
-import org.apache.hadoop.mapreduce.protocol.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop client protocol tests in external process mode.
- */
-@SuppressWarnings("ResultOfMethodCallIgnored")
-public class GridHadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest {
-    /** Input path. */
-    private static final String PATH_INPUT = "/input";
-
-    /** Output path. */
-    private static final String PATH_OUTPUT = "/output";
-
-    /** Job name. */
-    private static final String JOB_NAME = "myJob";
-
-    /** Setup lock file. */
-    private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
-        "ignite-lock-setup.file");
-
-    /** Map lock file. */
-    private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
-        "ignite-lock-map.file");
-
-    /** Reduce lock file. */
-    private static File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
-        "ignite-lock-reduce.file");
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean igfsEnabled() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean restEnabled() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGrids(gridCount());
-
-        setupLockFile.delete();
-        mapLockFile.delete();
-        reduceLockFile.delete();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-
-        super.afterTestsStopped();
-
-//        GridHadoopClientProtocolProvider.cliMap.clear();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        setupLockFile.createNewFile();
-        mapLockFile.createNewFile();
-        reduceLockFile.createNewFile();
-
-        setupLockFile.deleteOnExit();
-        mapLockFile.deleteOnExit();
-        reduceLockFile.deleteOnExit();
-
-        super.beforeTest();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName).format();
-
-        setupLockFile.delete();
-        mapLockFile.delete();
-        reduceLockFile.delete();
-
-        super.afterTest();
-    }
-
-    /**
-     * Test next job ID generation.
-     *
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private void tstNextJobId() throws Exception {
-        GridHadoopClientProtocolProvider provider = provider();
-
-        ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT));
-
-        JobID jobId = proto.getNewJobID();
-
-        assert jobId != null;
-        assert jobId.getJtIdentifier() != null;
-
-        JobID nextJobId = proto.getNewJobID();
-
-        assert nextJobId != null;
-        assert nextJobId.getJtIdentifier() != null;
-
-        assert !F.eq(jobId, nextJobId);
-    }
-
-    /**
-     * Tests job counters retrieval.
-     *
-     * @throws Exception If failed.
-     */
-    public void testJobCounters() throws Exception {
-        IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName);
-
-        igfs.mkdirs(new IgfsPath(PATH_INPUT));
-
-        try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
-            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
-
-            bw.write(
-                "alpha\n" +
-                "beta\n" +
-                "gamma\n" +
-                "alpha\n" +
-                "beta\n" +
-                "gamma\n" +
-                "alpha\n" +
-                "beta\n" +
-                "gamma\n"
-            );
-        }
-
-        Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT);
-
-        final Job job = Job.getInstance(conf);
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-
-        job.setMapperClass(TestCountingMapper.class);
-        job.setReducerClass(TestCountingReducer.class);
-        job.setCombinerClass(TestCountingCombiner.class);
-
-        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
-        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
-
-        job.submit();
-
-        final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
-
-        assertEquals(0, cntr.getValue());
-
-        cntr.increment(10);
-
-        assertEquals(10, cntr.getValue());
-
-        // Transferring to map phase.
-        setupLockFile.delete();
-
-        // Transferring to reduce phase.
-        mapLockFile.delete();
-
-        job.waitForCompletion(false);
-
-        assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
-
-        final Counters counters = job.getCounters();
-
-        assertNotNull("counters cannot be null", counters);
-        assertEquals("wrong counters count", 3, counters.countCounters());
-        assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
-        assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
-        assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
-    }
-
-    /**
-     * Tests job counters retrieval for unknown job id.
-     *
-     * @throws Exception If failed.
-     */
-    private void tstUnknownJobCounters() throws Exception {
-        GridHadoopClientProtocolProvider provider = provider();
-
-        ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT));
-
-        try {
-            proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1));
-            fail("exception must be thrown");
-        }
-        catch (Exception e) {
-            assert e instanceof IOException : "wrong error has been thrown";
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void tstJobSubmitMap() throws Exception {
-        checkJobSubmit(true, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void tstJobSubmitMapCombine() throws Exception {
-        checkJobSubmit(false, true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void tstJobSubmitMapReduce() throws Exception {
-        checkJobSubmit(true, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void tstJobSubmitMapCombineReduce() throws Exception {
-        checkJobSubmit(false, false);
-    }
-
-    /**
-     * Test job submission.
-     *
-     * @param noCombiners Whether there are no combiners.
-     * @param noReducers Whether there are no reducers.
-     * @throws Exception If failed.
-     */
-    public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception {
-        IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName);
-
-        igfs.mkdirs(new IgfsPath(PATH_INPUT));
-
-        try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
-            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
-
-            bw.write("word");
-        }
-
-        Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT);
-
-        final Job job = Job.getInstance(conf);
-
-        job.setJobName(JOB_NAME);
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-
-        job.setMapperClass(TestMapper.class);
-        job.setReducerClass(TestReducer.class);
-
-        if (!noCombiners)
-            job.setCombinerClass(TestCombiner.class);
-
-        if (noReducers)
-            job.setNumReduceTasks(0);
-
-        job.setInputFormatClass(TextInputFormat.class);
-        job.setOutputFormatClass(TestOutputFormat.class);
-
-        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
-        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
-
-        job.submit();
-
-        JobID jobId = job.getJobID();
-
-        // Setup phase.
-        JobStatus jobStatus = job.getStatus();
-        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-        assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
-        assert jobStatus.getMapProgress() == 0.0f;
-        assert jobStatus.getReduceProgress() == 0.0f;
-
-        U.sleep(2100);
-
-        JobStatus recentJobStatus = job.getStatus();
-
-        assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
-            "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
-
-        // Transferring to map phase.
-        setupLockFile.delete();
-
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    return F.eq(1.0f, job.getStatus().getSetupProgress());
-                }
-                catch (Exception e) {
-                    throw new RuntimeException("Unexpected exception.", e);
-                }
-            }
-        }, 5000L);
-
-        // Map phase.
-        jobStatus = job.getStatus();
-        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-        assert jobStatus.getSetupProgress() == 1.0f;
-        assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
-        assert jobStatus.getReduceProgress() == 0.0f;
-
-        U.sleep(2100);
-
-        recentJobStatus = job.getStatus();
-
-        assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
-            "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
-
-        // Transferring to reduce phase.
-        mapLockFile.delete();
-
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    return F.eq(1.0f, job.getStatus().getMapProgress());
-                }
-                catch (Exception e) {
-                    throw new RuntimeException("Unexpected exception.", e);
-                }
-            }
-        }, 5000L);
-
-        if (!noReducers) {
-            // Reduce phase.
-            jobStatus = job.getStatus();
-            checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
-            assert jobStatus.getSetupProgress() == 1.0f;
-            assert jobStatus.getMapProgress() == 1.0f;
-            assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
-
-            // Ensure that reduces progress increases.
-            U.sleep(2100);
-
-            recentJobStatus = job.getStatus();
-
-            assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
-                "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
-
-            reduceLockFile.delete();
-        }
-
-        job.waitForCompletion(false);
-
-        jobStatus = job.getStatus();
-        checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
-        assert jobStatus.getSetupProgress() == 1.0f;
-        assert jobStatus.getMapProgress() == 1.0f;
-        assert jobStatus.getReduceProgress() == 1.0f;
-
-        dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
-    }
-
-    /**
-     * Dump IGFS content.
-     *
-     * @param igfs IGFS.
-     * @param path Path.
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private static void dumpIgfs(IgniteFs igfs, IgfsPath path) throws Exception {
-        IgfsFile file = igfs.info(path);
-
-        assert file != null;
-
-        System.out.println(file.path());
-
-        if (file.isDirectory()) {
-            for (IgfsPath child : igfs.listPaths(path))
-                dumpIgfs(igfs, child);
-        }
-        else {
-            try (BufferedReader br = new BufferedReader(new InputStreamReader(igfs.open(path)))) {
-                String line = br.readLine();
-
-                while (line != null) {
-                    System.out.println(line);
-
-                    line = br.readLine();
-                }
-            }
-        }
-    }
-
-    /**
-     * Check job status.
-     *
-     * @param status Job status.
-     * @param expJobId Expected job ID.
-     * @param expJobName Expected job name.
-     * @param expState Expected state.
-     * @param expCleanupProgress Expected cleanup progress.
-     * @throws Exception If failed.
-     */
-    private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName,
-        JobStatus.State expState, float expCleanupProgress) throws Exception {
-        assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID();
-        assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName();
-        assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState();
-        assert F.eq(status.getCleanupProgress(), expCleanupProgress) :
-            "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress();
-    }
-
-    /**
-     * @return Configuration.
-     */
-    private Configuration config(int port) {
-        Configuration conf = new Configuration();
-
-        setupFileSystems(conf);
-
-        conf.set(MRConfig.FRAMEWORK_NAME, GridHadoopClientProtocol.FRAMEWORK_NAME);
-        conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port);
-
-        conf.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/");
-
-        return conf;
-    }
-
-    /**
-     * @return Protocol provider.
-     */
-    private GridHadoopClientProtocolProvider provider() {
-        return new GridHadoopClientProtocolProvider();
-    }
-
-    /**
-     * Test mapper.
-     */
-    public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
-        /** Writable container for writing word. */
-        private Text word = new Text();
-
-        /** Writable integer constant of '1' is writing as count of found words. */
-        private static final IntWritable one = new IntWritable(1);
-
-        /** {@inheritDoc} */
-        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
-            while (mapLockFile.exists())
-                Thread.sleep(50);
-
-            StringTokenizer wordList = new StringTokenizer(val.toString());
-
-            while (wordList.hasMoreTokens()) {
-                word.set(wordList.nextToken());
-
-                ctx.write(word, one);
-            }
-        }
-    }
-
-    /**
-     * Test Hadoop counters.
-     */
-    public enum TestCounter {
-        COUNTER1, COUNTER2, COUNTER3
-    }
-
-    /**
-     * Test mapper that uses counters.
-     */
-    public static class TestCountingMapper extends TestMapper {
-        /** {@inheritDoc} */
-        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
-            super.map(key, val, ctx);
-            ctx.getCounter(TestCounter.COUNTER1).increment(1);
-        }
-    }
-
-    /**
-     * Test combiner that counts invocations.
-     */
-    public static class TestCountingCombiner extends TestReducer {
-        @Override public void reduce(Text key, Iterable<IntWritable> values,
-            Context ctx) throws IOException, InterruptedException {
-            ctx.getCounter(TestCounter.COUNTER1).increment(1);
-            ctx.getCounter(TestCounter.COUNTER2).increment(1);
-
-            int sum = 0;
-            for (IntWritable value : values) {
-                sum += value.get();
-            }
-
-            ctx.write(key, new IntWritable(sum));
-        }
-    }
-
-    /**
-     * Test reducer that counts invocations.
-     */
-    public static class TestCountingReducer extends TestReducer {
-        @Override public void reduce(Text key, Iterable<IntWritable> values,
-            Context ctx) throws IOException, InterruptedException {
-            ctx.getCounter(TestCounter.COUNTER1).increment(1);
-            ctx.getCounter(TestCounter.COUNTER3).increment(1);
-        }
-    }
-
-    /**
-     * Test combiner.
-     */
-    public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
-        // No-op.
-    }
-
-    public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
-        /** {@inheritDoc} */
-        @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
-            throws IOException {
-            return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx));
-        }
-    }
-
-    /**
-     * Test output committer.
-     */
-    private static class TestOutputCommitter extends FileOutputCommitter {
-        /** Delegate. */
-        private final FileOutputCommitter delegate;
-
-        /**
-         * Constructor.
-         *
-         * @param ctx Task attempt context.
-         * @param delegate Delegate.
-         * @throws IOException If failed.
-         */
-        private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
-            super(FileOutputFormat.getOutputPath(ctx), ctx);
-
-            this.delegate = delegate;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void setupJob(JobContext jobCtx) throws IOException {
-            try {
-                while (setupLockFile.exists())
-                    Thread.sleep(50);
-            }
-            catch (InterruptedException ignored) {
-                throw new IOException("Interrupted.");
-            }
-
-            delegate.setupJob(jobCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException {
-            delegate.setupTask(taskCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException {
-            return delegate.needsTaskCommit(taskCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException {
-            delegate.commitTask(taskCtx);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException {
-            delegate.abortTask(taskCtx);
-        }
-    }
-
-    /**
-     * Test reducer.
-     */
-    public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
-        /** Writable container for writing sum of word counts. */
-        private IntWritable totalWordCnt = new IntWritable();
-
-        /** {@inheritDoc} */
-        @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
-            InterruptedException {
-            while (reduceLockFile.exists())
-                Thread.sleep(50);
-
-            int wordCnt = 0;
-
-            for (IntWritable value : values)
-                wordCnt += value.get();
-
-            totalWordCnt.set(wordCnt);
-
-            ctx.write(key, totalWordCnt);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
new file mode 100644
index 0000000..667fec9
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.hadoop;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+
+/**
+ * Hadoop client protocol tests in embedded process mode.
+ */
+public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest {
+    /** {@inheritDoc} */
+    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
+        GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
new file mode 100644
index 0000000..f3f22fc
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.hadoop.mapreduce.protocol.*;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.mapreduce.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.proto.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop client protocol tests in external process mode.
+ */
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest {
+    /** Input path. */
+    private static final String PATH_INPUT = "/input";
+
+    /** Output path. */
+    private static final String PATH_OUTPUT = "/output";
+
+    /** Job name. */
+    private static final String JOB_NAME = "myJob";
+
+    /** Setup lock file. */
+    private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-setup.file");
+
+    /** Map lock file. */
+    private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-map.file");
+
+    /** Reduce lock file. */
+    private static File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-reduce.file");
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean restEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+
+        setupLockFile.delete();
+        mapLockFile.delete();
+        reduceLockFile.delete();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+
+//        IgniteHadoopClientProtocolProvider.cliMap.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        setupLockFile.createNewFile();
+        mapLockFile.createNewFile();
+        reduceLockFile.createNewFile();
+
+        setupLockFile.deleteOnExit();
+        mapLockFile.deleteOnExit();
+        reduceLockFile.deleteOnExit();
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName).format();
+
+        setupLockFile.delete();
+        mapLockFile.delete();
+        reduceLockFile.delete();
+
+        super.afterTest();
+    }
+
+    /**
+     * Test next job ID generation.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void tstNextJobId() throws Exception {
+        IgniteHadoopClientProtocolProvider provider = provider();
+
+        ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT));
+
+        JobID jobId = proto.getNewJobID();
+
+        assert jobId != null;
+        assert jobId.getJtIdentifier() != null;
+
+        JobID nextJobId = proto.getNewJobID();
+
+        assert nextJobId != null;
+        assert nextJobId.getJtIdentifier() != null;
+
+        assert !F.eq(jobId, nextJobId);
+    }
+
+    /**
+     * Tests job counters retrieval.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJobCounters() throws Exception {
+        IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName);
+
+        igfs.mkdirs(new IgfsPath(PATH_INPUT));
+
+        try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
+            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
+
+            bw.write(
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n" +
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n" +
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n"
+            );
+        }
+
+        Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT);
+
+        final Job job = Job.getInstance(conf);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestCountingMapper.class);
+        job.setReducerClass(TestCountingReducer.class);
+        job.setCombinerClass(TestCountingCombiner.class);
+
+        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+
+        job.submit();
+
+        final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
+
+        assertEquals(0, cntr.getValue());
+
+        cntr.increment(10);
+
+        assertEquals(10, cntr.getValue());
+
+        // Transferring to map phase.
+        setupLockFile.delete();
+
+        // Transferring to reduce phase.
+        mapLockFile.delete();
+
+        job.waitForCompletion(false);
+
+        assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
+
+        final Counters counters = job.getCounters();
+
+        assertNotNull("counters cannot be null", counters);
+        assertEquals("wrong counters count", 3, counters.countCounters());
+        assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
+        assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
+        assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
+    }
+
+    /**
+     * Tests job counters retrieval for unknown job id.
+     *
+     * @throws Exception If failed.
+     */
+    private void tstUnknownJobCounters() throws Exception {
+        IgniteHadoopClientProtocolProvider provider = provider();
+
+        ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT));
+
+        try {
+            proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1));
+            fail("exception must be thrown");
+        }
+        catch (Exception e) {
+            assert e instanceof IOException : "wrong error has been thrown";
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMap() throws Exception {
+        checkJobSubmit(true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapCombine() throws Exception {
+        checkJobSubmit(false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapReduce() throws Exception {
+        checkJobSubmit(true, false);
+    }
+
+    /**
+     * Runs {@link #checkJobSubmit(boolean, boolean)} with {@code noCombiners = false} and
+     * {@code noReducers = false}, i.e. a job with map, combine and reduce phases.
+     *
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapCombineReduce() throws Exception {
+        checkJobSubmit(false, false);
+    }
+
+    /**
+     * Test job submission.
+     * <p>
+     * Submits a word-count style job and walks it through the setup, map and (optionally) reduce phases.
+     * Each phase is held back by a lock file; the test checks the reported status and progress while the
+     * phase is blocked, deletes the lock file and then waits for the phase to finish.
+     *
+     * @param noCombiners Whether there are no combiners.
+     * @param noReducers Whether there are no reducers.
+     * @throws Exception If failed.
+     */
+    public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception {
+        IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName);
+
+        igfs.mkdirs(new IgfsPath(PATH_INPUT));
+
+        // Create a single input file containing one word.
+        try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
+            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
+
+            bw.write("word");
+        }
+
+        Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT);
+
+        final Job job = Job.getInstance(conf);
+
+        job.setJobName(JOB_NAME);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+        job.setReducerClass(TestReducer.class);
+
+        if (!noCombiners)
+            job.setCombinerClass(TestCombiner.class);
+
+        if (noReducers)
+            job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(TestOutputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+
+        job.submit();
+
+        JobID jobId = job.getJobID();
+
+        // Setup phase.
+        JobStatus jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+        assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
+        assert jobStatus.getMapProgress() == 0.0f;
+        assert jobStatus.getReduceProgress() == 0.0f;
+
+        // Let some time pass so that the reported setup progress can advance.
+        U.sleep(2100);
+
+        JobStatus recentJobStatus = job.getStatus();
+
+        assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
+            "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
+
+        // Transferring to map phase.
+        setupLockFile.delete();
+
+        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    return F.eq(1.0f, job.getStatus().getSetupProgress());
+                }
+                catch (Exception e) {
+                    throw new RuntimeException("Unexpected exception.", e);
+                }
+            }
+        }, 5000L);
+
+        // Map phase.
+        jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+        assert jobStatus.getSetupProgress() == 1.0f;
+        assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
+        assert jobStatus.getReduceProgress() == 0.0f;
+
+        // Let some time pass so that the reported map progress can advance.
+        U.sleep(2100);
+
+        recentJobStatus = job.getStatus();
+
+        assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
+            "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
+
+        // Transferring to reduce phase.
+        mapLockFile.delete();
+
+        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    return F.eq(1.0f, job.getStatus().getMapProgress());
+                }
+                catch (Exception e) {
+                    throw new RuntimeException("Unexpected exception.", e);
+                }
+            }
+        }, 5000L);
+
+        if (!noReducers) {
+            // Reduce phase.
+            jobStatus = job.getStatus();
+            checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+            assert jobStatus.getSetupProgress() == 1.0f;
+            assert jobStatus.getMapProgress() == 1.0f;
+            assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
+
+            // Ensure that reduces progress increases.
+            U.sleep(2100);
+
+            recentJobStatus = job.getStatus();
+
+            assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
+                "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
+
+            reduceLockFile.delete();
+        }
+
+        job.waitForCompletion(false);
+
+        // Final state: verify everything against one status snapshot, as in the previous phases.
+        jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
+        assert jobStatus.getSetupProgress() == 1.0f;
+        assert jobStatus.getMapProgress() == 1.0f;
+        assert jobStatus.getReduceProgress() == 1.0f;
+
+        dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
+    }
+
+    /**
+     * Recursively prints the given IGFS path to standard output; for a non-directory entry its content
+     * is printed line by line as well.
+     *
+     * @param igfs IGFS.
+     * @param path Path.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private static void dumpIgfs(IgniteFs igfs, IgfsPath path) throws Exception {
+        IgfsFile info = igfs.info(path);
+
+        assert info != null;
+
+        System.out.println(info.path());
+
+        if (!info.isDirectory()) {
+            try (BufferedReader rdr = new BufferedReader(new InputStreamReader(igfs.open(path)))) {
+                for (String ln = rdr.readLine(); ln != null; ln = rdr.readLine())
+                    System.out.println(ln);
+            }
+
+            return;
+        }
+
+        // Directory: descend into every child path.
+        for (IgfsPath childPath : igfs.listPaths(path))
+            dumpIgfs(igfs, childPath);
+    }
+
+    /**
+     * Asserts that the job status carries the expected job ID, job name, state and cleanup progress.
+     *
+     * @param status Job status.
+     * @param expJobId Expected job ID.
+     * @param expJobName Expected job name.
+     * @param expState Expected state.
+     * @param expCleanupProgress Expected cleanup progress.
+     * @throws Exception If failed.
+     */
+    private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName,
+        JobStatus.State expState, float expCleanupProgress) throws Exception {
+        JobID actJobId = status.getJobID();
+
+        assert F.eq(actJobId, expJobId) : "Expected=" + expJobId + ", actual=" + actJobId;
+
+        String actJobName = status.getJobName();
+
+        assert F.eq(actJobName, expJobName) : "Expected=" + expJobName + ", actual=" + actJobName;
+
+        JobStatus.State actState = status.getState();
+
+        assert F.eq(actState, expState) : "Expected=" + expState + ", actual=" + actState;
+
+        float actCleanupProgress = status.getCleanupProgress();
+
+        assert F.eq(actCleanupProgress, expCleanupProgress) :
+            "Expected=" + expCleanupProgress + ", actual=" + actCleanupProgress;
+    }
+
+    /**
+     * Builds a Hadoop configuration that selects the {@link HadoopClientProtocol} framework, points the
+     * master address at {@code 127.0.0.1} on the given port and uses IGFS of the first test grid as the
+     * default file system.
+     *
+     * @param port Port of the master address.
+     * @return Configuration.
+     */
+    private Configuration config(int port) {
+        Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        String masterAddr = "127.0.0.1:" + port;
+
+        cfg.set(MRConfig.FRAMEWORK_NAME, HadoopClientProtocol.FRAMEWORK_NAME);
+        cfg.set(MRConfig.MASTER_ADDRESS, masterAddr);
+
+        String dfltFsUri = "igfs://:" + getTestGridName(0) + "@/";
+
+        cfg.set("fs.defaultFS", dfltFsUri);
+
+        return cfg;
+    }
+
+    /**
+     * Creates a new protocol provider instance on every call.
+     *
+     * @return Protocol provider.
+     */
+    private IgniteHadoopClientProtocolProvider provider() {
+        return new IgniteHadoopClientProtocolProvider();
+    }
+
+    /**
+     * Test mapper: waits until the map lock file is gone, then emits every whitespace-separated token of
+     * the input value paired with a count of one.
+     */
+    public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
+        /** Writable integer constant of '1' written as the count of each found word. */
+        private static final IntWritable one = new IntWritable(1);
+
+        /** Reusable writable container for the word being written. */
+        private Text word = new Text();
+
+        /** {@inheritDoc} */
+        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            // Hold the map phase while the lock file exists.
+            while (mapLockFile.exists())
+                Thread.sleep(50);
+
+            for (StringTokenizer tokens = new StringTokenizer(val.toString()); tokens.hasMoreTokens(); ) {
+                word.set(tokens.nextToken());
+
+                ctx.write(word, one);
+            }
+        }
+    }
+
+    /**
+     * Test Hadoop counters.
+     * <p>
+     * In the classes below {@code COUNTER1} is incremented by {@link TestCountingMapper},
+     * {@link TestCountingCombiner} and {@link TestCountingReducer}; {@code COUNTER2} is incremented by
+     * {@link TestCountingCombiner}; {@code COUNTER3} is incremented by {@link TestCountingReducer}.
+     */
+    public enum TestCounter {
+        COUNTER1, COUNTER2, COUNTER3
+    }
+
+    /**
+     * Test mapper that uses counters: it delegates to {@link TestMapper} and then increments
+     * {@link TestCounter#COUNTER1} by one per invocation.
+     */
+    public static class TestCountingMapper extends TestMapper {
+        /** {@inheritDoc} */
+        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            super.map(key, val, ctx);
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+        }
+    }
+
+    /**
+     * Test combiner that counts invocations: every call increments {@link TestCounter#COUNTER1} and
+     * {@link TestCounter#COUNTER2}, then writes the sum of the values for the key. It does not call the
+     * parent implementation, so it does not wait on the reduce lock file.
+     */
+    public static class TestCountingCombiner extends TestReducer {
+        /** {@inheritDoc} */
+        @Override public void reduce(Text key, Iterable<IntWritable> values,
+            Context ctx) throws IOException, InterruptedException {
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+            ctx.getCounter(TestCounter.COUNTER2).increment(1);
+
+            int total = 0;
+
+            for (IntWritable cnt : values)
+                total += cnt.get();
+
+            ctx.write(key, new IntWritable(total));
+        }
+    }
+
+    /**
+     * Test reducer that counts invocations: every call increments {@link TestCounter#COUNTER1} and
+     * {@link TestCounter#COUNTER3}. It neither calls the parent implementation nor writes any output.
+     */
+    public static class TestCountingReducer extends TestReducer {
+        /** {@inheritDoc} */
+        @Override public void reduce(Text key, Iterable<IntWritable> values,
+            Context ctx) throws IOException, InterruptedException {
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+            ctx.getCounter(TestCounter.COUNTER3).increment(1);
+        }
+    }
+
+    /**
+     * Test combiner. Declares no overrides, so the behavior of the base {@link Reducer} is used as is.
+     */
+    public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
+        // No-op.
+    }
+
+    /**
+     * Test output format whose output committer is a {@link TestOutputCommitter} wrapping the committer
+     * returned by the superclass.
+     *
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
+        /** {@inheritDoc} */
+        @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
+            throws IOException {
+            // The superclass committer is cast to FileOutputCommitter, which is what the wrapper delegates to.
+            return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx));
+        }
+    }
+
+    /**
+     * Test output committer. Job setup is held back while the setup lock file exists; job setup and the
+     * task-level operations overridden here are forwarded to the wrapped delegate.
+     */
+    private static class TestOutputCommitter extends FileOutputCommitter {
+        /** Delegate. */
+        private final FileOutputCommitter delegate;
+
+        /**
+         * Constructor.
+         *
+         * @param ctx Task attempt context.
+         * @param delegate Delegate.
+         * @throws IOException If failed.
+         */
+        private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
+            super(FileOutputFormat.getOutputPath(ctx), ctx);
+
+            this.delegate = delegate;
+        }
+
+        /**
+         * Waits until the setup lock file is removed and then delegates job setup.
+         *
+         * @param jobCtx Job context.
+         * @throws IOException If the wait was interrupted or the delegate failed.
+         */
+        @Override public void setupJob(JobContext jobCtx) throws IOException {
+            try {
+                while (setupLockFile.exists())
+                    Thread.sleep(50);
+            }
+            catch (InterruptedException e) {
+                // Restore the interrupt flag for callers and keep the original cause in the stack trace.
+                Thread.currentThread().interrupt();
+
+                throw new IOException("Interrupted.", e);
+            }
+
+            delegate.setupJob(jobCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.setupTask(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException {
+            return delegate.needsTaskCommit(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.commitTask(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.abortTask(taskCtx);
+        }
+    }
+
+    /**
+     * Test reducer: waits until the reduce lock file is gone, then writes the sum of all values
+     * received for the key.
+     */
+    public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        /** Reusable writable container for the summed word count. */
+        private IntWritable totalWordCnt = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+            InterruptedException {
+            // Hold the reduce phase while the lock file exists.
+            while (reduceLockFile.exists())
+                Thread.sleep(50);
+
+            int sum = 0;
+
+            for (IntWritable cnt : values)
+                sum += cnt.get();
+
+            totalWordCnt.set(sum);
+
+            ctx.write(key, totalWordCnt);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
index 29696bf..5fad86f 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
@@ -20,7 +20,7 @@ package org.apache.ignite.igfs;
 import junit.framework.*;
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.igfs.hadoop.*;
+import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -116,7 +116,7 @@ public class IgfsEventsTestSuite extends TestSuite {
         @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException {
             IgfsConfiguration igfsCfg = super.getIgfsConfiguration();
 
-            igfsCfg.setSecondaryFileSystem(new IgfsHadoopFileSystemWrapper(
+            igfsCfg.setSecondaryFileSystem(new IgniteHadoopSecondaryFileSystem(
                 "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/",
                 "modules/core/src/test/config/hadoop/core-site-secondary.xml"));
 
@@ -215,7 +215,7 @@ public class IgfsEventsTestSuite extends TestSuite {
         @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException {
             IgfsConfiguration igfsCfg = super.getIgfsConfiguration();
 
-            igfsCfg.setSecondaryFileSystem(new IgfsHadoopFileSystemWrapper(
+            igfsCfg.setSecondaryFileSystem(new IgniteHadoopSecondaryFileSystem(
                 "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/",
                 "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
index 9f9a6d8..04f776e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.permission.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.igfs.hadoop.*;
+import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -262,7 +262,7 @@ public abstract class IgfsHadoop20FileSystemAbstractSelfTest extends IgfsCommonA
         cfg.setDefaultMode(mode);
 
         if (mode != PRIMARY)
-            cfg.setSecondaryFileSystem(new IgfsHadoopFileSystemWrapper(secondaryFileSystemUriPath(),
+            cfg.setSecondaryFileSystem(new IgniteHadoopSecondaryFileSystem(secondaryFileSystemUriPath(),
                 secondaryFileSystemConfigPath()));
 
         cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
index a54e264..2d29800 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.igfs.hadoop.*;
+import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -40,7 +40,7 @@ import java.util.concurrent.*;
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
-import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.*;
 
 /**
@@ -183,7 +183,7 @@ public abstract class IgfsHadoopDualAbstractSelfTest extends IgfsCommonAbstractT
 
         Ignite igniteSecondary = startGridWithIgfs("grid-secondary", "igfs-secondary", PRIMARY, null, SECONDARY_REST_CFG);
 
-        Igfs hadoopFs = new IgfsHadoopFileSystemWrapper(SECONDARY_URI, SECONDARY_CFG);
+        Igfs hadoopFs = new IgniteHadoopSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
 
         Ignite ignite = startGridWithIgfs("grid", "igfs", mode, hadoopFs, PRIMARY_REST_CFG);
 


Mime
View raw message