ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [44/46] ignite git commit: IGNITE-3953: Hadoop: Merged back both modules.
Date Mon, 26 Sep 2016 07:17:40 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
deleted file mode 100644
index d8e70d1..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Contains Ignite Hadoop 2.x <code>FileSystem</code> implementation.
- */
-package org.apache.ignite.hadoop.fs.v2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
deleted file mode 100644
index c806cb1..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.client.GridClient;
-import org.apache.ignite.internal.client.GridClientConfiguration;
-import org.apache.ignite.internal.client.GridClientException;
-import org.apache.ignite.internal.client.GridClientFactory;
-import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
-import org.apache.ignite.internal.processors.hadoop.impl.proto.HadoopClientProtocol;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-
-import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
-
-
-/**
- * Ignite Hadoop client protocol provider.
- */
-public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
-    /** Framework name used in configuration. */
-    public static final String FRAMEWORK_NAME = "ignite";
-
-    /** Clients. */
-    private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>();
-
-    /** {@inheritDoc} */
-    @Override public ClientProtocol create(Configuration conf) throws IOException {
-        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
-            String addr = conf.get(MRConfig.MASTER_ADDRESS);
-
-            if (F.isEmpty(addr))
-                throw new IOException("Failed to create client protocol because server address is not specified (is " +
-                    MRConfig.MASTER_ADDRESS + " property set?).");
-
-            if (F.eq(addr, "local"))
-                throw new IOException("Local execution mode is not supported, please point " +
-                    MRConfig.MASTER_ADDRESS + " to real Ignite node.");
-
-            return createProtocol(addr, conf);
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
-        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME)))
-            return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(ClientProtocol cliProto) throws IOException {
-        // No-op.
-    }
-
-    /**
-     * Internal protocol creation routine.
-     *
-     * @param addr Address.
-     * @param conf Configuration.
-     * @return Client protocol.
-     * @throws IOException If failed.
-     */
-    private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
-        return new HadoopClientProtocol(conf, client(addr));
-    }
-
-    /**
-     * Create client.
-     *
-     * @param addr Endpoint address.
-     * @return Client.
-     * @throws IOException If failed.
-     */
-    private static GridClient client(String addr) throws IOException {
-        try {
-            IgniteInternalFuture<GridClient> fut = cliMap.get(addr);
-
-            if (fut == null) {
-                GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
-
-                IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0);
-
-                if (oldFut != null)
-                    return oldFut.get();
-                else {
-                    GridClientConfiguration cliCfg = new GridClientConfiguration();
-
-                    cliCfg.setProtocol(TCP);
-                    cliCfg.setServers(Collections.singletonList(addr));
-                    cliCfg.setMarshaller(new GridClientJdkMarshaller());
-                    cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
-                    cliCfg.setDaemon(true);
-
-                    try {
-                        GridClient cli = GridClientFactory.start(cliCfg);
-
-                        fut0.onDone(cli);
-
-                        return cli;
-                    }
-                    catch (GridClientException e) {
-                        fut0.onDone(e);
-
-                        throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
-                    }
-                }
-            }
-            else
-                return fut.get();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
deleted file mode 100644
index 7635b9e..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Ignite Hadoop Accelerator map-reduce classes.
- */
-package org.apache.ignite.hadoop.mapreduce;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
deleted file mode 100644
index 9181623..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
-
-/**
- * Hadoop +counter group adapter.
- */
-class HadoopMapReduceCounterGroup implements CounterGroup {
-    /** Counters. */
-    private final HadoopMapReduceCounters cntrs;
-
-    /** Group name. */
-    private final String name;
-
-    /**
-     * Creates new instance.
-     *
-     * @param cntrs Client counters instance.
-     * @param name Group name.
-     */
-    HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
-        this.cntrs = cntrs;
-        this.name = name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getName() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getDisplayName() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDisplayName(String displayName) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addCounter(Counter counter) {
-        addCounter(counter.getName(), counter.getDisplayName(), 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter addCounter(String name, String displayName, long value) {
-        final Counter counter = cntrs.findCounter(this.name, name);
-
-        counter.setValue(value);
-
-        return counter;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName, String displayName) {
-        return cntrs.findCounter(name, counterName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName, boolean create) {
-        return cntrs.findCounter(name, counterName, create);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName) {
-        return cntrs.findCounter(name, counterName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return cntrs.groupSize(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
-        for (final Counter counter : rightGroup)
-            cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
-    }
-
-    /** {@inheritDoc} */
-    @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<Counter> iterator() {
-        return cntrs.iterateGroup(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFields(DataInput in) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
deleted file mode 100644
index 924312e..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.FileSystemCounter;
-import org.apache.hadoop.mapreduce.counters.AbstractCounters;
-import org.apache.hadoop.mapreduce.counters.Limits;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
-import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Counter;
-import org.apache.ignite.internal.util.typedef.T2;
-
-/**
- * Hadoop counters adapter.
- */
-public class HadoopMapReduceCounters extends Counters {
-    /** */
-    private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
-
-    /**
-     * Creates new instance based on given counters.
-     *
-     * @param cntrs Counters to adapt.
-     */
-    public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) {
-        for (HadoopCounter cntr : cntrs.all())
-            if (cntr instanceof HadoopLongCounter)
-                this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
-        return addGroup(grp.getName(), grp.getDisplayName());
-    }
-
-    /** {@inheritDoc} */
-    @Override public CounterGroup addGroup(String name, String displayName) {
-        return new HadoopMapReduceCounterGroup(this, name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String grpName, String cntrName) {
-        return findCounter(grpName, cntrName, true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Counter findCounter(Enum<?> key) {
-        return findCounter(key.getDeclaringClass().getName(), key.name(), true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
-        return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized Iterable<String> getGroupNames() {
-        Collection<String> res = new HashSet<>();
-
-        for (HadoopCounter counter : cntrs.values())
-            res.add(counter.group());
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<CounterGroup> iterator() {
-        final Iterator<String> iter = getGroupNames().iterator();
-
-        return new Iterator<CounterGroup>() {
-            @Override public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override public CounterGroup next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next());
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException("not implemented");
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized CounterGroup getGroup(String grpName) {
-        return new HadoopMapReduceCounterGroup(this, grpName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int countCounters() {
-        return cntrs.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFields(DataInput in) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
-        for (CounterGroup group : other) {
-            for (Counter counter : group) {
-                findCounter(group.getName(), counter.getName()).increment(counter.getValue());
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object genericRight) {
-        if (!(genericRight instanceof HadoopMapReduceCounters))
-            return false;
-
-        return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return cntrs.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWriteAllCounters(boolean snd) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean getWriteAllCounters() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Limits limits() {
-        return null;
-    }
-
-    /**
-     * Returns size of a group.
-     *
-     * @param grpName Name of the group.
-     * @return amount of counters in the given group.
-     */
-    public int groupSize(String grpName) {
-        int res = 0;
-
-        for (HadoopCounter counter : cntrs.values()) {
-            if (grpName.equals(counter.group()))
-                res++;
-        }
-
-        return res;
-    }
-
-    /**
-     * Returns counters iterator for specified group.
-     *
-     * @param grpName Name of the group to iterate.
-     * @return Counters iterator.
-     */
-    public Iterator<Counter> iterateGroup(String grpName) {
-        Collection<Counter> grpCounters = new ArrayList<>();
-
-        for (HadoopLongCounter counter : cntrs.values()) {
-            if (grpName.equals(counter.group()))
-                grpCounters.add(new HadoopV2Counter(counter));
-        }
-
-        return grpCounters.iterator();
-    }
-
-    /**
-     * Find a counter in the group.
-     *
-     * @param grpName The name of the counter group.
-     * @param cntrName The name of the counter.
-     * @param create Create the counter if not found if true.
-     * @return The counter that was found or added or {@code null} if create is false.
-     */
-    public Counter findCounter(String grpName, String cntrName, boolean create) {
-        T2<String, String> key = new T2<>(grpName, cntrName);
-
-        HadoopLongCounter internalCntr = cntrs.get(key);
-
-        if (internalCntr == null & create) {
-            internalCntr = new HadoopLongCounter(grpName,cntrName);
-
-            cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
-        }
-
-        return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
deleted file mode 100644
index 2a92383..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.io.PrintStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Hadoop utility methods.
- */
-public class HadoopUtils {
-    /** Staging constant. */
-    private static final String STAGING_CONSTANT = ".staging";
-
-    /** Old mapper class attribute. */
-    private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
-
-    /** Old reducer class attribute. */
-    private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
-
-    /**
-     * Constructor.
-     */
-    private HadoopUtils() {
-        // No-op.
-    }
-
-    /**
-     * Wraps native split.
-     *
-     * @param id Split ID.
-     * @param split Split.
-     * @param hosts Hosts.
-     * @throws IOException If failed.
-     */
-    public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
-        ByteArrayOutputStream arr = new ByteArrayOutputStream();
-        ObjectOutput out = new ObjectOutputStream(arr);
-
-        assert split instanceof Writable;
-
-        ((Writable)split).write(out);
-
-        out.flush();
-
-        return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
-    }
-
-    /**
-     * Unwraps native split.
-     *
-     * @param o Wrapper.
-     * @return Split.
-     */
-    public static Object unwrapSplit(HadoopSplitWrapper o) {
-        try {
-            Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
-
-            w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes())));
-
-            return w;
-        }
-        catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    /**
-     * Convert Ignite job status to Hadoop job status.
-     *
-     * @param status Ignite job status.
-     * @return Hadoop job status.
-     */
-    public static JobStatus status(HadoopJobStatus status, Configuration conf) {
-        JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
-
-        float setupProgress = 0;
-        float mapProgress = 0;
-        float reduceProgress = 0;
-        float cleanupProgress = 0;
-
-        JobStatus.State state = JobStatus.State.RUNNING;
-
-        switch (status.jobPhase()) {
-            case PHASE_SETUP:
-                setupProgress = 0.42f;
-
-                break;
-
-            case PHASE_MAP:
-                setupProgress = 1;
-                mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
-
-                break;
-
-            case PHASE_REDUCE:
-                setupProgress = 1;
-                mapProgress = 1;
-
-                if (status.totalReducerCnt() > 0)
-                    reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
-                else
-                    reduceProgress = 1f;
-
-                break;
-
-            case PHASE_CANCELLING:
-            case PHASE_COMPLETE:
-                if (!status.isFailed()) {
-                    setupProgress = 1;
-                    mapProgress = 1;
-                    reduceProgress = 1;
-                    cleanupProgress = 1;
-
-                    state = JobStatus.State.SUCCEEDED;
-                }
-                else
-                    state = JobStatus.State.FAILED;
-
-                break;
-
-            default:
-                assert false;
-        }
-
-        return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
-            JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
-    }
-
-    /**
-     * Gets staging area directory.
-     *
-     * @param conf Configuration.
-     * @param usr User.
-     * @return Staging area directory.
-     */
-    public static Path stagingAreaDir(Configuration conf, String usr) {
-        return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
-            + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
-    }
-
-    /**
-     * Gets job file.
-     *
-     * @param conf Configuration.
-     * @param usr User.
-     * @param jobId Job ID.
-     * @return Job file.
-     */
-    public static Path jobFile(Configuration conf, String usr, JobID jobId) {
-        return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
-    }
-
-    /**
-     * Checks the attribute in configuration is not set.
-     *
-     * @param attr Attribute name.
-     * @param msg Message for creation of exception.
-     * @throws IgniteCheckedException If attribute is set.
-     */
-    public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
-        if (cfg.get(attr) != null)
-            throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
-    }
-
-    /**
-     * Creates JobInfo from hadoop configuration.
-     *
-     * @param cfg Hadoop configuration.
-     * @return Job info.
-     * @throws IgniteCheckedException If failed.
-     */
-    public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
-        JobConf jobConf = new JobConf(cfg);
-
-        boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
-                || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
-
-        int numReduces = jobConf.getNumReduceTasks();
-
-        jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null);
-
-        if (jobConf.getUseNewMapper()) {
-            String mode = "new map API";
-
-            ensureNotSet(jobConf, "mapred.input.format.class", mode);
-            ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
-
-            if (numReduces != 0)
-                ensureNotSet(jobConf, "mapred.partitioner.class", mode);
-            else
-                ensureNotSet(jobConf, "mapred.output.format.class", mode);
-        }
-        else {
-            String mode = "map compatibility";
-
-            ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
-            ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
-
-            if (numReduces != 0)
-                ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
-            else
-                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
-        }
-
-        if (numReduces != 0) {
-            jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
-
-            if (jobConf.getUseNewReducer()) {
-                String mode = "new reduce API";
-
-                ensureNotSet(jobConf, "mapred.output.format.class", mode);
-                ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
-            }
-            else {
-                String mode = "reduce compatibility";
-
-                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
-                ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
-            }
-        }
-
-        Map<String, String> props = new HashMap<>();
-
-        for (Map.Entry<String, String> entry : jobConf)
-            props.put(entry.getKey(), entry.getValue());
-
-        return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
-    }
-
-    /**
-     * Throws new {@link IgniteCheckedException} with original exception is serialized into string.
-     * This is needed to transfer error outside the current class loader.
-     *
-     * @param e Original exception.
-     * @return IgniteCheckedException New exception.
-     */
-    public static IgniteCheckedException transformException(Throwable e) {
-        ByteArrayOutputStream os = new ByteArrayOutputStream();
-
-        e.printStackTrace(new PrintStream(os, true));
-
-        return new IgniteCheckedException(os.toString());
-    }
-
-    /**
-     * Returns work directory for job execution.
-     *
-     * @param locNodeId Local node ID.
-     * @param jobId Job ID.
-     * @return Working directory for job.
-     * @throws IgniteCheckedException If Failed.
-     */
-    public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException {
-        return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId);
-    }
-
-    /**
-     * Returns subdirectory of job working directory for task execution.
-     *
-     * @param locNodeId Local node ID.
-     * @param info Task info.
-     * @return Working directory for task.
-     * @throws IgniteCheckedException If Failed.
-     */
-    public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException {
-        File jobLocDir = jobLocalDir(locNodeId, info.jobId());
-
-        return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
-    }
-
-    /**
-     * Creates {@link Configuration} in a correct class loader context to avoid caching
-     * of inappropriate class loader in the Configuration object.
-     * @return New instance of {@link Configuration}.
-     */
-    public static Configuration safeCreateConfiguration() {
-        final ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(Configuration.class.getClassLoader());
-
-        try {
-            return new Configuration();
-        }
-        finally {
-            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
deleted file mode 100644
index a190b14..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.delegate;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.hadoop.util.UserNameMapper;
-import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
-import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Arrays;
-
-/**
- * Basic Hadoop file system factory delegate.
- */
-public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
-    /** Proxy. */
-    protected final HadoopFileSystemFactory proxy;
-
-    /** Configuration of the secondary filesystem, never null. */
-    protected Configuration cfg;
-
-    /** Resulting URI. */
-    protected URI fullUri;
-
-    /** User name mapper. */
-    private UserNameMapper usrNameMapper;
-
-    /**
-     * Constructor.
-     *
-     * @param proxy Proxy.
-     */
-    public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) {
-        this.proxy = proxy;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileSystem get(String name) throws IOException {
-        String name0 = IgfsUtils.fixUserName(name);
-
-        if (usrNameMapper != null)
-            name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));
-
-        return getWithMappedName(name0);
-    }
-
-    /**
-     * Internal file system create routine.
-     *
-     * @param usrName User name.
-     * @return File system.
-     * @throws IOException If failed.
-     */
-    protected FileSystem getWithMappedName(String usrName) throws IOException {
-        assert cfg != null;
-
-        try {
-            // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
-            // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
-            // classloader to classloader of current class to avoid strange class-cast-exceptions.
-            ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
-
-            try {
-                return create(usrName);
-            }
-            finally {
-                HadoopCommonUtils.restoreContextClassLoader(oldLdr);
-            }
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IOException("Failed to create file system due to interrupt.", e);
-        }
-    }
-
-    /**
-     * Internal file system creation routine, invoked in correct class loader context.
-     *
-     * @param usrName User name.
-     * @return File system.
-     * @throws IOException If failed.
-     * @throws InterruptedException if the current thread is interrupted.
-     */
-    protected FileSystem create(String usrName) throws IOException, InterruptedException {
-        return FileSystem.get(fullUri, cfg, usrName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy;
-
-        cfg = HadoopUtils.safeCreateConfiguration();
-
-        if (proxy0.getConfigPaths() != null) {
-            for (String cfgPath : proxy0.getConfigPaths()) {
-                if (cfgPath == null)
-                    throw new NullPointerException("Configuration path cannot be null: " +
-                        Arrays.toString(proxy0.getConfigPaths()));
-                else {
-                    URL url = U.resolveIgniteUrl(cfgPath);
-
-                    if (url == null) {
-                        // If secConfPath is given, it should be resolvable:
-                        throw new IgniteException("Failed to resolve secondary file system configuration path " +
-                            "(ensure that it exists locally and you have read access to it): " + cfgPath);
-                    }
-
-                    cfg.addResource(url);
-                }
-            }
-        }
-
-        // If secondary fs URI is not given explicitly, try to get it from the configuration:
-        if (proxy0.getUri() == null)
-            fullUri = FileSystem.getDefaultUri(cfg);
-        else {
-            try {
-                fullUri = new URI(proxy0.getUri());
-            }
-            catch (URISyntaxException use) {
-                throw new IgniteException("Failed to resolve secondary file system URI: " + proxy0.getUri());
-            }
-        }
-
-        usrNameMapper = proxy0.getUserNameMapper();
-
-        if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
-            ((LifecycleAware)usrNameMapper).start();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteException {
-        if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
-            ((LifecycleAware)usrNameMapper).stop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
deleted file mode 100644
index 0cec8ca..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.delegate;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
-import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
-
-import java.io.IOException;
-
-/**
- * Caching Hadoop file system factory delegate.
- */
-public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
-    /** Per-user file system cache. */
-    private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
-        new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
-            @Override public FileSystem createValue(String key) throws IOException {
-                return HadoopCachingFileSystemFactoryDelegate.super.getWithMappedName(key);
-            }
-        }
-    );
-
-    /**
-     * Constructor.
-     *
-     * @param proxy Proxy.
-     */
-    public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) {
-        super(proxy);
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileSystem getWithMappedName(String name) throws IOException {
-        return cache.getOrCreate(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        super.start();
-
-        // Disable caching.
-        cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteException {
-        super.stop();
-
-        try {
-            cache.close();
-        }
-        catch (IgniteCheckedException ice) {
-            throw new IgniteException(ice);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
deleted file mode 100644
index 20ac88e..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.delegate;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
-import org.apache.ignite.lifecycle.LifecycleAware;
-
-import java.io.IOException;
-
-/**
- * Hadoop file system factory delegate for non-standard factories.
- */
-public class HadoopDefaultFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
-    /** Factory. */
-    private final HadoopFileSystemFactory factory;
-
-    /**
-     * Constructor.
-     *
-     * @param factory Factory.
-     */
-    public HadoopDefaultFileSystemFactoryDelegate(HadoopFileSystemFactory factory) {
-        assert factory != null;
-
-        this.factory = factory;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileSystem get(String usrName) throws IOException {
-        return (FileSystem)factory.get(usrName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        if (factory instanceof LifecycleAware)
-            ((LifecycleAware)factory).start();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteException {
-        if (factory instanceof LifecycleAware)
-            ((LifecycleAware)factory).stop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
deleted file mode 100644
index 9cc2be4..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.delegate;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemCounterWriterDelegate;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.T2;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Map;
-
-/**
- * Counter writer delegate implementation.
- */
-@SuppressWarnings("unused")
-public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSystemCounterWriterDelegate {
-    /** */
-    private static final String USER_MACRO = "${USER}";
-
-    /** */
-    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
-
-    /**
-     * Constructor.
-     *
-     * @param proxy Proxy (not used).
-     */
-    public HadoopFileSystemCounterWriterDelegateImpl(IgniteHadoopFileSystemCounterWriter proxy) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException {
-        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
-
-        final HadoopJobInfo jobInfo = job.info();
-
-        final HadoopJobId jobId = job.id();
-
-        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
-            hadoopCfg.set(e.getKey(), e.getValue());
-
-        String user = jobInfo.user();
-
-        user = IgfsUtils.fixUserName(user);
-
-        String dir = jobInfo.property(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY);
-
-        if (dir == null)
-            dir = DEFAULT_COUNTER_WRITER_DIR;
-
-        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
-
-        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
-
-        try {
-            hadoopCfg.set(MRJobConfig.USER_NAME, user);
-
-            FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg);
-
-            fs.mkdirs(jobStatPath);
-
-            try (PrintStream out = new PrintStream(fs.create(
-                new Path(jobStatPath, IgniteHadoopFileSystemCounterWriter.PERFORMANCE_COUNTER_FILE_NAME)))) {
-                for (T2<String, Long> evt : perfCntr.evts()) {
-                    out.print(evt.get1());
-                    out.print(':');
-                    out.println(evt.get2().toString());
-                }
-
-                out.flush();
-            }
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
deleted file mode 100644
index fcad674..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.delegate;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
-import org.apache.ignite.igfs.IgfsException;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
-import org.apache.ignite.igfs.IgfsUserContext;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
-import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
-import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate;
-import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProperties;
-import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
-import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Secondary file system implementation.
- */
-@SuppressWarnings("unused")
-public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSecondaryFileSystemDelegate {
-    /** The default user name. It is used if no user context is set. */
-    private final String dfltUsrName;
-
-    /** Factory. */
-    private final HadoopFileSystemFactoryDelegate factory;
-
-    /**
-     * Constructor.
-     *
-     * @param proxy Proxy.
-    */
-    public HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem proxy) {
-        assert proxy.getFileSystemFactory() != null;
-
-        dfltUsrName = IgfsUtils.fixUserName(proxy.getDefaultUserName());
-
-        HadoopFileSystemFactory factory0 = proxy.getFileSystemFactory();
-
-        if (factory0 == null)
-            factory0 = new CachingHadoopFileSystemFactory();
-
-        factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean exists(IgfsPath path) {
-        try {
-            return fileSystemForUser().exists(convert(path));
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
-        HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
-
-        final FileSystem fileSys = fileSystemForUser();
-
-        try {
-            if (props0.userName() != null || props0.groupName() != null)
-                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
-
-            if (props0.permission() != null)
-                fileSys.setPermission(convert(path), props0.permission());
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
-        }
-
-        //Result is not used in case of secondary FS.
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void rename(IgfsPath src, IgfsPath dest) {
-        // Delegate to the secondary file system.
-        try {
-            if (!fileSystemForUser().rename(convert(src), convert(dest)))
-                throw new IgfsException("Failed to rename (secondary file system returned false) " +
-                    "[src=" + src + ", dest=" + dest + ']');
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(IgfsPath path, boolean recursive) {
-        try {
-            return fileSystemForUser().delete(convert(path), recursive);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdirs(IgfsPath path) {
-        try {
-            if (!fileSystemForUser().mkdirs(convert(path)))
-                throw new IgniteException("Failed to make directories [path=" + path + "]");
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
-        try {
-            if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
-                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
-        try {
-            FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
-
-            if (statuses == null)
-                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-
-            Collection<IgfsPath> res = new ArrayList<>(statuses.length);
-
-            for (FileStatus status : statuses)
-                res.add(new IgfsPath(path, status.getPath().getName()));
-
-            return res;
-        }
-        catch (FileNotFoundException ignored) {
-            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
-        try {
-            FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
-
-            if (statuses == null)
-                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-
-            Collection<IgfsFile> res = new ArrayList<>(statuses.length);
-
-            for (FileStatus s : statuses) {
-                IgfsEntryInfo fsInfo = s.isDirectory() ?
-                    IgfsUtils.createDirectory(
-                        IgniteUuid.randomUuid(),
-                        null,
-                        properties(s),
-                        s.getAccessTime(),
-                        s.getModificationTime()
-                    ) :
-                    IgfsUtils.createFile(
-                        IgniteUuid.randomUuid(),
-                        (int)s.getBlockSize(),
-                        s.getLen(),
-                        null,
-                        null,
-                        false,
-                        properties(s),
-                        s.getAccessTime(),
-                        s.getModificationTime()
-                    );
-
-                res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1));
-            }
-
-            return res;
-        }
-        catch (FileNotFoundException ignored) {
-            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
-        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
-        try {
-            return fileSystemForUser().create(convert(path), overwrite);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
-        long blockSize, @Nullable Map<String, String> props) {
-        HadoopIgfsProperties props0 =
-            new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
-
-        try {
-            return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
-                (short) replication, blockSize, null);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
-                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
-                ", blockSize=" + blockSize + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
-        @Nullable Map<String, String> props) {
-        try {
-            return fileSystemForUser().append(convert(path), bufSize);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(final IgfsPath path) {
-        try {
-            final FileStatus status = fileSystemForUser().getFileStatus(convert(path));
-
-            if (status == null)
-                return null;
-
-            final Map<String, String> props = properties(status);
-
-            return new IgfsFile() {
-                @Override public IgfsPath path() {
-                    return path;
-                }
-
-                @Override public boolean isFile() {
-                    return status.isFile();
-                }
-
-                @Override public boolean isDirectory() {
-                    return status.isDirectory();
-                }
-
-                @Override public int blockSize() {
-                    // By convention directory has blockSize == 0, while file has blockSize > 0:
-                    return isDirectory() ? 0 : (int)status.getBlockSize();
-                }
-
-                @Override public long groupBlockSize() {
-                    return status.getBlockSize();
-                }
-
-                @Override public long accessTime() {
-                    return status.getAccessTime();
-                }
-
-                @Override public long modificationTime() {
-                    return status.getModificationTime();
-                }
-
-                @Override public String property(String name) throws IllegalArgumentException {
-                    String val = props.get(name);
-
-                    if (val ==  null)
-                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
-
-                    return val;
-                }
-
-                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
-                    String val = props.get(name);
-
-                    return val == null ? dfltVal : val;
-                }
-
-                @Override public long length() {
-                    return status.getLen();
-                }
-
-                /** {@inheritDoc} */
-                @Override public Map<String, String> properties() {
-                    return props;
-                }
-            };
-        }
-        catch (FileNotFoundException ignore) {
-            return null;
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public long usedSpaceSize() {
-        try {
-            // We don't use FileSystem#getUsed() since it counts only the files
-            // in the filesystem root, not all the files recursively.
-            return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
-        try {
-            // We don't use FileSystem#getUsed() since it counts only the files
-            // in the filesystem root, not all the files recursively.
-            fileSystemForUser().setTimes(convert(path), modificationTime, accessTime);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed set times for path: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    public void start() {
-        factory.start();
-    }
-
-    /** {@inheritDoc} */
-    public void stop() {
-        factory.stop();
-    }
-
-    /**
-     * Convert IGFS path into Hadoop path.
-     *
-     * @param path IGFS path.
-     * @return Hadoop path.
-     */
-    private Path convert(IgfsPath path) {
-        URI uri = fileSystemForUser().getUri();
-
-        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
-    }
-
-    /**
-     * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
-     *
-     * @param e Exception to check.
-     * @param detailMsg Detailed error message.
-     * @return Appropriate exception.
-     */
-    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
-        return cast(detailMsg, e);
-    }
-
-    /**
-     * Cast IO exception to IGFS exception.
-     *
-     * @param e IO exception.
-     * @return IGFS exception.
-     */
-    public static IgfsException cast(String msg, IOException e) {
-        if (e instanceof FileNotFoundException)
-            return new IgfsPathNotFoundException(e);
-        else if (e instanceof ParentNotDirectoryException)
-            return new IgfsParentNotDirectoryException(msg, e);
-        else if (e instanceof PathIsNotEmptyDirectoryException)
-            return new IgfsDirectoryNotEmptyException(e);
-        else if (e instanceof PathExistsException)
-            return new IgfsPathAlreadyExistsException(msg, e);
-        else
-            return new IgfsException(msg, e);
-    }
-
-    /**
-     * Convert Hadoop FileStatus properties to map.
-     *
-     * @param status File status.
-     * @return IGFS attributes.
-     */
-    private static Map<String, String> properties(FileStatus status) {
-        FsPermission perm = status.getPermission();
-
-        if (perm == null)
-            perm = FsPermission.getDefault();
-
-        HashMap<String, String> res = new HashMap<>(3);
-
-        res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort()));
-        res.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
-        res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
-
-        return res;
-    }
-
-    /**
-     * Gets the FileSystem for the current context user.
-     * @return the FileSystem instance, never null.
-     */
-    private FileSystem fileSystemForUser() {
-        String user = IgfsUserContext.currentUser();
-
-        if (F.isEmpty(user))
-            user = IgfsUtils.fixUserName(dfltUsrName);
-
-        assert !F.isEmpty(user);
-
-        try {
-            return (FileSystem)factory.get(user);
-        }
-        catch (IOException ioe) {
-            throw new IgniteException(ioe);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopIgfsSecondaryFileSystemDelegateImpl.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
deleted file mode 100644
index c71dedb..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.delegate;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-/**
- * Kerberos Hadoop file system factory delegate.
- */
-public class HadoopKerberosFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
-    /** The re-login interval. */
-    private long reloginInterval;
-
-    /** Time of last re-login attempt, in system milliseconds. */
-    private volatile long lastReloginTime;
-
-    /**
-     * Constructor.
-     *
-     * @param proxy Proxy.
-     */
-    public HadoopKerberosFileSystemFactoryDelegate(KerberosHadoopFileSystemFactory proxy) {
-        super(proxy);
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileSystem getWithMappedName(String name) throws IOException {
-        reloginIfNeeded();
-
-        return super.getWithMappedName(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
-        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
-            UserGroupInformation.getLoginUser());
-
-        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-            @Override public FileSystem run() throws Exception {
-                return FileSystem.get(fullUri, cfg);
-            }
-        });
-    }
-
-    @Override public void start() throws IgniteException {
-        super.start();
-
-        KerberosHadoopFileSystemFactory proxy0 = (KerberosHadoopFileSystemFactory)proxy;
-
-        A.ensure(!F.isEmpty(proxy0.getKeyTab()), "keyTab cannot not be empty.");
-        A.ensure(!F.isEmpty(proxy0.getKeyTabPrincipal()), "keyTabPrincipal cannot not be empty.");
-        A.ensure(proxy0.getReloginInterval() >= 0, "reloginInterval cannot not be negative.");
-
-        reloginInterval = proxy0.getReloginInterval();
-
-        try {
-            UserGroupInformation.setConfiguration(cfg);
-            UserGroupInformation.loginUserFromKeytab(proxy0.getKeyTabPrincipal(), proxy0.getKeyTab());
-        }
-        catch (IOException ioe) {
-            throw new IgniteException("Failed login from keytab [keyTab=" + proxy0.getKeyTab() +
-                ", keyTabPrincipal=" + proxy0.getKeyTabPrincipal() + ']', ioe);
-        }
-    }
-
-    /**
-     * Re-logins the user if needed.
-     * First, the re-login interval defined in factory is checked. The re-login attempts will be not more
-     * frequent than one attempt per {@code reloginInterval}.
-     * Second, {@code UserGroupInformation.checkTGTAndReloginFromKeytab()} method invoked that gets existing
-     * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
-     *
-     * <p>This operation expected to be called upon each operation with the file system created with the factory.
-     * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there
-     * is no need to invoke it otherwise specially.
-     *
-     * @throws IOException If login fails.
-     */
-    private void reloginIfNeeded() throws IOException {
-        long now = System.currentTimeMillis();
-
-        if (now >= lastReloginTime + reloginInterval) {
-            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
-
-            lastReloginTime = now;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
deleted file mode 100644
index 3644511..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.fs;
-
-import java.io.IOException;
-import java.net.URI;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.util.GridStringBuilder;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * File system cache utility methods used by Map-Reduce tasks and jobs.
- */
-public class HadoopFileSystemCacheUtils {
-    /**
-     * A common static factory method. Creates new HadoopLazyConcurrentMap.
-     * @return a new HadoopLazyConcurrentMap.
-     */
-    public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
-        return new HadoopLazyConcurrentMap<>(
-            new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
-                @Override public FileSystem createValue(FsCacheKey key) throws IOException {
-                    try {
-                        assert key != null;
-
-                        // Explicitly disable FileSystem caching:
-                        URI uri = key.uri();
-
-                        String scheme = uri.getScheme();
-
-                        // Copy the configuration to avoid altering the external object.
-                        Configuration cfg = new Configuration(key.configuration());
-
-                        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
-
-                        cfg.setBoolean(prop, true);
-
-                        return FileSystem.get(uri, cfg, key.user());
-                    }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-
-                        throw new IOException("Failed to create file system due to interrupt.", e);
-                    }
-                }
-            }
-        );
-    }
-
-    /**
-     * Gets non-null user name as per the Hadoop viewpoint.
-     * @param cfg the Hadoop job configuration, may be null.
-     * @return the user name, never null.
-     */
-    private static String getMrHadoopUser(Configuration cfg) throws IOException {
-        String user = cfg.get(MRJobConfig.USER_NAME);
-
-        if (user == null)
-            user = IgniteHadoopFileSystem.getFsHadoopUser();
-
-        return user;
-    }
-
-    /**
-     * Common method to get the V1 file system in MapRed engine.
-     * It gets the filesystem for the user specified in the
-     * configuration with {@link MRJobConfig#USER_NAME} property.
-     * The file systems are created and cached in the given map upon first request.
-     *
-     * @param uri The file system uri.
-     * @param cfg The configuration.
-     * @param map The caching map.
-     * @return The file system.
-     * @throws IOException On error.
-     */
-    public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg,
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
-            throws IOException {
-        assert map != null;
-        assert cfg != null;
-
-        final String usr = getMrHadoopUser(cfg);
-
-        assert usr != null;
-
-        if (uri == null)
-            uri = FileSystem.getDefaultUri(cfg);
-
-        final FileSystem fs;
-
-        try {
-            final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
-
-            fs = map.getOrCreate(key);
-        }
-        catch (IgniteException ie) {
-            throw new IOException(ie);
-        }
-
-        assert fs != null;
-        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
-
-        return fs;
-    }
-
-    /**
-     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
-     * @param uri0 The uri.
-     * @param cfg The cfg.
-     * @return Correct URI.
-     */
-    private static URI fixUri(URI uri0, Configuration cfg) {
-        if (uri0 == null)
-            return FileSystem.getDefaultUri(cfg);
-
-        String scheme = uri0.getScheme();
-        String authority = uri0.getAuthority();
-
-        if (authority == null) {
-            URI dfltUri = FileSystem.getDefaultUri(cfg);
-
-            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
-                return dfltUri;
-        }
-
-        return uri0;
-    }
-
-    /**
-     * Note that configuration is not a part of the key.
-     * It is used solely to initialize the first instance
-     * that is created for the key.
-     */
-    public static final class FsCacheKey {
-        /** */
-        private final URI uri;
-
-        /** */
-        private final String usr;
-
-        /** */
-        private final String equalityKey;
-
-        /** */
-        private final Configuration cfg;
-
-        /**
-         * Constructor
-         */
-        public FsCacheKey(URI uri, String usr, Configuration cfg) {
-            assert uri != null;
-            assert usr != null;
-            assert cfg != null;
-
-            this.uri = fixUri(uri, cfg);
-            this.usr = usr;
-            this.cfg = cfg;
-
-            this.equalityKey = createEqualityKey();
-        }
-
-        /**
-         * Creates String key used for equality and hashing.
-         */
-        private String createEqualityKey() {
-            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
-
-            if (uri.getScheme() != null)
-                sb.a(uri.getScheme().toLowerCase());
-
-            sb.a("://");
-
-            if (uri.getAuthority() != null)
-                sb.a(uri.getAuthority().toLowerCase());
-
-            return sb.toString();
-        }
-
-        /**
-         * The URI.
-         */
-        public URI uri() {
-            return uri;
-        }
-
-        /**
-         * The User.
-         */
-        public String user() {
-            return usr;
-        }
-
-        /**
-         * The Configuration.
-         */
-        public Configuration configuration() {
-            return cfg;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("SimplifiableIfStatement")
-        @Override public boolean equals(Object obj) {
-            if (obj == this)
-                return true;
-
-            if (obj == null || getClass() != obj.getClass())
-                return false;
-
-            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return equalityKey.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return equalityKey;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
deleted file mode 100644
index 5115cb4..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FsConstants;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Utilities for configuring file systems to support the separate working directory per each thread.
- */
-public class HadoopFileSystemsUtils {
-    /** Name of the property for setting working directory on create new local FS instance. */
-    public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
-
-    /**
-     * Setup wrappers of filesystems to support the separate working directory.
-     *
-     * @param cfg Config for setup.
-     */
-    public static void setupFileSystems(Configuration cfg) {
-        cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
-        cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
-                HadoopLocalFileSystemV2.class.getName());
-    }
-
-    /**
-     * Gets the property name to disable file system cache.
-     * @param scheme The file system URI scheme.
-     * @return The property name. If scheme is null,
-     * returns "fs.null.impl.disable.cache".
-     */
-    public static String disableFsCachePropertyName(@Nullable String scheme) {
-        return String.format("fs.%s.impl.disable.cache", scheme);
-    }
-}
\ No newline at end of file


Mime
View raw message