ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [29/79] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes.
Date Fri, 06 Mar 2015 06:48:41 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
new file mode 100644
index 0000000..54e87dd
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.fs.permission.*;
+import org.apache.ignite.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
+
+/**
+ * Hadoop file system properties.
+ */
+public class HadoopIgfsProperties {
+    /** Username. */
+    private String usrName;
+
+    /** Group name. */
+    private String grpName;
+
+    /** Permissions. */
+    private FsPermission perm;
+
+    /**
+     * Constructor.
+     *
+     * @param props Properties.
+     * @throws IgniteException In case of error.
+     */
+    public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
+        usrName = props.get(PROP_USER_NAME);
+        grpName = props.get(PROP_GROUP_NAME);
+
+        String permStr = props.get(PROP_PERMISSION);
+
+        if (permStr != null) {
+            try {
+                perm = new FsPermission((short)Integer.parseInt(permStr, 8));
+            }
+            catch (NumberFormatException ignore) {
+                throw new IgniteException("Permissions cannot be parsed: " + permStr);
+            }
+        }
+    }
+
+    /**
+     * Get user name.
+     *
+     * @return User name.
+     */
+    public String userName() {
+        return usrName;
+    }
+
+    /**
+     * Get group name.
+     *
+     * @return Group name.
+     */
+    public String groupName() {
+        return grpName;
+    }
+
+    /**
+     * Get permission.
+     *
+     * @return Permission.
+     */
+    public FsPermission permission() {
+        return perm;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
new file mode 100644
index 0000000..4530e64
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.internal.igfs.common.*;
+
+import java.io.*;
+
+/**
+ * Secondary Hadoop file system input stream wrapper.
+ */
+public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable {
+    /** Actual input stream to the secondary file system. */
+    private final FSDataInputStream is;
+
+    /** Client logger. */
+    private final IgfsLogger clientLog;
+
+    /** Log stream ID. */
+    private final long logStreamId;
+
+    /** Read time. */
+    private long readTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of read bytes. */
+    private long total;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     *
+     * @param is Actual input stream to the secondary file system.
+     * @param clientLog Client log.
+     */
+    public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) {
+        assert is != null;
+        assert clientLog != null;
+
+        this.is = is;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+
+        lastTs = System.nanoTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(byte[] b) throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read(b);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = super.read(b, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long skip(long n) throws IOException {
+        readStart();
+
+        long res;
+
+        try {
+            res =  is.skip(n);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSkip(logStreamId, res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int available() throws IOException {
+        readStart();
+
+        try {
+            return is.available();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+
+            readStart();
+
+            try {
+                is.close();
+            }
+            finally {
+                readEnd();
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void mark(int readLimit) {
+        readStart();
+
+        try {
+            is.mark(readLimit);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logMark(logStreamId, readLimit);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void reset() throws IOException {
+        readStart();
+
+        try {
+            is.reset();
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logReset(logStreamId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean markSupported() {
+        readStart();
+
+        try {
+            return is.markSupported();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read() throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read();
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total++;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read(pos, buf, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+        readStart();
+
+        try {
+            is.readFully(pos, buf, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        total += len;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
+        readStart();
+
+        try {
+            is.readFully(pos, buf);
+        }
+        finally {
+            readEnd();
+        }
+
+        total += buf.length;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, buf.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void seek(long pos) throws IOException {
+        readStart();
+
+        try {
+            is.seek(pos);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSeek(logStreamId, pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long getPos() throws IOException {
+        readStart();
+
+        try {
+            return is.getPos();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+        readStart();
+
+        try {
+            return is.seekToNewSource(targetPos);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /**
+     * Read start.
+     */
+    private void readStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void readEnd() {
+        long now = System.nanoTime();
+
+        readTime += now - lastTs;
+
+        lastTs = now;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
new file mode 100644
index 0000000..9ab552e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.internal.igfs.common.*;
+
+import java.io.*;
+
+/**
+ * Secondary Hadoop file system output stream wrapper.
+ */
+public class HadoopIgfsProxyOutputStream extends OutputStream {
+    /** Actual output stream. */
+    private FSDataOutputStream os;
+
+    /** Client logger. */
+    private final IgfsLogger clientLog;
+
+    /** Log stream ID. */
+    private final long logStreamId;
+
+    /** Read time. */
+    private long writeTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of written bytes. */
+    private long total;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     *
+     * @param os Actual output stream.
+     * @param clientLog Client logger.
+     * @param logStreamId Log stream ID.
+     */
+    public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) {
+        assert os != null;
+        assert clientLog != null;
+
+        this.os = os;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+
+        lastTs = System.nanoTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(int b) throws IOException {
+        writeStart();
+
+        try {
+            os.write(b);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total++;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(byte[] b) throws IOException {
+        writeStart();
+
+        try {
+            os.write(b);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total += b.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+        writeStart();
+
+        try {
+            os.write(b, off, len);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total += len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void flush() throws IOException {
+        writeStart();
+
+        try {
+            os.flush();
+        }
+        finally {
+            writeEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+
+            writeStart();
+
+            try {
+                os.close();
+            }
+            finally {
+                writeEnd();
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
+        }
+    }
+
+    /**
+     * Read start.
+     */
+    private void writeStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void writeEnd() {
+        long now = System.nanoTime();
+
+        writeTime += now - lastTs;
+
+        lastTs = now;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
new file mode 100644
index 0000000..e921fd3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.igfs.secondary.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly
+ * requested.
+ * <p>
+ * The class is expected to be used only from synchronized context and therefore is not tread-safe.
+ */
+public class HadoopIgfsSecondaryFileSystemPositionedReadable implements IgfsSecondaryFileSystemPositionedReadable {
+    /** Secondary file system. */
+    private final FileSystem fs;
+
+    /** Path to the file to open. */
+    private final Path path;
+
+    /** Buffer size. */
+    private final int bufSize;
+
+    /** Actual input stream. */
+    private FSDataInputStream in;
+
+    /** Cached error occurred during output stream open. */
+    private IOException err;
+
+    /** Flag indicating that the stream was already opened. */
+    private boolean opened;
+
+    /**
+     * Constructor.
+     *
+     * @param fs Secondary file system.
+     * @param path Path to the file to open.
+     * @param bufSize Buffer size.
+     */
+    public HadoopIgfsSecondaryFileSystemPositionedReadable(FileSystem fs, Path path, int bufSize) {
+        assert fs != null;
+        assert path != null;
+
+        this.fs = fs;
+        this.path = path;
+        this.bufSize = bufSize;
+    }
+
+    /** Get input stream. */
+    private PositionedReadable in() throws IOException {
+        if (opened) {
+            if (err != null)
+                throw err;
+        }
+        else {
+            opened = true;
+
+            try {
+                in = fs.open(path, bufSize);
+
+                if (in == null)
+                    throw new IOException("Failed to open input stream (file system returned null): " + path);
+            }
+            catch (IOException e) {
+                err = e;
+
+                throw err;
+            }
+        }
+
+        return in;
+    }
+
+    /**
+     * Close wrapped input stream in case it was previously opened.
+     */
+    @Override public void close() {
+        U.closeQuiet(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
+        return in().read(pos, buf, off, len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
new file mode 100644
index 0000000..54f7377
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * IGFS Hadoop stream descriptor.
+ */
+public class HadoopIgfsStreamDelegate {
+    /** RPC handler. */
+    private final HadoopIgfsEx hadoop;
+
+    /** Target. */
+    private final Object target;
+
+    /** Optional stream length. */
+    private final long len;
+
+    /**
+     * Constructor.
+     *
+     * @param target Target.
+     */
+    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) {
+        this(hadoop, target, -1);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param target Target.
+     * @param len Optional length.
+     */
+    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) {
+        assert hadoop != null;
+        assert target != null;
+
+        this.hadoop = hadoop;
+        this.target = target;
+        this.len = len;
+    }
+
+    /**
+     * @return RPC handler.
+     */
+    public HadoopIgfsEx hadoop() {
+        return hadoop;
+    }
+
+    /**
+     * @return Stream target.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T target() {
+        return (T) target;
+    }
+
+    /**
+     * @return Length.
+     */
+    public long length() {
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return System.identityHashCode(target);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return obj != null && obj instanceof HadoopIgfsStreamDelegate &&
+            target == ((HadoopIgfsStreamDelegate)obj).target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopIgfsStreamDelegate.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
new file mode 100644
index 0000000..6b3fa82
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.*;
+
+/**
+ * IGFS input stream event listener.
+ */
+public interface HadoopIgfsStreamEventListener {
+    /**
+     * Callback invoked when the stream is being closed.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onClose() throws IgniteCheckedException;
+
+    /**
+     * Callback invoked when remote error occurs.
+     *
+     * @param errMsg Error message.
+     */
+    public void onError(String errMsg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
new file mode 100644
index 0000000..e30a4ec
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Utility constants and methods for IGFS Hadoop file system.
+ */
+public class HadoopIgfsUtils {
+    /** Parameter name for endpoint no embed mode flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
+
+    /** Parameter name for endpoint no shared memory flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
+
+    /** Parameter name for endpoint no local TCP flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
+
+    /**
+     * Get string parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return String value.
+     */
+    public static String parameter(Configuration cfg, String name, String authority, String dflt) {
+        return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
+    }
+
+    /**
+     * Get integer parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return Integer value.
+     * @throws IOException In case of parse exception.
+     */
+    public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
+        String name0 = String.format(name, authority != null ? authority : "");
+
+        try {
+            return cfg.getInt(name0, dflt);
+        }
+        catch (NumberFormatException ignore) {
+            throw new IOException("Failed to parse parameter value to integer: " + name0);
+        }
+    }
+
+    /**
+     * Get boolean parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return Boolean value.
+     */
+    public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
+        return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
+    }
+
+    /**
+     * Cast Ignite exception to appropriate IO exception.
+     *
+     * @param e Exception to cast.
+     * @return Casted exception.
+     */
+    public static IOException cast(IgniteCheckedException e) {
+        return cast(e, null);
+    }
+
+    /**
+     * Cast Ignite exception to appropriate IO exception.
+     *
+     * @param e Exception to cast.
+     * @param path Path for exceptions.
+     * @return Casted exception.
+     */
+    @SuppressWarnings("unchecked")
+    public static IOException cast(IgniteCheckedException e, @Nullable String path) {
+        assert e != null;
+
+        // First check for any nested IOException; if exists - re-throw it.
+        if (e.hasCause(IOException.class))
+            return e.getCause(IOException.class);
+        else if (e.hasCause(IgfsFileNotFoundException.class))
+            return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
+        else if (e.hasCause(IgfsParentNotDirectoryException.class))
+            return new ParentNotDirectoryException(path);
+        else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
+            return new PathIsNotEmptyDirectoryException(path);
+        else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
+            return new PathExistsException(path);
+        else
+            return new IOException(e);
+    }
+
+    /**
+     * Constructor.
+     */
+    private HadoopIgfsUtils() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
new file mode 100644
index 0000000..1dada21
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.*;
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*;
+
+/**
+ * Wrapper for IGFS server.
+ */
+public class HadoopIgfsWrapper implements HadoopIgfs {
+    /** Delegate. */
+    private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
+
+    /** Authority. */
+    private final String authority;
+
+    /** Connection string. */
+    private final HadoopIgfsEndpoint endpoint;
+
+    /** Log directory. */
+    private final String logDir;
+
+    /** Configuration. */
+    private final Configuration conf;
+
+    /** Logger. */
+    private final Log log;
+
+    /**
+     * Constructor.
+     *
+     * @param authority Authority (connection string).
+     * @param logDir Log directory for server.
+     * @param conf Configuration.
+     * @param log Current logger.
+     */
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
+        try {
+            this.authority = authority;
+            this.endpoint = new HadoopIgfsEndpoint(authority);
+            this.logDir = logDir;
+            this.conf = conf;
+            this.log = log;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to parse endpoint: " + authority, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) {
+                return hndResp;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(boolean force) {
+        Delegate delegate = delegateRef.get();
+
+        if (delegate != null && delegateRef.compareAndSet(delegate, null))
+            delegate.close(force);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.info(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.update(path, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.setTimes(path, accessTime, modificationTime);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.rename(src, dest);
+            }
+        }, src);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.delete(path, recursive);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
+        final long len) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
+            @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.affinity(path, start, len);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
+            @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.contentSummary(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.mkdirs(path, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
+            @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.listFiles(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
+            @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.listPaths(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus fsStatus() throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
+            @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.fsStatus();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.open(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.open(path, seqReadsBeforePrefetch);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
+        final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        @Nullable final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.append(path, create, props);
+            }
+        }, path);
+    }
+
+    /**
+     * Execute closure which is not path-specific.
+     *
+     * @param clo Closure.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
+        return withReconnectHandling(clo, null);
+    }
+
+    /**
+     * Execute closure.
+     *
+     * @param clo Closure.
+     * @param path Path for exceptions.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
+        throws IOException {
+        Exception err = null;
+
+        for (int i = 0; i < 2; i++) {
+            Delegate curDelegate = null;
+
+            boolean close = false;
+            boolean force = false;
+
+            try {
+                curDelegate = delegate();
+
+                assert curDelegate != null;
+
+                close = curDelegate.doomed;
+
+                return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
+            }
+            catch (HadoopIgfsCommunicationException e) {
+                if (curDelegate != null && !curDelegate.doomed) {
+                    // Try getting rid fo faulty delegate ASAP.
+                    delegateRef.compareAndSet(curDelegate, null);
+
+                    close = true;
+                    force = true;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message to a server: " + e);
+
+                err = e;
+            }
+            catch (IgniteCheckedException e) {
+                throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null);
+            }
+            finally {
+                if (close) {
+                    assert curDelegate != null;
+
+                    curDelegate.close(force);
+                }
+            }
+        }
+
+        throw new IOException("Failed to communicate with IGFS.", err);
+    }
+
+    /**
+     * Get delegate creating it if needed.
+     *
+     * @return Delegate.
+     */
+    private Delegate delegate() throws HadoopIgfsCommunicationException {
+        Exception err = null;
+
+        // 1. If delegate is set, return it immediately.
+        Delegate curDelegate = delegateRef.get();
+
+        if (curDelegate != null)
+            return curDelegate;
+
+        // 2. Guess that we are in the same VM.
+        if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false)) {
+            IgfsEx igfs = null;
+
+            if (endpoint.grid() == null) {
+                try {
+                    Ignite ignite = G.ignite();
+
+                    igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs());
+                }
+                catch (Exception e) {
+                    err = e;
+                }
+            }
+            else {
+                for (Ignite ignite : G.allGrids()) {
+                    try {
+                        igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs());
+
+                        break;
+                    }
+                    catch (Exception e) {
+                        err = e;
+                    }
+                }
+            }
+
+            if (igfs != null) {
+                HadoopIgfsEx hadoop = null;
+
+                try {
+                    hadoop = new HadoopIgfsInProc(igfs, log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (e instanceof HadoopIgfsCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 3. Try connecting using shmem.
+        if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) {
+            if (curDelegate == null && !U.isWindows()) {
+                HadoopIgfsEx hadoop = null;
+
+                try {
+                    hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (e instanceof HadoopIgfsCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to out-proc local IGFS using shmem.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 4. Try local TCP connection.
+        boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
+
+        if (!skipLocTcp) {
+            if (curDelegate == null) {
+                HadoopIgfsEx hadoop = null;
+
+                try {
+                    hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                        log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (e instanceof HadoopIgfsCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to out-proc local IGFS using TCP.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 5. Try remote TCP connection.
+        if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
+            HadoopIgfsEx hadoop = null;
+
+            try {
+                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to out-proc remote IGFS using TCP.", e);
+
+                err = e;
+            }
+        }
+
+        if (curDelegate != null) {
+            if (!delegateRef.compareAndSet(null, curDelegate))
+                curDelegate.doomed = true;
+
+            return curDelegate;
+        }
+        else
+            throw new HadoopIgfsCommunicationException("Failed to connect to IGFS: " + endpoint, err);
+    }
+
+    /**
+     * File system operation closure.
+     */
+    private static interface FileSystemClosure<T> {
+        /**
+         * Call closure body.
+         *
+         * @param hadoop RPC handler.
+         * @param hndResp Handshake response.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         * @throws IOException If failed.
+         */
+        public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
+    }
+
+    /**
+     * Delegate.
+     */
+    private static class Delegate {
+        /** RPC handler. */
+        private final HadoopIgfsEx hadoop;
+
+        /** Handshake request. */
+        private final IgfsHandshakeResponse hndResp;
+
+        /** Close guard. */
+        private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+        /** Whether this delegate must be closed at the end of the next invocation. */
+        private boolean doomed;
+
+        /**
+         * Constructor.
+         *
+         * @param hadoop Hadoop.
+         * @param hndResp Handshake response.
+         */
+        private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) {
+            this.hadoop = hadoop;
+            this.hndResp = hndResp;
+        }
+
+        /**
+         * Close underlying RPC handler.
+         *
+         * @param force Force flag.
+         */
+        private void close(boolean force) {
+            if (closeGuard.compareAndSet(false, true))
+                hadoop.close(force);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
deleted file mode 100644
index b124312..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.jobtracker;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
-
-/**
- * Hadoop job metadata. Internal object used for distributed job state tracking.
- */
-public class GridHadoopJobMetadata implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Job ID. */
-    private GridHadoopJobId jobId;
-
-    /** Job info. */
-    private GridHadoopJobInfo jobInfo;
-
-    /** Node submitted job. */
-    private UUID submitNodeId;
-
-    /** Map-reduce plan. */
-    private GridHadoopMapReducePlan mrPlan;
-
-    /** Pending splits for which mapper should be executed. */
-    private Map<GridHadoopInputSplit, Integer> pendingSplits;
-
-    /** Pending reducers. */
-    private Collection<Integer> pendingReducers;
-
-    /** Reducers addresses. */
-    @GridToStringInclude
-    private Map<Integer, GridHadoopProcessDescriptor> reducersAddrs;
-
-    /** Job phase. */
-    private GridHadoopJobPhase phase = PHASE_SETUP;
-
-    /** Fail cause. */
-    @GridToStringExclude
-    private Throwable failCause;
-
-    /** Version. */
-    private long ver;
-
-    /** Job counters */
-    private GridHadoopCounters counters = new GridHadoopCountersImpl();
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridHadoopJobMetadata() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param submitNodeId Submit node ID.
-     * @param jobId Job ID.
-     * @param jobInfo Job info.
-     */
-    public GridHadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
-        this.jobId = jobId;
-        this.jobInfo = jobInfo;
-        this.submitNodeId = submitNodeId;
-    }
-
-    /**
-     * Copy constructor.
-     *
-     * @param src Metadata to copy.
-     */
-    public GridHadoopJobMetadata(GridHadoopJobMetadata src) {
-        // Make sure to preserve alphabetic order.
-        counters = src.counters;
-        failCause = src.failCause;
-        jobId = src.jobId;
-        jobInfo = src.jobInfo;
-        mrPlan = src.mrPlan;
-        pendingSplits = src.pendingSplits;
-        pendingReducers = src.pendingReducers;
-        phase = src.phase;
-        reducersAddrs = src.reducersAddrs;
-        submitNodeId = src.submitNodeId;
-        ver = src.ver + 1;
-    }
-
-    /**
-     * @return Submit node ID.
-     */
-    public UUID submitNodeId() {
-        return submitNodeId;
-    }
-
-    /**
-     * @param phase Job phase.
-     */
-    public void phase(GridHadoopJobPhase phase) {
-        this.phase = phase;
-    }
-
-    /**
-     * @return Job phase.
-     */
-    public GridHadoopJobPhase phase() {
-        return phase;
-    }
-
-    /**
-     * Gets reducers addresses for external execution.
-     *
-     * @return Reducers addresses.
-     */
-    public Map<Integer, GridHadoopProcessDescriptor> reducersAddresses() {
-        return reducersAddrs;
-    }
-
-    /**
-     * Sets reducers addresses for external execution.
-     *
-     * @param reducersAddrs Map of addresses.
-     */
-    public void reducersAddresses(Map<Integer, GridHadoopProcessDescriptor> reducersAddrs) {
-        this.reducersAddrs = reducersAddrs;
-    }
-
-    /**
-     * Sets collection of pending splits.
-     *
-     * @param pendingSplits Collection of pending splits.
-     */
-    public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) {
-        this.pendingSplits = pendingSplits;
-    }
-
-    /**
-     * Gets collection of pending splits.
-     *
-     * @return Collection of pending splits.
-     */
-    public Map<GridHadoopInputSplit, Integer> pendingSplits() {
-        return pendingSplits;
-    }
-
-    /**
-     * Sets collection of pending reducers.
-     *
-     * @param pendingReducers Collection of pending reducers.
-     */
-    public void pendingReducers(Collection<Integer> pendingReducers) {
-        this.pendingReducers = pendingReducers;
-    }
-
-    /**
-     * Gets collection of pending reducers.
-     *
-     * @return Collection of pending reducers.
-     */
-    public Collection<Integer> pendingReducers() {
-        return pendingReducers;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public GridHadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @param mrPlan Map-reduce plan.
-     */
-    public void mapReducePlan(GridHadoopMapReducePlan mrPlan) {
-        assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
-
-        this.mrPlan = mrPlan;
-    }
-
-    /**
-     * @return Map-reduce plan.
-     */
-    public GridHadoopMapReducePlan mapReducePlan() {
-        return mrPlan;
-    }
-
-    /**
-     * @return Job info.
-     */
-    public GridHadoopJobInfo jobInfo() {
-        return jobInfo;
-    }
-
-    /**
-     * Returns job counters.
-     *
-     * @return Collection of counters.
-     */
-    public GridHadoopCounters counters() {
-        return counters;
-    }
-
-    /**
-     * Sets counters.
-     *
-     * @param counters Collection of counters.
-     */
-    public void counters(GridHadoopCounters counters) {
-        this.counters = counters;
-    }
-
-    /**
-     * @param failCause Fail cause.
-     */
-    public void failCause(Throwable failCause) {
-        assert failCause != null;
-
-        if (this.failCause == null) // Keep the first error.
-            this.failCause = failCause;
-    }
-
-    /**
-     * @return Fail cause.
-     */
-    public Throwable failCause() {
-        return failCause;
-    }
-
-    /**
-     * @return Version.
-     */
-    public long version() {
-        return ver;
-    }
-
-    /**
-     * @param split Split.
-     * @return Task number.
-     */
-    public int taskNumber(GridHadoopInputSplit split) {
-        return pendingSplits.get(split);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeUuid(out, submitNodeId);
-        out.writeObject(jobId);
-        out.writeObject(jobInfo);
-        out.writeObject(mrPlan);
-        out.writeObject(pendingSplits);
-        out.writeObject(pendingReducers);
-        out.writeObject(phase);
-        out.writeObject(failCause);
-        out.writeLong(ver);
-        out.writeObject(reducersAddrs);
-        out.writeObject(counters);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        submitNodeId = U.readUuid(in);
-        jobId = (GridHadoopJobId)in.readObject();
-        jobInfo = (GridHadoopJobInfo)in.readObject();
-        mrPlan = (GridHadoopMapReducePlan)in.readObject();
-        pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject();
-        pendingReducers = (Collection<Integer>)in.readObject();
-        phase = (GridHadoopJobPhase)in.readObject();
-        failCause = (Throwable)in.readObject();
-        ver = in.readLong();
-        reducersAddrs = (Map<Integer, GridHadoopProcessDescriptor>)in.readObject();
-        counters = (GridHadoopCounters)in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-        return S.toString(GridHadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
-            "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
-                failCause.getClass().getName());
-    }
-}


Mime
View raw message