ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptupit...@apache.org
Subject [42/50] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.
Date Mon, 26 Sep 2016 12:13:56 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
deleted file mode 100644
index 9902142..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ /dev/null
@@ -1,524 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsException;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.igfs.common.IgfsControlResponse;
-import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest;
-import org.apache.ignite.internal.igfs.common.IgfsMessage;
-import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest;
-import org.apache.ignite.internal.igfs.common.IgfsStatusRequest;
-import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.lang.IgniteClosure;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.AFFINITY;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.CLOSE;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.DELETE;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.INFO;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_FILES;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_PATHS;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MAKE_DIRECTORIES;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_APPEND;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_CREATE;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_READ;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.PATH_SUMMARY;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.READ_BLOCK;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.RENAME;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.SET_TIMES;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.UPDATE;
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.WRITE_BLOCK;
-
-/**
- * Communication with external process (TCP or shmem).
- */
-public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener {
-    /** Expected result is boolean. */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure();
-
-    /** Expected result is boolean. */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Long> LONG_RES = createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure();
-
-    /** Expected result is {@code IgfsHandshakeResponse} */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
-        IgfsHandshakeResponse> HANDSHAKE_RES = createClosure();
-
-    /** Expected result is {@code IgfsStatus} */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsStatus> STATUS_RES =
-        createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
-        IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
-        Collection<IgfsFile>> FILE_COL_RES = createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
-        Collection<IgfsPath>> PATH_COL_RES = createClosure();
-
-    /** Expected result is {@code IgfsPathSummary}. */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES =
-        createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
-        Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure();
-
-    /** Grid name. */
-    private final String grid;
-
-    /** IGFS name. */
-    private final String igfs;
-
-    /** The user this out proc is performing on behalf of. */
-    private final String userName;
-
-    /** Client log. */
-    private final Log log;
-
-    /** Client IO. */
-    private final HadoopIgfsIpcIo io;
-
-    /** Event listeners. */
-    private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>();
-
-    /**
-     * Constructor for TCP endpoint.
-     *
-     * @param host Host.
-     * @param port Port.
-     * @param grid Grid name.
-     * @param igfs IGFS name.
-     * @param log Client logger.
-     * @throws IOException If failed.
-     */
-    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException {
-        this(host, port, grid, igfs, false, log, user);
-    }
-
-    /**
-     * Constructor for shmem endpoint.
-     *
-     * @param port Port.
-     * @param grid Grid name.
-     * @param igfs IGFS name.
-     * @param log Client logger.
-     * @throws IOException If failed.
-     */
-    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException {
-        this(null, port, grid, igfs, true, log, user);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param host Host.
-     * @param port Port.
-     * @param grid Grid name.
-     * @param igfs IGFS name.
-     * @param shmem Shared memory flag.
-     * @param log Client logger.
-     * @throws IOException If failed.
-     */
-    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user)
-        throws IOException {
-        assert host != null && !shmem || host == null && shmem :
-            "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
-
-        String endpoint = host != null ? host + ":" + port : "shmem:" + port;
-
-        this.grid = grid;
-        this.igfs = igfs;
-        this.log = log;
-        this.userName = IgfsUtils.fixUserName(user);
-
-        io = HadoopIgfsIpcIo.get(log, endpoint);
-
-        io.addEventListener(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException {
-        final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
-
-        req.gridName(grid);
-        req.igfsName(igfs);
-        req.logDirectory(logDir);
-
-        return io.send(req).chain(HANDSHAKE_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(boolean force) {
-        assert io != null;
-
-        io.removeEventListener(this);
-
-        if (force)
-            io.forceClose();
-        else
-            io.release();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(INFO);
-        msg.path(path);
-        msg.userName(userName);
-
-        return io.send(msg).chain(FILE_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(UPDATE);
-        msg.path(path);
-        msg.properties(props);
-        msg.userName(userName);
-
-        return io.send(msg).chain(FILE_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(SET_TIMES);
-        msg.path(path);
-        msg.accessTime(accessTime);
-        msg.modificationTime(modificationTime);
-        msg.userName(userName);
-
-        return io.send(msg).chain(BOOL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(RENAME);
-        msg.path(src);
-        msg.destinationPath(dest);
-        msg.userName(userName);
-
-        return io.send(msg).chain(BOOL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(DELETE);
-        msg.path(path);
-        msg.flag(recursive);
-        msg.userName(userName);
-
-        return io.send(msg).chain(BOOL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
-        throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(AFFINITY);
-        msg.path(path);
-        msg.start(start);
-        msg.length(len);
-        msg.userName(userName);
-
-        return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(PATH_SUMMARY);
-        msg.path(path);
-        msg.userName(userName);
-
-        return io.send(msg).chain(SUMMARY_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(MAKE_DIRECTORIES);
-        msg.path(path);
-        msg.properties(props);
-        msg.userName(userName);
-
-        return io.send(msg).chain(BOOL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(LIST_FILES);
-        msg.path(path);
-        msg.userName(userName);
-
-        return io.send(msg).chain(FILE_COL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(LIST_PATHS);
-        msg.path(path);
-        msg.userName(userName);
-
-        return io.send(msg).chain(PATH_COL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
-        return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(OPEN_READ);
-        msg.path(path);
-        msg.flag(false);
-        msg.userName(userName);
-
-        IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
-
-        return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path,
-        int seqReadsBeforePrefetch) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(OPEN_READ);
-        msg.path(path);
-        msg.flag(true);
-        msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
-        msg.userName(userName);
-
-        IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
-
-        return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(OPEN_CREATE);
-        msg.path(path);
-        msg.flag(overwrite);
-        msg.colocate(colocate);
-        msg.properties(props);
-        msg.replication(replication);
-        msg.blockSize(blockSize);
-        msg.userName(userName);
-
-        Long streamId = io.send(msg).chain(LONG_RES).get();
-
-        return new HadoopIgfsStreamDelegate(this, streamId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(OPEN_APPEND);
-        msg.path(path);
-        msg.flag(create);
-        msg.properties(props);
-        msg.userName(userName);
-
-        Long streamId = io.send(msg).chain(LONG_RES).get();
-
-        return new HadoopIgfsStreamDelegate(this, streamId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len,
-        final @Nullable byte[] outBuf, final int outOff, final int outLen) {
-        assert len > 0;
-
-        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
-        msg.command(READ_BLOCK);
-        msg.streamId((long) desc.target());
-        msg.position(pos);
-        msg.length(len);
-
-        try {
-            return io.send(msg, outBuf, outOff, outLen);
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len)
-        throws IOException {
-        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
-        msg.command(WRITE_BLOCK);
-        msg.streamId((long) desc.target());
-        msg.data(data);
-        msg.position(off);
-        msg.length(len);
-
-        try {
-            io.sendPlain(msg);
-        }
-        catch (IgniteCheckedException e) {
-            throw HadoopIgfsUtils.cast(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
-        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
-        msg.command(CLOSE);
-        msg.streamId((long)desc.target());
-
-        try {
-            io.send(msg).chain(BOOL_RES).get();
-        }
-        catch (IgniteCheckedException e) {
-            throw HadoopIgfsUtils.cast(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addEventListener(HadoopIgfsStreamDelegate desc,
-        HadoopIgfsStreamEventListener lsnr) {
-        long streamId = desc.target();
-
-        HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr);
-
-        assert lsnr0 == null || lsnr0 == lsnr;
-
-        if (log.isDebugEnabled())
-            log.debug("Added stream event listener [streamId=" + streamId + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) {
-        long streamId = desc.target();
-
-        HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId);
-
-        if (lsnr0 != null && log.isDebugEnabled())
-            log.debug("Removed stream event listener [streamId=" + streamId + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onClose() {
-        for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
-            try {
-                lsnr.onClose();
-            }
-            catch (IgniteCheckedException e) {
-                log.warn("Got exception from stream event listener (will ignore): " + lsnr, e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onError(long streamId, String errMsg) {
-        HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId);
-
-        if (lsnr != null)
-            lsnr.onError(errMsg);
-        else
-            log.warn("Received write error response for not registered output stream (will ignore) " +
-                "[streamId= " + streamId + ']');
-    }
-
-    /**
-     * Creates conversion closure for given type.
-     *
-     * @param <T> Type of expected result.
-     * @return Conversion closure.
-     */
-    @SuppressWarnings("unchecked")
-    private static <T> IgniteClosure<IgniteInternalFuture<IgfsMessage>, T> createClosure() {
-        return new IgniteClosure<IgniteInternalFuture<IgfsMessage>, T>() {
-            @Override public T apply(IgniteInternalFuture<IgfsMessage> fut) {
-                try {
-                    IgfsControlResponse res = (IgfsControlResponse)fut.get();
-
-                    if (res.hasError())
-                        res.throwError();
-
-                    return (T)res.response();
-                }
-                catch (IgfsException | IgniteCheckedException e) {
-                    throw new GridClosureException(e);
-                }
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public String user() {
-        return userName;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
deleted file mode 100644
index 8f7458b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.commons.logging.Log;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * IGFS Hadoop output stream implementation.
- */
-public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener {
-    /** Log instance. */
-    private Log log;
-
-    /** Client logger. */
-    private IgfsLogger clientLog;
-
-    /** Log stream ID. */
-    private long logStreamId;
-
-    /** Server stream delegate. */
-    private HadoopIgfsStreamDelegate delegate;
-
-    /** Closed flag. */
-    private volatile boolean closed;
-
-    /** Flag set if stream was closed due to connection breakage. */
-    private boolean connBroken;
-
-    /** Error message. */
-    private volatile String errMsg;
-
-    /** Read time. */
-    private long writeTime;
-
-    /** User time. */
-    private long userTime;
-
-    /** Last timestamp. */
-    private long lastTs;
-
-    /** Amount of written bytes. */
-    private long total;
-
-    /**
-     * Creates light output stream.
-     *
-     * @param delegate Server stream delegate.
-     * @param log Logger to use.
-     * @param clientLog Client logger.
-     */
-    public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log,
-        IgfsLogger clientLog, long logStreamId) {
-        this.delegate = delegate;
-        this.log = log;
-        this.clientLog = clientLog;
-        this.logStreamId = logStreamId;
-
-        lastTs = System.nanoTime();
-
-        delegate.hadoop().addEventListener(delegate, this);
-    }
-
-    /**
-     * Read start.
-     */
-    private void writeStart() {
-        long now = System.nanoTime();
-
-        userTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /**
-     * Read end.
-     */
-    private void writeEnd() {
-        long now = System.nanoTime();
-
-        writeTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(@NotNull byte[] b, int off, int len) throws IOException {
-        check();
-
-        writeStart();
-
-        try {
-            delegate.hadoop().writeData(delegate, b, off, len);
-
-            total += len;
-        }
-        finally {
-            writeEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(int b) throws IOException {
-        write(new byte[] {(byte)b});
-
-        total++;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flush() throws IOException {
-        delegate.hadoop().flush(delegate);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IOException {
-        if (!closed) {
-            if (log.isDebugEnabled())
-                log.debug("Closing output stream: " + delegate);
-
-            writeStart();
-
-            delegate.hadoop().closeStream(delegate);
-
-            markClosed(false);
-
-            writeEnd();
-
-            if (clientLog.isLogEnabled())
-                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
-
-            if (log.isDebugEnabled())
-                log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 +
-                    ", userTime=" + userTime / 1000 + ']');
-        }
-        else if(connBroken)
-            throw new IOException(
-                "Failed to close stream, because connection was broken (data could have been lost).");
-    }
-
-    /**
-     * Marks stream as closed.
-     *
-     * @param connBroken {@code True} if connection with server was lost.
-     */
-    private void markClosed(boolean connBroken) {
-        // It is ok to have race here.
-        if (!closed) {
-            closed = true;
-
-            delegate.hadoop().removeEventListener(delegate);
-
-            this.connBroken = connBroken;
-        }
-    }
-
-    /**
-     * @throws IOException If check failed.
-     */
-    private void check() throws IOException {
-        String errMsg0 = errMsg;
-
-        if (errMsg0 != null)
-            throw new IOException(errMsg0);
-
-        if (closed) {
-            if (connBroken)
-                throw new IOException("Server connection was lost.");
-            else
-                throw new IOException("Stream is closed.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onClose() throws IgniteCheckedException {
-        markClosed(true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onError(String errMsg) {
-        this.errMsg = errMsg;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
deleted file mode 100644
index 90f6bca..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.util.Map;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-
-/**
- * Hadoop file system properties.
- */
-public class HadoopIgfsProperties {
-    /** Username. */
-    private String usrName;
-
-    /** Group name. */
-    private String grpName;
-
-    /** Permissions. */
-    private FsPermission perm;
-
-    /**
-     * Constructor.
-     *
-     * @param props Properties.
-     * @throws IgniteException In case of error.
-     */
-    public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
-        usrName = props.get(IgfsUtils.PROP_USER_NAME);
-        grpName = props.get(IgfsUtils.PROP_GROUP_NAME);
-
-        String permStr = props.get(IgfsUtils.PROP_PERMISSION);
-
-        if (permStr != null) {
-            try {
-                perm = new FsPermission((short)Integer.parseInt(permStr, 8));
-            }
-            catch (NumberFormatException ignore) {
-                throw new IgniteException("Permissions cannot be parsed: " + permStr);
-            }
-        }
-    }
-
-    /**
-     * Get user name.
-     *
-     * @return User name.
-     */
-    public String userName() {
-        return usrName;
-    }
-
-    /**
-     * Get group name.
-     *
-     * @return Group name.
-     */
-    public String groupName() {
-        return grpName;
-    }
-
-    /**
-     * Get permission.
-     *
-     * @return Permission.
-     */
-    public FsPermission permission() {
-        return perm;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
deleted file mode 100644
index 5cee947..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-
-/**
- * Secondary Hadoop file system input stream wrapper.
- */
-public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable {
-    /** Actual input stream to the secondary file system. */
-    private final FSDataInputStream is;
-
-    /** Client logger. */
-    private final IgfsLogger clientLog;
-
-    /** Log stream ID. */
-    private final long logStreamId;
-
-    /** Read time. */
-    private long readTime;
-
-    /** User time. */
-    private long userTime;
-
-    /** Last timestamp. */
-    private long lastTs;
-
-    /** Amount of read bytes. */
-    private long total;
-
-    /** Closed flag. */
-    private boolean closed;
-
-    /**
-     * Constructor.
-     *
-     * @param is Actual input stream to the secondary file system.
-     * @param clientLog Client log.
-     */
-    public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) {
-        assert is != null;
-        assert clientLog != null;
-
-        this.is = is;
-        this.clientLog = clientLog;
-        this.logStreamId = logStreamId;
-
-        lastTs = System.nanoTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(byte[] b) throws IOException {
-        readStart();
-
-        int res;
-
-        try {
-            res = is.read(b);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (res != -1)
-            total += res;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
-        readStart();
-
-        int res;
-
-        try {
-            res = super.read(b, off, len);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (res != -1)
-            total += res;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized long skip(long n) throws IOException {
-        readStart();
-
-        long res;
-
-        try {
-            res =  is.skip(n);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (clientLog.isLogEnabled())
-            clientLog.logSkip(logStreamId, res);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int available() throws IOException {
-        readStart();
-
-        try {
-            return is.available();
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void close() throws IOException {
-        if (!closed) {
-            closed = true;
-
-            readStart();
-
-            try {
-                is.close();
-            }
-            finally {
-                readEnd();
-            }
-
-            if (clientLog.isLogEnabled())
-                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void mark(int readLimit) {
-        readStart();
-
-        try {
-            is.mark(readLimit);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (clientLog.isLogEnabled())
-            clientLog.logMark(logStreamId, readLimit);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void reset() throws IOException {
-        readStart();
-
-        try {
-            is.reset();
-        }
-        finally {
-            readEnd();
-        }
-
-        if (clientLog.isLogEnabled())
-            clientLog.logReset(logStreamId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized boolean markSupported() {
-        readStart();
-
-        try {
-            return is.markSupported();
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read() throws IOException {
-        readStart();
-
-        int res;
-
-        try {
-            res = is.read();
-        }
-        finally {
-            readEnd();
-        }
-
-        if (res != -1)
-            total++;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
-        readStart();
-
-        int res;
-
-        try {
-            res = is.read(pos, buf, off, len);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (res != -1)
-            total += res;
-
-        if (clientLog.isLogEnabled())
-            clientLog.logRandomRead(logStreamId, pos, res);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
-        readStart();
-
-        try {
-            is.readFully(pos, buf, off, len);
-        }
-        finally {
-            readEnd();
-        }
-
-        total += len;
-
-        if (clientLog.isLogEnabled())
-            clientLog.logRandomRead(logStreamId, pos, len);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
-        readStart();
-
-        try {
-            is.readFully(pos, buf);
-        }
-        finally {
-            readEnd();
-        }
-
-        total += buf.length;
-
-        if (clientLog.isLogEnabled())
-            clientLog.logRandomRead(logStreamId, pos, buf.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void seek(long pos) throws IOException {
-        readStart();
-
-        try {
-            is.seek(pos);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (clientLog.isLogEnabled())
-            clientLog.logSeek(logStreamId, pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized long getPos() throws IOException {
-        readStart();
-
-        try {
-            return is.getPos();
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException {
-        readStart();
-
-        try {
-            return is.seekToNewSource(targetPos);
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /**
-     * Read start.
-     */
-    private void readStart() {
-        long now = System.nanoTime();
-
-        userTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /**
-     * Read end.
-     */
-    private void readEnd() {
-        long now = System.nanoTime();
-
-        readTime += now - lastTs;
-
-        lastTs = now;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
deleted file mode 100644
index eade0f0..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-
-/**
- * Secondary Hadoop file system output stream wrapper.
- */
-public class HadoopIgfsProxyOutputStream extends OutputStream {
-    /** Actual output stream. */
-    private FSDataOutputStream os;
-
-    /** Client logger. */
-    private final IgfsLogger clientLog;
-
-    /** Log stream ID. */
-    private final long logStreamId;
-
-    /** Read time. */
-    private long writeTime;
-
-    /** User time. */
-    private long userTime;
-
-    /** Last timestamp. */
-    private long lastTs;
-
-    /** Amount of written bytes. */
-    private long total;
-
-    /** Closed flag. */
-    private boolean closed;
-
-    /**
-     * Constructor.
-     *
-     * @param os Actual output stream.
-     * @param clientLog Client logger.
-     * @param logStreamId Log stream ID.
-     */
-    public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) {
-        assert os != null;
-        assert clientLog != null;
-
-        this.os = os;
-        this.clientLog = clientLog;
-        this.logStreamId = logStreamId;
-
-        lastTs = System.nanoTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(int b) throws IOException {
-        writeStart();
-
-        try {
-            os.write(b);
-        }
-        finally {
-            writeEnd();
-        }
-
-        total++;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(byte[] b) throws IOException {
-        writeStart();
-
-        try {
-            os.write(b);
-        }
-        finally {
-            writeEnd();
-        }
-
-        total += b.length;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
-        writeStart();
-
-        try {
-            os.write(b, off, len);
-        }
-        finally {
-            writeEnd();
-        }
-
-        total += len;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void flush() throws IOException {
-        writeStart();
-
-        try {
-            os.flush();
-        }
-        finally {
-            writeEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void close() throws IOException {
-        if (!closed) {
-            closed = true;
-
-            writeStart();
-
-            try {
-                os.close();
-            }
-            finally {
-                writeEnd();
-            }
-
-            if (clientLog.isLogEnabled())
-                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
-        }
-    }
-
-    /**
-     * Read start.
-     */
-    private void writeStart() {
-        long now = System.nanoTime();
-
-        userTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /**
-     * Read end.
-     */
-    private void writeEnd() {
-        long now = System.nanoTime();
-
-        writeTime += now - lastTs;
-
-        lastTs = now;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
deleted file mode 100644
index a0577ce..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly
- * requested.
- * <p>
- * The class is expected to be used only from synchronized context and therefore is not tread-safe.
- */
-public class HadoopIgfsSecondaryFileSystemPositionedReadable implements IgfsSecondaryFileSystemPositionedReadable {
-    /** Secondary file system. */
-    private final FileSystem fs;
-
-    /** Path to the file to open. */
-    private final Path path;
-
-    /** Buffer size. */
-    private final int bufSize;
-
-    /** Actual input stream. */
-    private FSDataInputStream in;
-
-    /** Cached error occurred during output stream open. */
-    private IOException err;
-
-    /** Flag indicating that the stream was already opened. */
-    private boolean opened;
-
-    /**
-     * Constructor.
-     *
-     * @param fs Secondary file system.
-     * @param path Path to the file to open.
-     * @param bufSize Buffer size.
-     */
-    public HadoopIgfsSecondaryFileSystemPositionedReadable(FileSystem fs, Path path, int bufSize) {
-        assert fs != null;
-        assert path != null;
-
-        this.fs = fs;
-        this.path = path;
-        this.bufSize = bufSize;
-    }
-
-    /** Get input stream. */
-    private PositionedReadable in() throws IOException {
-        if (opened) {
-            if (err != null)
-                throw err;
-        }
-        else {
-            opened = true;
-
-            try {
-                in = fs.open(path, bufSize);
-
-                if (in == null)
-                    throw new IOException("Failed to open input stream (file system returned null): " + path);
-            }
-            catch (IOException e) {
-                err = e;
-
-                throw err;
-            }
-        }
-
-        return in;
-    }
-
-    /**
-     * Close wrapped input stream in case it was previously opened.
-     */
-    @Override public void close() {
-        U.closeQuiet(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
-        return in().read(pos, buf, off, len);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
deleted file mode 100644
index 37b58ab..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * IGFS Hadoop stream descriptor.
- */
-public class HadoopIgfsStreamDelegate {
-    /** RPC handler. */
-    private final HadoopIgfsEx hadoop;
-
-    /** Target. */
-    private final Object target;
-
-    /** Optional stream length. */
-    private final long len;
-
-    /**
-     * Constructor.
-     *
-     * @param target Target.
-     */
-    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) {
-        this(hadoop, target, -1);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param target Target.
-     * @param len Optional length.
-     */
-    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) {
-        assert hadoop != null;
-        assert target != null;
-
-        this.hadoop = hadoop;
-        this.target = target;
-        this.len = len;
-    }
-
-    /**
-     * @return RPC handler.
-     */
-    public HadoopIgfsEx hadoop() {
-        return hadoop;
-    }
-
-    /**
-     * @return Stream target.
-     */
-    @SuppressWarnings("unchecked")
-    public <T> T target() {
-        return (T) target;
-    }
-
-    /**
-     * @return Length.
-     */
-    public long length() {
-        return len;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return System.identityHashCode(target);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object obj) {
-        return obj != null && obj instanceof HadoopIgfsStreamDelegate &&
-            target == ((HadoopIgfsStreamDelegate)obj).target;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopIgfsStreamDelegate.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
deleted file mode 100644
index d81f765..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.IgniteCheckedException;
-
-/**
- * IGFS input stream event listener.
- */
-public interface HadoopIgfsStreamEventListener {
-    /**
-     * Callback invoked when the stream is being closed.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onClose() throws IgniteCheckedException;
-
-    /**
-     * Callback invoked when remote error occurs.
-     *
-     * @param errMsg Error message.
-     */
-    public void onError(String errMsg);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
deleted file mode 100644
index fa5cbc5..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
-import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
-import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Utility constants and methods for IGFS Hadoop file system.
- */
-public class HadoopIgfsUtils {
-    /** Parameter name for endpoint no embed mode flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
-
-    /** Parameter name for endpoint no shared memory flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
-
-    /** Parameter name for endpoint no local TCP flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
-
-    /**
-     * Get string parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return String value.
-     */
-    public static String parameter(Configuration cfg, String name, String authority, String dflt) {
-        return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
-    }
-
-    /**
-     * Get integer parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return Integer value.
-     * @throws IOException In case of parse exception.
-     */
-    public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
-        String name0 = String.format(name, authority != null ? authority : "");
-
-        try {
-            return cfg.getInt(name0, dflt);
-        }
-        catch (NumberFormatException ignore) {
-            throw new IOException("Failed to parse parameter value to integer: " + name0);
-        }
-    }
-
-    /**
-     * Get boolean parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return Boolean value.
-     */
-    public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
-        return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
-    }
-
-    /**
-     * Cast Ignite exception to appropriate IO exception.
-     *
-     * @param e Exception to cast.
-     * @return Casted exception.
-     */
-    public static IOException cast(IgniteCheckedException e) {
-        return cast(e, null);
-    }
-
-    /**
-     * Cast Ignite exception to appropriate IO exception.
-     *
-     * @param e Exception to cast.
-     * @param path Path for exceptions.
-     * @return Casted exception.
-     */
-    @SuppressWarnings("unchecked")
-    public static IOException cast(IgniteCheckedException e, @Nullable String path) {
-        assert e != null;
-
-        // First check for any nested IOException; if exists - re-throw it.
-        if (e.hasCause(IOException.class))
-            return e.getCause(IOException.class);
-        else if (e.hasCause(IgfsPathNotFoundException.class))
-            return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
-        else if (e.hasCause(IgfsParentNotDirectoryException.class))
-            return new ParentNotDirectoryException(path);
-        else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
-            return new PathIsNotEmptyDirectoryException(path);
-        else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
-            return new PathExistsException(path);
-        else {
-            String msg = e.getMessage();
-
-            return msg == null ? new IOException(e) : new IOException(msg, e);
-        }
-    }
-
-    /**
-     * Deletes all files from the given file system.
-     *
-     * @param fs The file system to clean up.
-     * @throws IOException On error.
-     */
-    public static void clear(FileSystem fs) throws IOException {
-        // Delete root contents:
-        FileStatus[] statuses = fs.listStatus(new Path("/"));
-
-        if (statuses != null) {
-            for (FileStatus stat: statuses)
-                fs.delete(stat.getPath(), true);
-        }
-    }
-
-    /**
-     * Deletes all files from the given file system.
-     *
-     * @param fs The file system to clean up.
-     * @throws IOException On error.
-     */
-    public static void clear(AbstractFileSystem fs) throws IOException {
-        // Delete root contents:
-        FileStatus[] statuses = fs.listStatus(new Path("/"));
-
-        if (statuses != null) {
-            for (FileStatus stat: statuses)
-                fs.delete(stat.getPath(), true);
-        }
-    }
-
-    /**
-     * Constructor.
-     */
-    private HadoopIgfsUtils() {
-        // No-op.
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
deleted file mode 100644
index f4ee97f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.IgniteIllegalStateException;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.IgniteState.STARTED;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
-
-/**
- * Wrapper for IGFS server.
- */
-public class HadoopIgfsWrapper implements HadoopIgfs {
-    /** Delegate. */
-    private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
-
-    /** Authority. */
-    private final String authority;
-
-    /** Connection string. */
-    private final HadoopIgfsEndpoint endpoint;
-
-    /** Log directory. */
-    private final String logDir;
-
-    /** Configuration. */
-    private final Configuration conf;
-
-    /** Logger. */
-    private final Log log;
-
-    /** The user name this wrapper works on behalf of. */
-    private final String userName;
-
-    /**
-     * Constructor.
-     *
-     * @param authority Authority (connection string).
-     * @param logDir Log directory for server.
-     * @param conf Configuration.
-     * @param log Current logger.
-     */
-    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
-        throws IOException {
-        try {
-            this.authority = authority;
-            this.endpoint = new HadoopIgfsEndpoint(authority);
-            this.logDir = logDir;
-            this.conf = conf;
-            this.log = log;
-            this.userName = user;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to parse endpoint: " + authority, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
-            @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) {
-                return hndResp;
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(boolean force) {
-        Delegate delegate = delegateRef.get();
-
-        if (delegate != null && delegateRef.compareAndSet(delegate, null))
-            delegate.close(force);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
-            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.info(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
-            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.update(path, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.setTimes(path, accessTime, modificationTime);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.rename(src, dest);
-            }
-        }, src);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.delete(path, recursive);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
-        final long len) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
-            @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.affinity(path, start, len);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
-            @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.contentSummary(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.mkdirs(path, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
-            @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.listFiles(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
-            @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.listPaths(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsStatus fsStatus() throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
-            @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.fsStatus();
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
-            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.open(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
-            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.open(path, seqReadsBeforePrefetch);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
-        final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
-            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
-        @Nullable final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
-            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.append(path, create, props);
-            }
-        }, path);
-    }
-
-    /**
-     * Execute closure which is not path-specific.
-     *
-     * @param clo Closure.
-     * @return Result.
-     * @throws IOException If failed.
-     */
-    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
-        return withReconnectHandling(clo, null);
-    }
-
-    /**
-     * Execute closure.
-     *
-     * @param clo Closure.
-     * @param path Path for exceptions.
-     * @return Result.
-     * @throws IOException If failed.
-     */
-    private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
-        throws IOException {
-        Exception err = null;
-
-        for (int i = 0; i < 2; i++) {
-            Delegate curDelegate = null;
-
-            boolean close = false;
-            boolean force = false;
-
-            try {
-                curDelegate = delegate();
-
-                assert curDelegate != null;
-
-                close = curDelegate.doomed;
-
-                return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
-            }
-            catch (HadoopIgfsCommunicationException e) {
-                if (curDelegate != null && !curDelegate.doomed) {
-                    // Try getting rid fo faulty delegate ASAP.
-                    delegateRef.compareAndSet(curDelegate, null);
-
-                    close = true;
-                    force = true;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send message to a server: " + e);
-
-                err = e;
-            }
-            catch (IgniteCheckedException e) {
-                throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null);
-            }
-            finally {
-                if (close) {
-                    assert curDelegate != null;
-
-                    curDelegate.close(force);
-                }
-            }
-        }
-
-        List<Throwable> list = X.getThrowableList(err);
-
-        Throwable cause = list.get(list.size() - 1);
-
-        throw new IOException("Failed to communicate with IGFS: "
-            + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err);
-    }
-
-    /**
-     * Get delegate creating it if needed.
-     *
-     * @return Delegate.
-     */
-    private Delegate delegate() throws HadoopIgfsCommunicationException {
-        // These fields will contain possible exceptions from shmem and TCP endpoints.
-        Exception errShmem = null;
-        Exception errTcp = null;
-
-        // 1. If delegate is set, return it immediately.
-        Delegate curDelegate = delegateRef.get();
-
-        if (curDelegate != null)
-            return curDelegate;
-
-        // 2. Guess that we are in the same VM.
-        boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false);
-
-        if (!skipInProc) {
-            IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs());
-
-            if (igfs != null) {
-                HadoopIgfsEx hadoop = null;
-
-                try {
-                    hadoop = new HadoopIgfsInProc(igfs, log, userName);
-
-                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    if (e instanceof HadoopIgfsCommunicationException)
-                        if (hadoop != null)
-                            hadoop.close(true);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e);
-                }
-            }
-        }
-
-        // 3. Try connecting using shmem.
-        boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false);
-
-        if (curDelegate == null && !skipLocShmem && !U.isWindows()) {
-            HadoopIgfsEx hadoop = null;
-
-            try {
-                hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
-
-                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (e instanceof HadoopIgfsCommunicationException)
-                    hadoop.close(true);
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e);
-
-                errShmem = e;
-            }
-        }
-
-        // 4. Try local TCP connection.
-        boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
-
-        if (curDelegate == null && !skipLocTcp) {
-            HadoopIgfsEx hadoop = null;
-
-            try {
-                hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                    log, userName);
-
-                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (e instanceof HadoopIgfsCommunicationException)
-                    hadoop.close(true);
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
-                        ", port=" + endpoint.port() + ']', e);
-
-                errTcp = e;
-            }
-        }
-
-        // 5. Try remote TCP connection.
-        if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
-            HadoopIgfsEx hadoop = null;
-
-            try {
-                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                    log, userName);
-
-                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (e instanceof HadoopIgfsCommunicationException)
-                    hadoop.close(true);
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
-                        ", port=" + endpoint.port() + ']', e);
-
-                errTcp = e;
-            }
-        }
-
-        if (curDelegate != null) {
-            if (!delegateRef.compareAndSet(null, curDelegate))
-                curDelegate.doomed = true;
-
-            return curDelegate;
-        }
-        else {
-            SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=[");
-
-            if (errShmem != null)
-                errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], ");
-
-            errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] ");
-
-            errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " +
-                "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint).");
-
-            throw new HadoopIgfsCommunicationException(errMsg.toString());
-        }
-    }
-
-    /**
-     * File system operation closure.
-     */
-    private static interface FileSystemClosure<T> {
-        /**
-         * Call closure body.
-         *
-         * @param hadoop RPC handler.
-         * @param hndResp Handshake response.
-         * @return Result.
-         * @throws IgniteCheckedException If failed.
-         * @throws IOException If failed.
-         */
-        public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
-    }
-
-    /**
-     * Delegate.
-     */
-    private static class Delegate {
-        /** RPC handler. */
-        private final HadoopIgfsEx hadoop;
-
-        /** Handshake request. */
-        private final IgfsHandshakeResponse hndResp;
-
-        /** Close guard. */
-        private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-        /** Whether this delegate must be closed at the end of the next invocation. */
-        private boolean doomed;
-
-        /**
-         * Constructor.
-         *
-         * @param hadoop Hadoop.
-         * @param hndResp Handshake response.
-         */
-        private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) {
-            this.hadoop = hadoop;
-            this.hndResp = hndResp;
-        }
-
-        /**
-         * Close underlying RPC handler.
-         *
-         * @param force Force flag.
-         */
-        private void close(boolean force) {
-            if (closeGuard.compareAndSet(false, true))
-                hadoop.close(force);
-        }
-    }
-
-    /**
-     * Helper method to find Igfs of the given name in the given Ignite instance.
-     *
-     * @param gridName The name of the grid to check.
-     * @param igfsName The name of Igfs.
-     * @return The file system instance, or null if not found.
-     */
-    private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) {
-        if (Ignition.state(gridName) == STARTED) {
-            try {
-                for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) {
-                    if (F.eq(fs.name(), igfsName))
-                        return (IgfsEx)fs;
-                }
-            }
-            catch (IgniteIllegalStateException ignore) {
-                // May happen if the grid state has changed:
-            }
-        }
-
-        return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
new file mode 100644
index 0000000..0ab64d9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Hadoop +counter group adapter.
+ */
+class HadoopMapReduceCounterGroup implements CounterGroup {
+    /** Counters. */
+    private final HadoopMapReduceCounters cntrs;
+
+    /** Group name. */
+    private final String name;
+
+    /**
+     * Creates new instance.
+     *
+     * @param cntrs Client counters instance.
+     * @param name Group name.
+     */
+    HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
+        this.cntrs = cntrs;
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getDisplayName() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addCounter(Counter counter) {
+        addCounter(counter.getName(), counter.getDisplayName(), 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter addCounter(String name, String displayName, long value) {
+        final Counter counter = cntrs.findCounter(this.name, name);
+
+        counter.setValue(value);
+
+        return counter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, String displayName) {
+        return cntrs.findCounter(name, counterName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, boolean create) {
+        return cntrs.findCounter(name, counterName, create);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName) {
+        return cntrs.findCounter(name, counterName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return cntrs.groupSize(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
+        for (final Counter counter : rightGroup)
+            cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<Counter> iterator() {
+        return cntrs.iterateGroup(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}
\ No newline at end of file


Mime
View raw message