ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [28/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (4).
Date Tue, 03 Mar 2015 13:08:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamDelegate.java
new file mode 100644
index 0000000..e64b761
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamDelegate.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * IGFS Hadoop stream descriptor.
+ */
+public class HadoopIgfsStreamDelegate {
+    /** RPC handler. */
+    private final HadoopIgfsEx hadoop;
+
+    /** Target. */
+    private final Object target;
+
+    /** Optional stream length. */
+    private final long len;
+
+    /**
+     * Constructor.
+     *
+     * @param target Target.
+     */
+    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) {
+        this(hadoop, target, -1);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param target Target.
+     * @param len Optional length.
+     */
+    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) {
+        assert hadoop != null;
+        assert target != null;
+
+        this.hadoop = hadoop;
+        this.target = target;
+        this.len = len;
+    }
+
+    /**
+     * @return RPC handler.
+     */
+    public HadoopIgfsEx hadoop() {
+        return hadoop;
+    }
+
+    /**
+     * @return Stream target.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T target() {
+        return (T) target;
+    }
+
+    /**
+     * @return Length.
+     */
+    public long length() {
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return System.identityHashCode(target);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return obj != null && obj instanceof HadoopIgfsStreamDelegate &&
+            target == ((HadoopIgfsStreamDelegate)obj).target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopIgfsStreamDelegate.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamEventListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamEventListener.java
new file mode 100644
index 0000000..b3de523
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamEventListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * IGFS input stream event listener.
+ */
+public interface HadoopIgfsStreamEventListener {
+    /**
+     * Callback invoked when the stream is being closed.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onClose() throws IgniteCheckedException;
+
+    /**
+     * Callback invoked when remote error occurs.
+     *
+     * @param errMsg Error message.
+     */
+    public void onError(String errMsg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsUtils.java
new file mode 100644
index 0000000..009443d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsUtils.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Utility constants and methods for IGFS Hadoop file system.
+ */
+public class HadoopIgfsUtils {
+    /** Parameter name for endpoint no embed mode flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
+
+    /** Parameter name for endpoint no shared memory flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
+
+    /** Parameter name for endpoint no local TCP flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
+
+    /**
+     * Get string parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return String value.
+     */
+    public static String parameter(Configuration cfg, String name, String authority, String dflt) {
+        return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
+    }
+
+    /**
+     * Get integer parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return Integer value.
+     * @throws IOException In case of parse exception.
+     */
+    public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
+        String name0 = String.format(name, authority != null ? authority : "");
+
+        try {
+            return cfg.getInt(name0, dflt);
+        }
+        catch (NumberFormatException ignore) {
+            throw new IOException("Failed to parse parameter value to integer: " + name0);
+        }
+    }
+
+    /**
+     * Get boolean parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return Boolean value.
+     */
+    public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
+        return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
+    }
+
+    /**
+     * Cast Ignite exception to appropriate IO exception.
+     *
+     * @param e Exception to cast.
+     * @return Casted exception.
+     */
+    public static IOException cast(IgniteCheckedException e) {
+        return cast(e, null);
+    }
+
+    /**
+     * Cast Ignite exception to appropriate IO exception.
+     *
+     * @param e Exception to cast.
+     * @param path Path for exceptions.
+     * @return Casted exception.
+     */
+    @SuppressWarnings("unchecked")
+    public static IOException cast(IgniteCheckedException e, @Nullable String path) {
+        assert e != null;
+
+        // First check for any nested IOException; if exists - re-throw it.
+        if (e.hasCause(IOException.class))
+            return e.getCause(IOException.class);
+        else if (e.hasCause(IgfsFileNotFoundException.class))
+            return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
+        else if (e.hasCause(IgfsParentNotDirectoryException.class))
+            return new ParentNotDirectoryException(path);
+        else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
+            return new PathIsNotEmptyDirectoryException(path);
+        else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
+            return new PathExistsException(path);
+        else
+            return new IOException(e);
+    }
+
+    /**
+     * Constructor.
+     */
+    private HadoopIgfsUtils() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java
new file mode 100644
index 0000000..94a4449
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.igfs.hadoop.HadoopIgfsEndpoint.*;
+import static org.apache.ignite.internal.igfs.hadoop.HadoopIgfsUtils.*;
+
+/**
+ * Wrapper for IGFS server.
+ */
+public class HadoopIgfsWrapper implements HadoopIgfs {
+    /** Delegate. */
+    private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
+
+    /** Authority. */
+    private final String authority;
+
+    /** Connection string. */
+    private final HadoopIgfsEndpoint endpoint;
+
+    /** Log directory. */
+    private final String logDir;
+
+    /** Configuration. */
+    private final Configuration conf;
+
+    /** Logger. */
+    private final Log log;
+
+    /**
+     * Constructor.
+     *
+     * @param authority Authority (connection string).
+     * @param logDir Log directory for server.
+     * @param conf Configuration.
+     * @param log Current logger.
+     */
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
+        try {
+            this.authority = authority;
+            this.endpoint = new HadoopIgfsEndpoint(authority);
+            this.logDir = logDir;
+            this.conf = conf;
+            this.log = log;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to parse endpoint: " + authority, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) {
+                return hndResp;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(boolean force) {
+        Delegate delegate = delegateRef.get();
+
+        if (delegate != null && delegateRef.compareAndSet(delegate, null))
+            delegate.close(force);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.info(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.update(path, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.setTimes(path, accessTime, modificationTime);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.rename(src, dest);
+            }
+        }, src);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.delete(path, recursive);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
+        final long len) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
+            @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.affinity(path, start, len);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
+            @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.contentSummary(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.mkdirs(path, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
+            @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.listFiles(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
+            @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.listPaths(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus fsStatus() throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
+            @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.fsStatus();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.open(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.open(path, seqReadsBeforePrefetch);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
+        final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        @Nullable final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.append(path, create, props);
+            }
+        }, path);
+    }
+
+    /**
+     * Execute closure which is not path-specific.
+     *
+     * @param clo Closure.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
+        return withReconnectHandling(clo, null);
+    }
+
+    /**
+     * Execute closure.
+     *
+     * @param clo Closure.
+     * @param path Path for exceptions.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
+        throws IOException {
+        Exception err = null;
+
+        for (int i = 0; i < 2; i++) {
+            Delegate curDelegate = null;
+
+            boolean close = false;
+            boolean force = false;
+
+            try {
+                curDelegate = delegate();
+
+                assert curDelegate != null;
+
+                close = curDelegate.doomed;
+
+                return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
+            }
+            catch (HadoopIgfsCommunicationException e) {
+                if (curDelegate != null && !curDelegate.doomed) {
+                    // Try getting rid fo faulty delegate ASAP.
+                    delegateRef.compareAndSet(curDelegate, null);
+
+                    close = true;
+                    force = true;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message to a server: " + e);
+
+                err = e;
+            }
+            catch (IgniteCheckedException e) {
+                throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null);
+            }
+            finally {
+                if (close) {
+                    assert curDelegate != null;
+
+                    curDelegate.close(force);
+                }
+            }
+        }
+
+        throw new IOException("Failed to communicate with IGFS.", err);
+    }
+
+    /**
+     * Get delegate creating it if needed.
+     *
+     * @return Delegate.
+     */
+    private Delegate delegate() throws HadoopIgfsCommunicationException {
+        Exception err = null;
+
+        // 1. If delegate is set, return it immediately.
+        Delegate curDelegate = delegateRef.get();
+
+        if (curDelegate != null)
+            return curDelegate;
+
+        // 2. Guess that we are in the same VM.
+        if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false)) {
+            IgfsEx igfs = null;
+
+            if (endpoint.grid() == null) {
+                try {
+                    Ignite ignite = G.ignite();
+
+                    igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs());
+                }
+                catch (Exception e) {
+                    err = e;
+                }
+            }
+            else {
+                for (Ignite ignite : G.allGrids()) {
+                    try {
+                        igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs());
+
+                        break;
+                    }
+                    catch (Exception e) {
+                        err = e;
+                    }
+                }
+            }
+
+            if (igfs != null) {
+                HadoopIgfsEx hadoop = null;
+
+                try {
+                    hadoop = new HadoopIgfsInProc(igfs, log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (e instanceof HadoopIgfsCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 3. Try connecting using shmem.
+        if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) {
+            if (curDelegate == null && !U.isWindows()) {
+                HadoopIgfsEx hadoop = null;
+
+                try {
+                    hadoop = new HadoopOutProcIgfs(endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (e instanceof HadoopIgfsCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to out-proc local IGFS using shmem.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 4. Try local TCP connection.
+        boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
+
+        if (!skipLocTcp) {
+            if (curDelegate == null) {
+                HadoopIgfsEx hadoop = null;
+
+                try {
+                    hadoop = new HadoopOutProcIgfs(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                        log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (e instanceof HadoopIgfsCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to out-proc local IGFS using TCP.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 5. Try remote TCP connection.
+        if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
+            HadoopIgfsEx hadoop = null;
+
+            try {
+                hadoop = new HadoopOutProcIgfs(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to out-proc remote IGFS using TCP.", e);
+
+                err = e;
+            }
+        }
+
+        if (curDelegate != null) {
+            if (!delegateRef.compareAndSet(null, curDelegate))
+                curDelegate.doomed = true;
+
+            return curDelegate;
+        }
+        else
+            throw new HadoopIgfsCommunicationException("Failed to connect to IGFS: " + endpoint, err);
+    }
+
+    /**
+     * File system operation closure.
+     */
+    private static interface FileSystemClosure<T> {
+        /**
+         * Call closure body.
+         *
+         * @param hadoop RPC handler.
+         * @param hndResp Handshake response.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         * @throws IOException If failed.
+         */
+        public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
+    }
+
+    /**
+     * Delegate.
+     */
+    private static class Delegate {
+        /** RPC handler. */
+        private final HadoopIgfsEx hadoop;
+
+        /** Handshake request. */
+        private final IgfsHandshakeResponse hndResp;
+
+        /** Close guard. */
+        private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+        /** Whether this delegate must be closed at the end of the next invocation. */
+        private boolean doomed;
+
+        /**
+         * Constructor.
+         *
+         * @param hadoop Hadoop.
+         * @param hndResp Handshake response.
+         */
+        private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) {
+            this.hadoop = hadoop;
+            this.hndResp = hndResp;
+        }
+
+        /**
+         * Close underlying RPC handler.
+         *
+         * @param force Force flag.
+         */
+        private void close(boolean force) {
+            if (closeGuard.compareAndSet(false, true))
+                hadoop.close(force);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopInputIgfsStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopInputIgfsStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopInputIgfsStream.java
new file mode 100644
index 0000000..5a008bd
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopInputIgfsStream.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * IGFS input stream wrapper for hadoop interfaces.
+ */
+@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+public final class HadoopInputIgfsStream extends InputStream implements Seekable, PositionedReadable,
+    HadoopIgfsStreamEventListener {
+    /** Minimum buffer size. */
+    private static final int MIN_BUF_SIZE = 4 * 1024;
+
+    /** Server stream delegate. */
+    private HadoopIgfsStreamDelegate delegate;
+
+    /** Stream ID used by logger. */
+    private long logStreamId;
+
+    /** Stream position. */
+    private long pos;
+
+    /** Stream read limit. */
+    private long limit;
+
+    /** Mark position. */
+    private long markPos = -1;
+
+    /** Prefetch buffer. */
+    private DoubleFetchBuffer buf = new DoubleFetchBuffer();
+
+    /** Buffer half size for double-buffering. */
+    private int bufHalfSize;
+
+    /** Closed flag. */
+    private volatile boolean closed;
+
+    /** Flag set if stream was closed due to connection breakage. */
+    private boolean connBroken;
+
+    /** Logger. */
+    private Log log;
+
+    /** Client logger. */
+    private IgfsLogger clientLog;
+
+    /** Read time. */
+    private long readTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of read bytes. */
+    private long total;
+
+    /**
+     * Creates input stream.
+     *
+     * @param delegate Server stream delegate.
+     * @param limit Read limit.
+     * @param bufSize Buffer size.
+     * @param log Log.
+     * @param clientLog Client logger.
+     */
+    public HadoopInputIgfsStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log,
+        IgfsLogger clientLog, long logStreamId) {
+        assert limit >= 0;
+
+        this.delegate = delegate;
+        this.limit = limit;
+        this.log = log;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+
+        bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE);
+
+        lastTs = System.nanoTime();
+
+        delegate.hadoop().addEventListener(delegate, this);
+    }
+
+    /**
+     * Read start.
+     */
+    private void readStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void readEnd() {
+        long now = System.nanoTime();
+
+        readTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read() throws IOException {
+        checkClosed();
+
+        readStart();
+
+        try {
+            if (eof())
+                return -1;
+
+            buf.refreshAhead(pos);
+
+            int res = buf.atPosition(pos);
+
+            pos++;
+            total++;
+
+            buf.refreshAhead(pos);
+
+            return res;
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
+        checkClosed();
+
+        if (eof())
+            return -1;
+
+        readStart();
+
+        try {
+            long remaining = limit - pos;
+
+            int read = buf.flatten(b, pos, off, len);
+
+            pos += read;
+            total += read;
+            remaining -= read;
+
+            if (remaining > 0 && read != len) {
+                int readAmt = (int)Math.min(remaining, len - read);
+
+                delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get();
+
+                read += readAmt;
+                pos += readAmt;
+                total += readAmt;
+            }
+
+            buf.refreshAhead(pos);
+
+            return read;
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long skip(long n) throws IOException {
+        checkClosed();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSkip(logStreamId, n);
+
+        long oldPos = pos;
+
+        if (pos + n <= limit)
+            pos += n;
+        else
+            pos = limit;
+
+        buf.refreshAhead(pos);
+
+        return pos - oldPos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int available() throws IOException {
+        checkClosed();
+
+        int available = buf.available(pos);
+
+        assert available >= 0;
+
+        return available;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IOException {
+        if (!closed) {
+            readStart();
+
+            if (log.isDebugEnabled())
+                log.debug("Closing input stream: " + delegate);
+
+            delegate.hadoop().closeStream(delegate);
+
+            readEnd();
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
+
+            markClosed(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime +
+                    ", userTime=" + userTime + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void mark(int readLimit) {
+        markPos = pos;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logMark(logStreamId, readLimit);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void reset() throws IOException {
+        checkClosed();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logReset(logStreamId);
+
+        if (markPos == -1)
+            throw new IOException("Stream was not marked.");
+
+        pos = markPos;
+
+        buf.refreshAhead(pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean markSupported() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException {
+        long remaining = limit - position;
+
+        int read = (int)Math.min(len, remaining);
+
+        // Return -1 at EOF.
+        if (read == 0)
+            return -1;
+
+        readFully(position, buf, off, read);
+
+        return read;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException {
+        long remaining = limit - position;
+
+        checkClosed();
+
+        if (len > remaining)
+            throw new EOFException("End of stream reached before data was fully read.");
+
+        readStart();
+
+        try {
+            int read = this.buf.flatten(buf, position, off, len);
+
+            total += read;
+
+            if (read != len) {
+                int readAmt = len - read;
+
+                delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get();
+
+                total += readAmt;
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logRandomRead(logStreamId, position, len);
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(long position, byte[] buf) throws IOException {
+        readFully(position, buf, 0, buf.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void seek(long pos) throws IOException {
+        A.ensure(pos >= 0, "position must be non-negative");
+
+        checkClosed();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSeek(logStreamId, pos);
+
+        if (pos > limit)
+            pos = limit;
+
+        if (log.isDebugEnabled())
+            log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']');
+
+        this.pos = pos;
+
+        buf.refreshAhead(pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long getPos() {
+        return pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean seekToNewSource(long targetPos) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        markClosed(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError(String errMsg) {
+        // No-op.
+    }
+
+    /**
+     * Marks stream as closed.
+     *
+     * @param connBroken {@code True} if connection with server was lost.
+     */
+    private void markClosed(boolean connBroken) {
+        // It is ok to have race here.
+        if (!closed) {
+            closed = true;
+
+            this.connBroken = connBroken;
+
+            delegate.hadoop().removeEventListener(delegate);
+        }
+    }
+
+    /**
+     * @throws IOException If check failed.
+     */
+    private void checkClosed() throws IOException {
+        if (closed) {
+            if (connBroken)
+                throw new IOException("Server connection was lost.");
+            else
+                throw new IOException("Stream is closed.");
+        }
+    }
+
+    /**
+     * @return {@code True} if end of stream reached.
+     */
+    private boolean eof() {
+        return limit == pos;
+    }
+
+    /**
+     * Asynchronous prefetch buffer.
+     */
+    private static class FetchBufferPart {
+        /** Read future. */
+        private GridPlainFuture<byte[]> readFut;
+
+        /** Position of cached chunk in file. */
+        private long pos;
+
+        /** Prefetch length. Need to store as read future result might be not available yet. */
+        private int len;
+
+        /**
+         * Creates fetch buffer part.
+         *
+         * @param readFut Read future for this buffer.
+         * @param pos Read position.
+         * @param len Chunk length.
+         */
+        private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) {
+            this.readFut = readFut;
+            this.pos = pos;
+            this.len = len;
+        }
+
+        /**
+         * Copies cached data if specified position matches cached region.
+         *
+         * @param dst Destination buffer.
+         * @param pos Read position in file.
+         * @param dstOff Offset in destination buffer from which start writing.
+         * @param len Maximum number of bytes to copy.
+         * @return Number of bytes copied.
+         * @throws IgniteCheckedException If read future failed.
+         */
+        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
+            // If read start position is within cached boundaries.
+            if (contains(pos)) {
+                byte[] data = readFut.get();
+
+                int srcPos = (int)(pos - this.pos);
+                int cpLen = Math.min(len, data.length - srcPos);
+
+                U.arrayCopy(data, srcPos, dst, dstOff, cpLen);
+
+                return cpLen;
+            }
+
+            return 0;
+        }
+
+        /**
+         * @return {@code True} if data is ready to be read.
+         */
+        public boolean ready() {
+            return readFut.isDone();
+        }
+
+        /**
+         * Checks if current buffer part contains given position.
+         *
+         * @param pos Position to check.
+         * @return {@code True} if position matches buffer region.
+         */
+        public boolean contains(long pos) {
+            return this.pos <= pos && this.pos + len > pos;
+        }
+    }
+
+    private class DoubleFetchBuffer {
+        /**  */
+        private FetchBufferPart first;
+
+        /** */
+        private FetchBufferPart second;
+
+        /**
+         * Copies fetched data from both buffers to destination array if cached region matched read position.
+         *
+         * @param dst Destination buffer.
+         * @param pos Read position in file.
+         * @param dstOff Destination buffer offset.
+         * @param len Maximum number of bytes to copy.
+         * @return Number of bytes copied.
+         * @throws IgniteCheckedException If any read operation failed.
+         */
+        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
+            assert dstOff >= 0;
+            assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff +
+                ", len=" + len + ']';
+
+            int bytesCopied = 0;
+
+            if (first != null) {
+                bytesCopied += first.flatten(dst, pos, dstOff, len);
+
+                if (bytesCopied != len && second != null) {
+                    assert second.pos == first.pos + first.len;
+
+                    bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied);
+                }
+            }
+
+            return bytesCopied;
+        }
+
+        /**
+         * Gets byte at specified position in buffer.
+         *
+         * @param pos Stream position.
+         * @return Read byte.
+         * @throws IgniteCheckedException If read failed.
+         */
+        public int atPosition(long pos) throws IgniteCheckedException {
+            // Should not reach here if stream contains no data.
+            assert first != null;
+
+            if (first.contains(pos)) {
+                byte[] bytes = first.readFut.get();
+
+                return bytes[((int)(pos - first.pos))] & 0xFF;
+            }
+            else {
+                assert second != null;
+                assert second.contains(pos);
+
+                byte[] bytes = second.readFut.get();
+
+                return bytes[((int)(pos - second.pos))] & 0xFF;
+            }
+        }
+
+        /**
+         * Starts asynchronous buffer refresh if needed, depending on current position.
+         *
+         * @param pos Current stream position.
+         */
+        public void refreshAhead(long pos) {
+            if (fullPrefetch(pos)) {
+                first = fetch(pos, bufHalfSize);
+                second = fetch(pos + bufHalfSize, bufHalfSize);
+            }
+            else if (needFlip(pos)) {
+                first = second;
+
+                second = fetch(first.pos + first.len, bufHalfSize);
+            }
+        }
+
+        /**
+         * @param pos Position from which read is expected.
+         * @return Number of bytes available to be read without blocking.
+         */
+        public int available(long pos) {
+            int available = 0;
+
+            if (first != null) {
+                if (first.contains(pos)) {
+                    if (first.ready()) {
+                        available += (pos - first.pos);
+
+                        if (second != null && second.ready())
+                            available += second.len;
+                    }
+                }
+                else {
+                    if (second != null && second.contains(pos) && second.ready())
+                        available += (pos - second.pos);
+                }
+            }
+
+            return available;
+        }
+
+        /**
+         * Checks if position shifted enough to forget previous buffer.
+         *
+         * @param pos Current position.
+         * @return {@code True} if need flip buffers.
+         */
+        private boolean needFlip(long pos) {
+            // Return true if we read more then half of second buffer.
+            return second != null && second.contains(pos);
+        }
+
+        /**
+         * Determines if all cached bytes should be discarded and new region should be
+         * prefetched.
+         *
+         * @param curPos Current stream position.
+         * @return {@code True} if need to refresh both blocks.
+         */
+        private boolean fullPrefetch(long curPos) {
+            // If no data was prefetched yet, return true.
+            return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len);
+        }
+
+        /**
+         * Starts asynchronous fetch for given region.
+         *
+         * @param pos Position to read from.
+         * @param size Number of bytes to read.
+         * @return Fetch buffer part.
+         */
+        private FetchBufferPart fetch(long pos, int size) {
+            long remaining = limit - pos;
+
+            size = (int)Math.min(size, remaining);
+
+            return size <= 0 ? null :
+                new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIpcIgfsIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIpcIgfsIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIpcIgfsIo.java
new file mode 100644
index 0000000..4de4d53
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIpcIgfsIo.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * IO layer implementation based on blocking IPC streams.
+ */
+@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+public class HadoopIpcIgfsIo implements HadoopIgfsIo {
+    /** Logger. */
+    private Log log;
+
+    /** Request futures map. */
+    private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =
+        new ConcurrentHashMap8<>();
+
+    /** Request ID counter. */
+    private AtomicLong reqIdCnt = new AtomicLong();
+
+    /** Endpoint. */
+    private IpcEndpoint endpoint;
+
+    /** Endpoint output stream. */
+    private IgfsDataOutputStream out;
+
+    /** Protocol. */
+    private final IgfsMarshaller marsh;
+
+    /** Client reader thread. */
+    private Thread reader;
+
+    /** Lock for graceful shutdown. */
+    private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
+
+    /** Stopping flag. */
+    private volatile boolean stopping;
+
+    /** Server endpoint address. */
+    private final String endpointAddr;
+
+    /** Number of open file system sessions. */
+    private final AtomicInteger activeCnt = new AtomicInteger(1);
+
+    /** Event listeners. */
+    private final Collection<HadoopIgfsIpcIoListener> lsnrs =
+        new GridConcurrentHashSet<>();
+
+    /** Cached connections. */
+    private static final ConcurrentMap<String, HadoopIpcIgfsIo> ipcCache =
+        new ConcurrentHashMap8<>();
+
+    /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */
+    private static final GridStripedLock initLock = new GridStripedLock(32);
+
+    /**
+     * @param endpointAddr Endpoint.
+     * @param marsh Protocol.
+     * @param log Logger to use.
+     */
+    public HadoopIpcIgfsIo(String endpointAddr, IgfsMarshaller marsh, Log log) {
+        assert endpointAddr != null;
+        assert marsh != null;
+
+        this.endpointAddr = endpointAddr;
+        this.marsh = marsh;
+        this.log = log;
+    }
+
+    /**
+     * Returns a started and valid instance of this class
+     * for a given endpoint.
+     *
+     * @param log Logger to use for new instance.
+     * @param endpoint Endpoint string.
+     * @return New or existing cached instance, which is started and operational.
+     * @throws IOException If new instance was created but failed to start.
+     */
+    public static HadoopIpcIgfsIo get(Log log, String endpoint) throws IOException {
+        while (true) {
+            HadoopIpcIgfsIo clientIo = ipcCache.get(endpoint);
+
+            if (clientIo != null) {
+                if (clientIo.acquire())
+                    return clientIo;
+                else
+                    // If concurrent close.
+                    ipcCache.remove(endpoint, clientIo);
+            }
+            else {
+                Lock lock = initLock.getLock(endpoint);
+
+                lock.lock();
+
+                try {
+                    clientIo = ipcCache.get(endpoint);
+
+                    if (clientIo != null) { // Perform double check.
+                        if (clientIo.acquire())
+                            return clientIo;
+                        else
+                            // If concurrent close.
+                            ipcCache.remove(endpoint, clientIo);
+                    }
+
+                    // Otherwise try creating a new one.
+                    clientIo = new HadoopIpcIgfsIo(endpoint, new IgfsMarshaller(), log);
+
+                    try {
+                        clientIo.start();
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IOException(e.getMessage(), e);
+                    }
+
+                    HadoopIpcIgfsIo old = ipcCache.putIfAbsent(endpoint, clientIo);
+
+                    // Put in exclusive lock.
+                    assert old == null;
+
+                    return clientIo;
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+        }
+    }
+
+    /**
+     * Increases usage count for this instance.
+     *
+     * @return {@code true} if usage count is greater than zero.
+     */
+    private boolean acquire() {
+        while (true) {
+            int cnt = activeCnt.get();
+
+            if (cnt == 0) {
+                if (log.isDebugEnabled())
+                    log.debug("IPC IO not acquired (count was 0): " + this);
+
+                return false;
+            }
+
+            // Need to make sure that no-one decremented count in between.
+            if (activeCnt.compareAndSet(cnt, cnt + 1)) {
+                if (log.isDebugEnabled())
+                    log.debug("IPC IO acquired: " + this);
+
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Releases this instance, decrementing usage count.
+     * <p>
+     * If usage count becomes zero, the instance is stopped
+     * and removed from cache.
+     */
+    public void release() {
+        while (true) {
+            int cnt = activeCnt.get();
+
+            if (cnt == 0) {
+                if (log.isDebugEnabled())
+                    log.debug("IPC IO not released (count was 0): " + this);
+
+                return;
+            }
+
+            if (activeCnt.compareAndSet(cnt, cnt - 1)) {
+                if (cnt == 1) {
+                    ipcCache.remove(endpointAddr, this);
+
+                    if (log.isDebugEnabled())
+                        log.debug("IPC IO stopping as unused: " + this);
+
+                    stop();
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("IPC IO released: " + this);
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * Closes this IO instance, removing it from cache.
+     */
+    public void forceClose() {
+        if (ipcCache.remove(endpointAddr, this))
+            stop();
+    }
+
+    /**
+     * Starts the IO.
+     *
+     * @throws IgniteCheckedException If failed to connect the endpoint.
+     */
+    private void start() throws IgniteCheckedException {
+        boolean success = false;
+
+        try {
+            endpoint = IpcEndpointFactory.connectEndpoint(
+                    endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, ""));
+
+            out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
+
+            reader = new ReaderThread();
+
+            // Required for Hadoop 2.x
+            reader.setDaemon(true);
+
+            reader.start();
+
+            success = true;
+        }
+        catch (IgniteCheckedException e) {
+            IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class);
+
+            if (resEx != null)
+                throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx);
+
+            throw e;
+        }
+        finally {
+            if (!success)
+                stop();
+        }
+    }
+
+    /**
+     * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed.
+     * Close listeners will be invoked as if connection is closed by server.
+     */
+    private void stop() {
+        close0(null);
+
+        if (reader != null) {
+            try {
+                U.interrupt(reader);
+                U.join(reader);
+
+                reader = null;
+            }
+            catch (IgniteInterruptedCheckedException ignored) {
+                Thread.currentThread().interrupt();
+
+                log.warn("Got interrupted while waiting for reader thread to shut down (will return).");
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addEventListener(HadoopIgfsIpcIoListener lsnr) {
+        if (!busyLock.readLock().tryLock()) {
+            lsnr.onClose();
+
+            return;
+        }
+
+        boolean invokeNow = false;
+
+        try {
+            invokeNow = stopping;
+
+            if (!invokeNow)
+                lsnrs.add(lsnr);
+        }
+        finally {
+            busyLock.readLock().unlock();
+
+            if (invokeNow)
+                lsnr.onClose();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeEventListener(HadoopIgfsIpcIoListener lsnr) {
+        lsnrs.remove(lsnr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException {
+        return send(msg, null, 0, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff,
+        int outLen) throws IgniteCheckedException {
+        assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK;
+
+        if (!busyLock.readLock().tryLock())
+            throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " +
+                "closed).");
+
+        try {
+            if (stopping)
+                throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " +
+                    "closed).");
+
+            long reqId = reqIdCnt.getAndIncrement();
+
+            HadoopIgfsFuture<T> fut = new HadoopIgfsFuture<>();
+
+            fut.outputBuffer(outBuf);
+            fut.outputOffset(outOff);
+            fut.outputLength(outLen);
+            fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK);
+
+            HadoopIgfsFuture oldFut = reqMap.putIfAbsent(reqId, fut);
+
+            assert oldFut == null;
+
+            if (log.isDebugEnabled())
+                log.debug("Sending IGFS message [reqId=" + reqId + ", msg=" + msg + ']');
+
+            byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command());
+
+            IgniteCheckedException err = null;
+
+            try {
+                synchronized (this) {
+                    marsh.marshall(msg, hdr, out);
+
+                    out.flush(); // Blocking operation + sometimes system call.
+                }
+            }
+            catch (IgniteCheckedException e) {
+                err = e;
+            }
+            catch (IOException e) {
+                err = new HadoopIgfsCommunicationException(e);
+            }
+
+            if (err != null) {
+                reqMap.remove(reqId, fut);
+
+                fut.onDone(err);
+            }
+
+            return fut;
+        }
+        finally {
+            busyLock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException {
+        if (!busyLock.readLock().tryLock())
+            throw new HadoopIgfsCommunicationException("Failed to send message (client is being " +
+                "concurrently closed).");
+
+        try {
+            if (stopping)
+                throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently closed).");
+
+            assert msg.command() == IgfsIpcCommand.WRITE_BLOCK;
+
+            IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg;
+
+            byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK);
+
+            U.longToBytes(req.streamId(), hdr, 12);
+            U.intToBytes(req.length(), hdr, 20);
+
+            synchronized (this) {
+                out.write(hdr);
+                out.write(req.data(), (int)req.position(), req.length());
+
+                out.flush();
+            }
+        }
+        catch (IOException e) {
+            throw new HadoopIgfsCommunicationException(e);
+        }
+        finally {
+            busyLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Closes client but does not wait.
+     *
+     * @param err Error.
+     */
+    private void close0(@Nullable Throwable err) {
+        busyLock.writeLock().lock();
+
+        try {
+            if (stopping)
+                return;
+
+            stopping = true;
+        }
+        finally {
+            busyLock.writeLock().unlock();
+        }
+
+        if (err == null)
+            err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " +
+                "is received).");
+
+        // Clean up resources.
+        U.closeQuiet(out);
+
+        if (endpoint != null)
+            endpoint.close();
+
+        // Unwind futures. We can safely iterate here because no more futures will be added.
+        Iterator<HadoopIgfsFuture> it = reqMap.values().iterator();
+
+        while (it.hasNext()) {
+            HadoopIgfsFuture fut = it.next();
+
+            fut.onDone(err);
+
+            it.remove();
+        }
+
+        for (HadoopIgfsIpcIoListener lsnr : lsnrs)
+            lsnr.onClose();
+    }
+
+    /**
+     * Do not extend {@code GridThread} to minimize class dependencies.
+     */
+    private class ReaderThread extends Thread {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void run() {
+            // Error to fail pending futures.
+            Throwable err = null;
+
+            try {
+                InputStream in = endpoint.inputStream();
+
+                IgfsDataInputStream dis = new IgfsDataInputStream(in);
+
+                byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE];
+                byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE];
+
+                while (!Thread.currentThread().isInterrupted()) {
+                    dis.readFully(hdr);
+
+                    long reqId = U.bytesToLong(hdr, 0);
+
+                    // We don't wait for write responses, therefore reqId is -1.
+                    if (reqId == -1) {
+                        // We received a response which normally should not be sent. It must contain an error.
+                        dis.readFully(msgHdr);
+
+                        assert msgHdr[4] != 0;
+
+                        String errMsg = dis.readUTF();
+
+                        // Error code.
+                        dis.readInt();
+
+                        long streamId = dis.readLong();
+
+                        for (HadoopIgfsIpcIoListener lsnr : lsnrs)
+                            lsnr.onError(streamId, errMsg);
+                    }
+                    else {
+                        HadoopIgfsFuture<Object> fut = reqMap.remove(reqId);
+
+                        if (fut == null) {
+                            String msg = "Failed to read response from server: response closure is unavailable for " +
+                                "requestId (will close connection):" + reqId;
+
+                            log.warn(msg);
+
+                            err = new IgniteCheckedException(msg);
+
+                            break;
+                        }
+                        else {
+                            try {
+                                IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8));
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Received IGFS response [reqId=" + reqId + ", cmd=" + cmd + ']');
+
+                                Object res = null;
+
+                                if (fut.read()) {
+                                    dis.readFully(msgHdr);
+
+                                    boolean hasErr = msgHdr[4] != 0;
+
+                                    if (hasErr) {
+                                        String errMsg = dis.readUTF();
+
+                                        // Error code.
+                                        Integer errCode = dis.readInt();
+
+                                        IgfsControlResponse.throwError(errCode, errMsg);
+                                    }
+
+                                    int blockLen = U.bytesToInt(msgHdr, 5);
+
+                                    int readLen = Math.min(blockLen, fut.outputLength());
+
+                                    if (readLen > 0) {
+                                        assert fut.outputBuffer() != null;
+
+                                        dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen);
+                                    }
+
+                                    if (readLen != blockLen) {
+                                        byte[] buf = new byte[blockLen - readLen];
+
+                                        dis.readFully(buf);
+
+                                        res = buf;
+                                    }
+                                }
+                                else
+                                    res = marsh.unmarshall(cmd, hdr, dis);
+
+                                fut.onDone(res);
+                            }
+                            catch (IgniteCheckedException e) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to apply response closure (will fail request future): " +
+                                        e.getMessage());
+
+                                fut.onDone(e);
+
+                                err = e;
+                            }
+                        }
+                    }
+                }
+            }
+            catch (EOFException ignored) {
+                err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer).");
+            }
+            catch (IOException e) {
+                if (!stopping)
+                    log.error("Failed to read data (connection will be closed)", e);
+
+                err = new HadoopIgfsCommunicationException(e);
+            }
+            catch (IgniteCheckedException e) {
+                if (!stopping)
+                    log.error("Failed to obtain endpoint input stream (connection will be closed)", e);
+
+                err = e;
+            }
+            finally {
+                close0(err);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt +
+            ", stopping=" + stopping + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutProcIgfs.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutProcIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutProcIgfs.java
new file mode 100644
index 0000000..19205c0
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopOutProcIgfs.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.*;
+
+/**
+ * Communication with external process (TCP or shmem).
+ */
+public class HadoopOutProcIgfs implements HadoopIgfsEx, HadoopIgfsIpcIoListener {
+    /** Expected result is boolean. */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure();
+
+    /** Expected result is boolean. */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Long> LONG_RES = createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure();
+
+    /** Expected result is {@code IgfsHandshakeResponse} */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+        IgfsHandshakeResponse> HANDSHAKE_RES = createClosure();
+
+    /** Expected result is {@code IgfsStatus} */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsStatus> STATUS_RES =
+        createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+        IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+        Collection<IgfsFile>> FILE_COL_RES = createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+        Collection<IgfsPath>> PATH_COL_RES = createClosure();
+
+    /** Expected result is {@code IgfsPathSummary}. */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES =
+        createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
+        Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure();
+
+    /** Grid name. */
+    private final String grid;
+
+    /** IGFS name. */
+    private final String igfs;
+
+    /** Client log. */
+    private final Log log;
+
+    /** Client IO. */
+    private final HadoopIpcIgfsIo io;
+
+    /** Event listeners. */
+    private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>();
+
+    /**
+     * Constructor for TCP endpoint.
+     *
+     * @param host Host.
+     * @param port Port.
+     * @param grid Grid name.
+     * @param igfs IGFS name.
+     * @param log Client logger.
+     * @throws IOException If failed.
+     */
+    public HadoopOutProcIgfs(String host, int port, String grid, String igfs, Log log) throws IOException {
+        this(host, port, grid, igfs, false, log);
+    }
+
+    /**
+     * Constructor for shmem endpoint.
+     *
+     * @param port Port.
+     * @param grid Grid name.
+     * @param igfs IGFS name.
+     * @param log Client logger.
+     * @throws IOException If failed.
+     */
+    public HadoopOutProcIgfs(int port, String grid, String igfs, Log log) throws IOException {
+        this(null, port, grid, igfs, true, log);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param host Host.
+     * @param port Port.
+     * @param grid Grid name.
+     * @param igfs IGFS name.
+     * @param shmem Shared memory flag.
+     * @param log Client logger.
+     * @throws IOException If failed.
+     */
+    private HadoopOutProcIgfs(String host, int port, String grid, String igfs, boolean shmem, Log log)
+        throws IOException {
+        assert host != null && !shmem || host == null && shmem :
+            "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
+
+        String endpoint = host != null ? host + ":" + port : "shmem:" + port;
+
+        this.grid = grid;
+        this.igfs = igfs;
+        this.log = log;
+
+        io = HadoopIpcIgfsIo.get(log, endpoint);
+
+        io.addEventListener(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException {
+        final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
+
+        req.gridName(grid);
+        req.igfsName(igfs);
+        req.logDirectory(logDir);
+
+        return io.send(req).chain(HANDSHAKE_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(boolean force) {
+        assert io != null;
+
+        io.removeEventListener(this);
+
+        if (force)
+            io.forceClose();
+        else
+            io.release();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(INFO);
+        msg.path(path);
+
+        return io.send(msg).chain(FILE_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(UPDATE);
+        msg.path(path);
+        msg.properties(props);
+
+        return io.send(msg).chain(FILE_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(SET_TIMES);
+        msg.path(path);
+        msg.accessTime(accessTime);
+        msg.modificationTime(modificationTime);
+
+        return io.send(msg).chain(BOOL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(RENAME);
+        msg.path(src);
+        msg.destinationPath(dest);
+
+        return io.send(msg).chain(BOOL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(DELETE);
+        msg.path(path);
+        msg.flag(recursive);
+
+        return io.send(msg).chain(BOOL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+        throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(AFFINITY);
+        msg.path(path);
+        msg.start(start);
+        msg.length(len);
+
+        return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(PATH_SUMMARY);
+        msg.path(path);
+
+        return io.send(msg).chain(SUMMARY_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(MAKE_DIRECTORIES);
+        msg.path(path);
+        msg.properties(props);
+
+        return io.send(msg).chain(BOOL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(LIST_FILES);
+        msg.path(path);
+
+        return io.send(msg).chain(FILE_COL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(LIST_PATHS);
+        msg.path(path);
+
+        return io.send(msg).chain(PATH_COL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
+        return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(OPEN_READ);
+        msg.path(path);
+        msg.flag(false);
+
+        IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
+
+        return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(IgfsPath path,
+        int seqReadsBeforePrefetch) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(OPEN_READ);
+        msg.path(path);
+        msg.flag(true);
+        msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+
+        IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
+
+        return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(OPEN_CREATE);
+        msg.path(path);
+        msg.flag(overwrite);
+        msg.colocate(colocate);
+        msg.properties(props);
+        msg.replication(replication);
+        msg.blockSize(blockSize);
+
+        Long streamId = io.send(msg).chain(LONG_RES).get();
+
+        return new HadoopIgfsStreamDelegate(this, streamId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+        @Nullable Map<String, String> props) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(OPEN_APPEND);
+        msg.path(path);
+        msg.flag(create);
+        msg.properties(props);
+
+        Long streamId = io.send(msg).chain(LONG_RES).get();
+
+        return new HadoopIgfsStreamDelegate(this, streamId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len,
+        final @Nullable byte[] outBuf, final int outOff, final int outLen) {
+        assert len > 0;
+
+        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+        msg.command(READ_BLOCK);
+        msg.streamId((long) desc.target());
+        msg.position(pos);
+        msg.length(len);
+
+        try {
+            return io.send(msg, outBuf, outOff, outLen);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridPlainFutureAdapter<>(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len)
+        throws IOException {
+        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+        msg.command(WRITE_BLOCK);
+        msg.streamId((long) desc.target());
+        msg.data(data);
+        msg.position(off);
+        msg.length(len);
+
+        try {
+            io.sendPlain(msg);
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
+        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+        msg.command(CLOSE);
+        msg.streamId((long)desc.target());
+
+        try {
+            io.send(msg).chain(BOOL_RES).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addEventListener(HadoopIgfsStreamDelegate desc,
+        HadoopIgfsStreamEventListener lsnr) {
+        long streamId = desc.target();
+
+        HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr);
+
+        assert lsnr0 == null || lsnr0 == lsnr;
+
+        if (log.isDebugEnabled())
+            log.debug("Added stream event listener [streamId=" + streamId + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) {
+        long streamId = desc.target();
+
+        HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId);
+
+        if (lsnr0 != null && log.isDebugEnabled())
+            log.debug("Removed stream event listener [streamId=" + streamId + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
+            try {
+                lsnr.onClose();
+            }
+            catch (IgniteCheckedException e) {
+                log.warn("Got exception from stream event listener (will ignore): " + lsnr, e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError(long streamId, String errMsg) {
+        HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId);
+
+        if (lsnr != null)
+            lsnr.onError(errMsg);
+        else
+            log.warn("Received write error response for not registered output stream (will ignore) " +
+                "[streamId= " + streamId + ']');
+    }
+
+    /**
+     * Creates conversion closure for given type.
+     *
+     * @param <T> Type of expected result.
+     * @return Conversion closure.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> GridPlainClosure<GridPlainFuture<IgfsMessage>, T> createClosure() {
+        return new GridPlainClosure<GridPlainFuture<IgfsMessage>, T>() {
+            @Override public T apply(GridPlainFuture<IgfsMessage> fut) throws IgniteCheckedException {
+                IgfsControlResponse res = (IgfsControlResponse)fut.get();
+
+                if (res.hasError())
+                    res.throwError();
+
+                return (T)res.response();
+            }
+        };
+    }
+}


Mime
View raw message