ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [28/57] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP (6)
Date Fri, 13 Feb 2015 10:54:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsControlResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsControlResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsControlResponse.java
new file mode 100644
index 0000000..e2905cc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsControlResponse.java
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import org.apache.ignite.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.fs.common.IgfsIpcCommand.*;
+
+/**
+ * GGFS path command response.
+ */
+public class IgfsControlResponse extends IgfsMessage {
+    /** Generic error (not GGFS) while performing operations. */
+    private static final int ERR_GENERIC = 0;
+
+    /** Generic GGFS error while performing operations. */
+    private static final int ERR_GGFS_GENERIC = 1;
+
+    /** Target file not found. */
+    private static final int ERR_FILE_NOT_FOUND = 2;
+
+    /** Target path already exists. */
+    private static final int ERR_PATH_ALREADY_EXISTS = 3;
+
+    /** Directory is not empty with */
+    private static final int ERR_DIRECTORY_NOT_EMPTY = 4;
+
+    /** Target parent is not a directory. */
+    private static final int ERR_PARENT_NOT_DIRECTORY = 5;
+
+    /** Secondary HDFS version differs from classpath version. */
+    private static final int ERR_INVALID_HDFS_VERSION = 6;
+
+    /** Failed to retrieve file's data block. */
+    private static final int ERR_CORRUPTED_FILE = 7;
+
+    /** Response is boolean. */
+    public static final int RES_TYPE_BOOLEAN = 0;
+
+    /** Response is Long. */
+    public static final int RES_TYPE_LONG = 1;
+
+    /** Response is GridGgfsFile. */
+    public static final int RES_TYPE_GGFS_FILE = 2;
+
+    /** Response is GridGgfsFileInfo. */
+    public static final int RES_TYPE_GGFS_STREAM_DESCRIPTOR = 3;
+
+    /** Response is GridGgfsPath. */
+    public static final int RES_TYPE_GGFS_PATH = 4;
+
+    /** Response is collection of GridGgfsFile. */
+    public static final int RES_TYPE_COL_GGFS_FILE = 5;
+
+    /** Response is collection of GridGgfsPath. */
+    public static final int RES_TYPE_COL_GGFS_PATH = 6;
+
+    /** Response is collection of GridGgfsBlockLocation. */
+    public static final int RES_TYPE_COL_GGFS_BLOCK_LOCATION = 7;
+
+    /** Response is collection of GridGgfsBlockLocation. */
+    public static final int RES_TYPE_BYTE_ARRAY = 8;
+
+    /** Response is an error containing stream ID and error message. */
+    public static final int RES_TYPE_ERR_STREAM_ID = 9;
+
+    /** Response is a handshake  */
+    public static final int RES_TYPE_HANDSHAKE = 10;
+
+    /** Response is a handshake  */
+    public static final int RES_TYPE_STATUS = 11;
+
+    /** Response is a path summary. */
+    public static final int RES_TYPE_GGFS_PATH_SUMMARY = 12;
+
+    /** Message header size. */
+    public static final int RES_HEADER_SIZE = 9;
+
+    /** We have limited number of object response types. */
+    private int resType = -1;
+
+    /** Response. */
+    @GridToStringInclude
+    private Object res;
+
+    /** Bytes length to avoid iteration and summing. */
+    private int len;
+
+    /** Error (if any). */
+    private String err;
+
+    /** Error code. */
+    private int errCode = -1;
+
+    /**
+     *
+     */
+    public IgfsControlResponse() {
+        command(CONTROL_RESPONSE);
+    }
+
+    /**
+     * @return Response.
+     */
+    public Object response() {
+        return res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void response(boolean res) {
+        resType = RES_TYPE_BOOLEAN;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void response(long res) {
+        resType = RES_TYPE_LONG;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void response(byte[][] res) {
+        resType = RES_TYPE_BYTE_ARRAY;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void response(IgfsInputStreamDescriptor res) {
+        resType = RES_TYPE_GGFS_STREAM_DESCRIPTOR;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void response(IgniteFsFile res) {
+        resType = RES_TYPE_GGFS_FILE;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void response(IgniteFsPath res) {
+        resType = RES_TYPE_GGFS_PATH;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Path summary response.
+     */
+    public void response(IgniteFsPathSummary res) {
+        resType = RES_TYPE_GGFS_PATH_SUMMARY;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void files(Collection<IgniteFsFile> res) {
+        resType = RES_TYPE_COL_GGFS_FILE;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void paths(Collection<IgniteFsPath> res) {
+        resType = RES_TYPE_COL_GGFS_PATH;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Response.
+     */
+    public void locations(Collection<IgniteFsBlockLocation> res) {
+        resType = RES_TYPE_COL_GGFS_BLOCK_LOCATION;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Handshake message.
+     */
+    public void handshake(IgfsHandshakeResponse res) {
+        resType = RES_TYPE_HANDSHAKE;
+
+        this.res = res;
+    }
+
+    /**
+     * @param res Status response.
+     */
+    public void status(IgfsStatus res) {
+        resType = RES_TYPE_STATUS;
+
+        this.res = res;
+    }
+
+    /**
+     * @param len Response length.
+     */
+    public void length(int len) {
+        this.len = len;
+    }
+
+    /**
+     * @return Error message if occurred.
+     */
+    public boolean hasError() {
+        return errCode != -1;
+    }
+
+    /**
+     * @param errCode Error code.
+     * @param err Error.
+     * @throws IgniteCheckedException Based on error code.
+     */
+    public static void throwError(Integer errCode, String err) throws IgniteCheckedException {
+        assert err != null;
+        assert errCode != -1;
+
+        if (errCode == ERR_FILE_NOT_FOUND)
+            throw new IgniteFsFileNotFoundException(err);
+        else if (errCode == ERR_PATH_ALREADY_EXISTS)
+            throw new IgniteFsPathAlreadyExistsException(err);
+        else if (errCode == ERR_DIRECTORY_NOT_EMPTY)
+            throw new IgfsDirectoryNotEmptyException(err);
+        else if (errCode == ERR_PARENT_NOT_DIRECTORY)
+            throw new IgniteFsParentNotDirectoryException(err);
+        else if (errCode == ERR_INVALID_HDFS_VERSION)
+            throw new IgniteFsInvalidHdfsVersionException(err);
+        else if (errCode == ERR_CORRUPTED_FILE)
+            throw new IgniteFsCorruptedFileException(err);
+        else if (errCode == ERR_GGFS_GENERIC)
+            throw new IgniteFsException(err);
+
+        throw new IgniteCheckedException(err);
+    }
+
+    /**
+     * @throws IgniteCheckedException Based on error code.
+     */
+    public void throwError() throws IgniteCheckedException {
+        throwError(errCode, err);
+    }
+
+    /**
+     * @return Error code.
+     */
+    public int errorCode() {
+        return errCode;
+    }
+
+    /**
+     * @param e Error if occurred.
+     */
+    public void error(IgniteCheckedException e) {
+        err = e.getMessage();
+        errCode = errorCode(e);
+    }
+
+    /**
+     * @param streamId Stream ID.
+     * @param err Error message if occurred.
+     */
+    public void error(long streamId, String err) {
+        resType = RES_TYPE_ERR_STREAM_ID;
+
+        res = streamId;
+        errCode = ERR_GENERIC;
+
+        this.err = err;
+    }
+
+    /**
+     * Gets error code based on exception class.
+     *
+     * @param e Exception to analyze.
+     * @return Error code.
+     */
+    private int errorCode(IgniteCheckedException e) {
+        return errorCode(e, true);
+    }
+
+    /**
+     * Gets error code based on exception class.
+     *
+     * @param e Exception to analyze.
+     * @param checkIo Whether to check for IO exception.
+     * @return Error code.
+     */
+    @SuppressWarnings("unchecked")
+    private int errorCode(IgniteCheckedException e, boolean checkIo) {
+        if (X.hasCause(e, IgniteFsFileNotFoundException.class))
+            return ERR_FILE_NOT_FOUND;
+        else if (e.hasCause(IgniteFsPathAlreadyExistsException.class))
+            return ERR_PATH_ALREADY_EXISTS;
+        else if (e.hasCause(IgfsDirectoryNotEmptyException.class))
+            return ERR_DIRECTORY_NOT_EMPTY;
+        else if (e.hasCause(IgniteFsParentNotDirectoryException.class))
+            return ERR_PARENT_NOT_DIRECTORY;
+        else if (e.hasCause(IgniteFsInvalidHdfsVersionException.class))
+            return ERR_INVALID_HDFS_VERSION;
+        else if (e.hasCause(IgniteFsCorruptedFileException.class))
+            return ERR_CORRUPTED_FILE;
+            // This check should be the last.
+        else if (e.hasCause(IgniteFsException.class))
+            return ERR_GGFS_GENERIC;
+
+        return ERR_GENERIC;
+    }
+
+    /**
+     * Writes object to data output. Do not use externalizable interface to avoid marshaller.
+     *
+     * @param out Data output.
+     * @throws IOException If error occurred.
+     */
+    @SuppressWarnings("unchecked")
+    public void writeExternal(ObjectOutput out) throws IOException {
+        byte[] hdr = new byte[RES_HEADER_SIZE];
+
+        U.intToBytes(resType, hdr, 0);
+
+        int off = 4;
+
+        hdr[off++] = err != null ? (byte)1 : (byte)0;
+
+        if (resType == RES_TYPE_BYTE_ARRAY)
+            U.intToBytes(len, hdr, off);
+
+        out.write(hdr);
+
+        if (err != null) {
+            out.writeUTF(err);
+            out.writeInt(errCode);
+
+            if (resType == RES_TYPE_ERR_STREAM_ID)
+                out.writeLong((Long)res);
+
+            return;
+        }
+
+        switch (resType) {
+            case RES_TYPE_BOOLEAN:
+                out.writeBoolean((Boolean)res);
+
+                break;
+
+            case RES_TYPE_LONG:
+                out.writeLong((Long)res);
+
+                break;
+
+            case RES_TYPE_BYTE_ARRAY:
+                byte[][] buf = (byte[][])res;
+
+                for (byte[] bytes : buf)
+                    out.write(bytes);
+
+                break;
+
+            case RES_TYPE_GGFS_PATH:
+            case RES_TYPE_GGFS_PATH_SUMMARY:
+            case RES_TYPE_GGFS_FILE:
+            case RES_TYPE_GGFS_STREAM_DESCRIPTOR:
+            case RES_TYPE_HANDSHAKE:
+            case RES_TYPE_STATUS: {
+                out.writeBoolean(res != null);
+
+                if (res != null)
+                    ((Externalizable)res).writeExternal(out);
+
+                break;
+            }
+
+            case RES_TYPE_COL_GGFS_FILE:
+            case RES_TYPE_COL_GGFS_PATH:
+            case RES_TYPE_COL_GGFS_BLOCK_LOCATION: {
+                Collection<Externalizable> items = (Collection<Externalizable>)res;
+
+                if (items != null) {
+                    out.writeInt(items.size());
+
+                    for (Externalizable item : items)
+                        item.writeExternal(out);
+                }
+                else
+                    out.writeInt(-1);
+
+                break;
+            }
+        }
+    }
+
+    /**
+     * Reads object from data input.
+     *
+     * @param in Data input.
+     * @throws IOException If read failed.
+     * @throws ClassNotFoundException If could not find class.
+     */
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        byte[] hdr = new byte[RES_HEADER_SIZE];
+
+        in.readFully(hdr);
+
+        resType = U.bytesToInt(hdr, 0);
+
+        boolean hasErr = hdr[4] != 0;
+
+        if (hasErr) {
+            err = in.readUTF();
+            errCode = in.readInt();
+
+            if (resType == RES_TYPE_ERR_STREAM_ID)
+                res = in.readLong();
+
+            return;
+        }
+
+        switch (resType) {
+            case RES_TYPE_BOOLEAN:
+                res = in.readBoolean();
+
+                break;
+
+            case RES_TYPE_LONG:
+                res = in.readLong();
+
+                break;
+
+            case RES_TYPE_GGFS_PATH: {
+                boolean hasVal = in.readBoolean();
+
+                if (hasVal) {
+                    IgniteFsPath path = new IgniteFsPath();
+
+                    path.readExternal(in);
+
+                    res = path;
+                }
+
+                break;
+            }
+
+            case RES_TYPE_GGFS_PATH_SUMMARY: {
+                boolean hasVal = in.readBoolean();
+
+                if (hasVal) {
+                    IgniteFsPathSummary sum = new IgniteFsPathSummary();
+
+                    sum.readExternal(in);
+
+                    res = sum;
+                }
+
+                break;
+            }
+
+            case RES_TYPE_GGFS_FILE: {
+                boolean hasVal = in.readBoolean();
+
+                if (hasVal) {
+                    IgfsFileImpl file = new IgfsFileImpl();
+
+                    file.readExternal(in);
+
+                    res = file;
+                }
+
+                break;
+            }
+
+            case RES_TYPE_GGFS_STREAM_DESCRIPTOR: {
+                boolean hasVal = in.readBoolean();
+
+                if (hasVal) {
+                    IgfsInputStreamDescriptor desc = new IgfsInputStreamDescriptor();
+
+                    desc.readExternal(in);
+
+                    res = desc;
+                }
+
+                break;
+            }
+
+            case RES_TYPE_HANDSHAKE: {
+                boolean hasVal = in.readBoolean();
+
+                if (hasVal) {
+                    IgfsHandshakeResponse msg = new IgfsHandshakeResponse();
+
+                    msg.readExternal(in);
+
+                    res = msg;
+                }
+
+                break;
+            }
+
+            case RES_TYPE_STATUS: {
+                boolean hasVal = in.readBoolean();
+
+                if (hasVal) {
+                    IgfsStatus msg = new IgfsStatus();
+
+                    msg.readExternal(in);
+
+                    res = msg;
+                }
+
+                break;
+            }
+
+            case RES_TYPE_COL_GGFS_FILE: {
+                Collection<IgniteFsFile> files = null;
+
+                int size = in.readInt();
+
+                if (size >= 0) {
+                    files = new ArrayList<>(size);
+
+                    for (int i = 0; i < size; i++) {
+                        IgfsFileImpl file = new IgfsFileImpl();
+
+                        file.readExternal(in);
+
+                        files.add(file);
+                    }
+                }
+
+                res = files;
+
+                break;
+            }
+
+            case RES_TYPE_COL_GGFS_PATH: {
+                Collection<IgniteFsPath> paths = null;
+
+                int size = in.readInt();
+
+                if (size >= 0) {
+                    paths = new ArrayList<>(size);
+
+                    for (int i = 0; i < size; i++) {
+                        IgniteFsPath path = new IgniteFsPath();
+
+                        path.readExternal(in);
+
+                        paths.add(path);
+                    }
+                }
+
+                res = paths;
+
+                break;
+            }
+
+            case RES_TYPE_COL_GGFS_BLOCK_LOCATION: {
+                Collection<IgniteFsBlockLocation> locations = null;
+
+                int size = in.readInt();
+
+                if (size >= 0) {
+                    locations = new ArrayList<>(size);
+
+                    for (int i = 0; i < size; i++) {
+                        IgfsBlockLocationImpl location = new IgfsBlockLocationImpl();
+
+                        location.readExternal(in);
+
+                        locations.add(location);
+                    }
+                }
+
+                res = locations;
+
+                break;
+            }
+
+            case RES_TYPE_BYTE_ARRAY:
+                assert false : "Response type of byte array should never be processed by marshaller.";
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsControlResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataInputStream.java
new file mode 100644
index 0000000..4969305
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataInputStream.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import java.io.*;
+
+/**
+ * Data input stream implementing object input but throwing exceptions on object methods.
+ */
+public class IgfsDataInputStream extends DataInputStream implements ObjectInput {
+    /**
+     * Creates a DataInputStream that uses the specified
+     * underlying InputStream.
+     *
+     * @param  in The specified input stream
+     */
+    public IgfsDataInputStream(InputStream in) {
+        super(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object readObject() throws ClassNotFoundException, IOException {
+        throw new IOException("This method must not be invoked on GGFS data input stream.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataOutputStream.java
new file mode 100644
index 0000000..b562d36
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsDataOutputStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import java.io.*;
+
+/**
+ * Data output stream implementing ObjectOutput but throwing exceptions on methods working with objects.
+ */
+public class IgfsDataOutputStream extends DataOutputStream implements ObjectOutput {
+    /**
+     * Creates a new data output stream to write data to the specified
+     * underlying output stream. The counter <code>written</code> is
+     * set to zero.
+     *
+     * @param   out   the underlying output stream, to be saved for later
+     *                use.
+     * @see     FilterOutputStream#out
+     */
+    public IgfsDataOutputStream(OutputStream out) {
+        super(out);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeObject(Object obj) throws IOException {
+        throw new IOException("This method must not be invoked on GGFS data output stream.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsHandshakeRequest.java
new file mode 100644
index 0000000..c758979
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsHandshakeRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import static org.apache.ignite.internal.fs.common.IgfsIpcCommand.*;
+
+/**
+ * Handshake request.
+ */
+public class IgfsHandshakeRequest extends IgfsMessage {
+    /** Expected Grid name. */
+    private String gridName;
+
+    /** Expected GGFS name. */
+    private String ggfsName;
+
+    /** Logger directory. */
+    private String logDir;
+
+    /** {@inheritDoc} */
+    @Override public IgfsIpcCommand command() {
+        return HANDSHAKE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void command(IgfsIpcCommand cmd) {
+        // No-op.
+    }
+
+    /**
+     * @return Grid name.
+     */
+    public String gridName() {
+        return gridName;
+    }
+
+    /**
+     * @param gridName Grid name.
+     */
+    public void gridName(String gridName) {
+        this.gridName = gridName;
+    }
+
+    /**
+     * @return GGFS name.
+     */
+    public String ggfsName() {
+        return ggfsName;
+    }
+
+    /**
+     * @param ggfsName GGFS name.
+     */
+    public void ggfsName(String ggfsName) {
+        this.ggfsName = ggfsName;
+    }
+
+    /**
+     * @return Log directory.
+     */
+    public String logDirectory() {
+        return logDir;
+    }
+
+    /**
+     * @param logDir Log directory.
+     */
+    public void logDirectory(String logDir) {
+        this.logDir = logDir;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsHandshakeRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsIpcCommand.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsIpcCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsIpcCommand.java
new file mode 100644
index 0000000..7530a57
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsIpcCommand.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import java.util.*;
+
+/**
+ * Grid file system commands to call remotely.
+ */
+public enum IgfsIpcCommand {
+    /** Handshake command which will send information necessary for client to handle requests correctly. */
+    HANDSHAKE,
+
+    /** GGFS status (free/used space). */
+    STATUS,
+
+    /** Check specified path exists in the file system. */
+    EXISTS,
+
+    /** Get information for the file in specified path. */
+    INFO,
+
+    /** Get directory summary. */
+    PATH_SUMMARY,
+
+    /** Update information for the file  in specified path. */
+    UPDATE,
+
+    /** Rename file. */
+    RENAME,
+
+    /** Delete file. */
+    DELETE,
+
+    /** Make directories. */
+    MAKE_DIRECTORIES,
+
+    /** List files under the specified path. */
+    LIST_PATHS,
+
+    /** List files under the specified path. */
+    LIST_FILES,
+
+    /** Get affinity block locations for data blocks of the file. */
+    AFFINITY,
+
+    /** Updates last access and last modification time for a path. */
+    SET_TIMES,
+
+    /** Open file for reading as an input stream. */
+    OPEN_READ,
+
+    /** Open existent file as output stream to append data to. */
+    OPEN_APPEND,
+
+    /** Create file and open output stream for writing data to. */
+    OPEN_CREATE,
+
+    /** Close stream. */
+    CLOSE,
+
+    /** Read file's data block. */
+    READ_BLOCK,
+
+    /** Write file's data block. */
+    WRITE_BLOCK,
+
+    /** Server response. */
+    CONTROL_RESPONSE;
+
+    /** All values */
+    private static final List<IgfsIpcCommand> ALL = Arrays.asList(values());
+
+    /**
+     * Resolve command by its ordinal.
+     *
+     * @param ordinal Command ordinal.
+     * @return Resolved command.
+     */
+    public static IgfsIpcCommand valueOf(int ordinal) {
+        return ALL.get(ordinal);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsLogger.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsLogger.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsLogger.java
new file mode 100644
index 0000000..a2f32f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsLogger.java
@@ -0,0 +1,767 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * GGFS client logger writing data to the file.
+ */
+public final class IgfsLogger {
+    /** Field delimiter. */
+    public static final String DELIM_FIELD = ";";
+
+    /** Field values delimiter. */
+    public static final String DELIM_FIELD_VAL = ",";
+
+    /** Pre-defined header string. */
+    public static final String HDR = "Timestamp" + DELIM_FIELD + "ThreadID" + DELIM_FIELD + "PID" + DELIM_FIELD +
+        "Type" + DELIM_FIELD + "Path" + DELIM_FIELD + "Mode" + DELIM_FIELD + "StreamId" + DELIM_FIELD + "BufSize" +
+        DELIM_FIELD + "DataLen" + DELIM_FIELD + "Append" + DELIM_FIELD + "Overwrite" + DELIM_FIELD + "Replication" +
+        DELIM_FIELD + "BlockSize" + DELIM_FIELD + "Position" + DELIM_FIELD + "ReadLen" + DELIM_FIELD + "SkipCnt" +
+        DELIM_FIELD + "ReadLimit" + DELIM_FIELD + "UserTime" + DELIM_FIELD + "SystemTime" + DELIM_FIELD +
+        "TotalBytes" + DELIM_FIELD + "DestPath" + DELIM_FIELD + "Recursive" + DELIM_FIELD + "List";
+
+    /** File open. */
+    public static final int TYPE_OPEN_IN = 0;
+
+    /** File create or append. */
+    public static final int TYPE_OPEN_OUT = 1;
+
+    /** Random read. */
+    public static final int TYPE_RANDOM_READ = 2;
+
+    /** Seek. */
+    public static final int TYPE_SEEK = 3;
+
+    /** Skip. */
+    public static final int TYPE_SKIP = 4;
+
+    /** Mark. */
+    public static final int TYPE_MARK = 5;
+
+    /** Reset. */
+    public static final int TYPE_RESET = 6;
+
+    /** Close input stream. */
+    public static final int TYPE_CLOSE_IN = 7;
+
+    /** Close output stream. */
+    public static final int TYPE_CLOSE_OUT = 8;
+
+    /** Directory creation. */
+    public static final int TYPE_DIR_MAKE = 9;
+
+    /** Directory listing. */
+    public static final int TYPE_DIR_LIST = 10;
+
+    /** Rename. */
+    public static final int TYPE_RENAME = 11;
+
+    /** Delete. */
+    public static final int TYPE_DELETE = 12;
+
+    /** Counter for stream identifiers. */
+    private static final AtomicLong CNTR = new AtomicLong();
+
+    /** Loggers. */
+    private static final ConcurrentHashMap8<String, IgfsLogger> loggers =
+        new ConcurrentHashMap8<>();
+
+    /** Lock for atomic logger adds/removals. */
+    private static final ReadWriteLock logLock = new ReentrantReadWriteLock();
+
+    /** Predefined disabled logger. */
+    private static final IgfsLogger disabledLogger = new IgfsLogger();
+
+    /** Logger enabled flag. */
+    private boolean enabled;
+
+    /** Endpoint. */
+    private String endpoint;
+
+    /** Batch size. */
+    private int batchSize;
+
+    /** File to which data is to be written. */
+    private File file;
+
+    /** Read/write lock for concurrent entries collection modification. */
+    private ReadWriteLock rwLock;
+
+    /** Flush lock. */
+    private Lock flushLock;
+
+    /** Flush condition. */
+    private Condition flushCond;
+
+    /** Logged data flusher. */
+    private Thread flushWorker;
+
+    /** Process ID. */
+    private int pid;
+
+    /** Entries. */
+    private Collection<Entry> entries;
+
+    /** Entries counter in order to avoid concurrent collection size checks. */
+    private AtomicInteger cnt;
+
+    /** Logger usage counter. */
+    private AtomicInteger useCnt;
+
+    /**
+     * Get next stream ID.
+     *
+     * @return Stream ID.
+     */
+    public static long nextId() {
+        return CNTR.incrementAndGet();
+    }
+
+    /**
+     * Get disabled logger.
+     *
+     * @return Disable logger instance.
+     */
+    public static IgfsLogger disabledLogger() {
+        return disabledLogger;
+    }
+
+    /**
+     * Get logger instance for the given endpoint.
+     *
+     * @param endpoint Endpoint.
+     * @param dir Path.
+     * @param batchSize Batch size.
+     *
+     * @return Logger instance.
+     */
+    public static IgfsLogger logger(String endpoint, String ggfsName, String dir, int batchSize) {
+        if (endpoint == null)
+            endpoint = "";
+
+        logLock.readLock().lock();
+
+        try {
+            IgfsLogger log = loggers.get(endpoint);
+
+            if (log == null) {
+                log = new IgfsLogger(endpoint, ggfsName, dir, batchSize);
+
+                IgfsLogger log0 = loggers.putIfAbsent(endpoint, log);
+
+                if (log0 != null)
+                    log = log0;
+            }
+
+            log.useCnt.incrementAndGet();
+
+            return log;
+        }
+        finally {
+            logLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Construct disabled file logger.
+     */
+    private IgfsLogger() {
+        // No-op.
+    }
+
+    /**
+     * Construct normal file logger.
+     *
+     * @param endpoint Endpoint.
+     * @param ggfsName GGFS name.
+     * @param dir Log file path.
+     * @param batchSize Batch size.
+     */
+    private IgfsLogger(String endpoint, String ggfsName, String dir, int batchSize) {
+        A.notNull(endpoint, "endpoint cannot be null");
+        A.notNull(dir, "dir cannot be null");
+        A.ensure(batchSize > 0, "batch size cannot be negative");
+
+        enabled = true;
+
+        this.endpoint = endpoint;
+        this.batchSize = batchSize;
+
+        pid = U.jvmPid();
+
+        File dirFile = new File(dir);
+
+        A.ensure(dirFile.isDirectory(), "dir must point to a directory");
+        A.ensure(dirFile.exists(), "dir must exist");
+
+        file = new File(dirFile, "ggfs-log-" + ggfsName + "-" + pid + ".csv");
+
+        entries = new ConcurrentLinkedDeque8<>();
+
+        cnt = new AtomicInteger();
+        useCnt = new AtomicInteger();
+
+        rwLock = new ReentrantReadWriteLock();
+        flushLock = new ReentrantLock();
+        flushCond = flushLock.newCondition();
+
+        flushWorker = new Thread(new FlushWorker());
+
+        flushWorker.setDaemon(true);
+
+        flushWorker.start();
+    }
+
+    /**
+     * Check whether logging is enabled.
+     *
+     * @return {@code True} in case logging is enabled.
+     */
+    public boolean isLogEnabled() {
+        return enabled;
+    }
+
+    /**
+     * Log file open event.
+     *
+     * @param streamId Stream ID.
+     * @param path Path.
+     * @param mode Mode.
+     * @param bufSize Buffer size.
+     * @param dataLen Data length.
+     */
+    public void logOpen(long streamId, IgniteFsPath path, IgniteFsMode mode, int bufSize, long dataLen) {
+        addEntry(new Entry(TYPE_OPEN_IN, path.toString(), mode, streamId, bufSize, dataLen, null, null, null, null,
+            null, null, null, null, null, null, null, null, null, null));
+    }
+
+    /**
+     * Log file create event.
+     *
+     * @param streamId Stream ID.
+     * @param path Path.
+     * @param mode Mode.
+     * @param overwrite Overwrite flag.
+     * @param bufSize Buffer size.
+     * @param replication Replication factor.
+     * @param blockSize Block size.
+     */
+    public void logCreate(long streamId, IgniteFsPath path, IgniteFsMode mode, boolean overwrite, int bufSize,
+        int replication, long blockSize) {
+        addEntry(new Entry(TYPE_OPEN_OUT, path.toString(), mode, streamId, bufSize, null, false, overwrite, replication,
+            blockSize, null, null, null, null, null, null, null, null, null, null));
+    }
+
+    /**
+     * Log file append event.
+     *
+     * @param streamId Stream ID.
+     * @param path Path.
+     * @param mode Mode.
+     * @param bufSize Buffer size.
+     */
+    public void logAppend(long streamId, IgniteFsPath path, IgniteFsMode mode, int bufSize) {
+        addEntry(new Entry(TYPE_OPEN_OUT, path.toString(), mode, streamId, bufSize, null, true, null, null, null, null,
+            null, null, null, null, null, null, null, null, null));
+    }
+
+    /**
+     * Log random read event.
+     *
+     * @param streamId Stream ID.
+     * @param pos Position.
+     * @param readLen Read bytes count.
+     */
+    public void logRandomRead(long streamId, long pos, int readLen) {
+        addEntry(new Entry(TYPE_RANDOM_READ, null, null, streamId, null, null, null, null, null, null, pos, readLen,
+            null, null, null, null, null, null, null, null));
+    }
+
+    /**
+     * Log seek event.
+     *
+     * @param streamId Stream ID.
+     * @param pos Position.
+     */
+    public void logSeek(long streamId, long pos) {
+        addEntry(new Entry(TYPE_SEEK, null, null, streamId, null, null, null, null, null, null, pos, null, null, null,
+            null, null, null, null, null, null));
+    }
+
+    /**
+     * Log skip event.
+     *
+     * @param streamId Stream ID.
+     * @param skipCnt Skip bytes count.
+     */
+    public void logSkip(long streamId, long skipCnt) {
+        addEntry(new Entry(TYPE_SKIP, null, null, streamId, null, null, null, null, null, null, null, null, skipCnt,
+            null, null, null, null, null, null, null));
+    }
+
+    /**
+     * Log mark event.
+     *
+     * @param streamId Stream ID.
+     * @param readLimit Read limit.
+     */
+    public void logMark(long streamId, long readLimit) {
+        addEntry(new Entry(TYPE_MARK, null, null, streamId, null, null, null, null, null, null, null, null, null,
+            readLimit, null, null, null, null, null, null));
+    }
+
+    /**
+     * Log reset event.
+     *
+     * @param streamId Stream ID.
+     */
+    public void logReset(long streamId) {
+        addEntry(new Entry(TYPE_RESET, null, null, streamId, null, null, null, null, null, null, null, null, null, null,
+            null, null, null, null, null, null));
+    }
+
+    /**
+     * Log input stream close event.
+     *
+     * @param streamId Stream ID.
+     * @param userTime User time.
+     * @param readTime Read time.
+     * @param total Total bytes read.
+     */
+    public void logCloseIn(long streamId, long userTime, long readTime, long total) {
+        addEntry(new Entry(TYPE_CLOSE_IN, null, null, streamId, null, null, null, null, null, null, null, null, null,
+            null, userTime, readTime, total ,null, null, null));
+    }
+
+    /**
+     * Log output stream close event.
+     *
+     * @param streamId Stream ID.
+     * @param userTime User time.
+     * @param writeTime Read time.
+     * @param total Total bytes read.
+     */
+    public void logCloseOut(long streamId, long userTime, long writeTime, long total) {
+        addEntry(new Entry(TYPE_CLOSE_OUT, null, null, streamId, null, null, null, null, null, null, null, null, null,
+            null, userTime, writeTime, total, null, null, null));
+    }
+
+    /**
+     * Log directory creation event.
+     *
+     * @param path Path.
+     * @param mode Mode.
+     */
+    public void logMakeDirectory(IgniteFsPath path, IgniteFsMode mode) {
+        addEntry(new Entry(TYPE_DIR_MAKE, path.toString(), mode, null, null, null, null, null, null, null, null, null,
+            null, null, null, null, null, null, null, null));
+    }
+
+    /**
+     * Log directory listing event.
+     *
+     * @param path Path.
+     * @param mode Mode.
+     * @param files Files.
+     */
+    public void logListDirectory(IgniteFsPath path, IgniteFsMode mode, String[] files) {
+        addEntry(new Entry(TYPE_DIR_LIST, path.toString(), mode, null, null, null, null, null, null, null, null, null,
+            null, null, null, null, null, null, null, files));
+    }
+
+    /**
+     * Log rename event.
+     *
+     * @param path Path.
+     * @param mode Mode.
+     * @param destPath Destination path.
+     */
+    public void logRename(IgniteFsPath path, IgniteFsMode mode, IgniteFsPath destPath) {
+        addEntry(new Entry(TYPE_RENAME, path.toString(), mode, null, null, null, null, null, null, null, null, null,
+            null, null, null, null, null, destPath.toString(), null, null));
+    }
+
+    /**
+     * Log delete event.
+     *
+     * @param path Path.
+     * @param mode Mode.
+     * @param recursive Recursive flag.
+     */
+    public void logDelete(IgniteFsPath path, IgniteFsMode mode, boolean recursive) {
+        addEntry(new Entry(TYPE_DELETE, path.toString(), mode, null, null, null, null, null, null, null, null, null,
+            null, null, null, null, null, null, recursive, null));
+    }
+
+    /** {@inheritDoc} */
+    public void close() {
+        boolean close = false;
+
+        if (useCnt.decrementAndGet() == 0) {
+            logLock.writeLock().lock();
+
+            try {
+                if (useCnt.get() == 0) {
+                    loggers.remove(endpoint);
+
+                    close = true;
+                }
+            }
+            finally {
+                logLock.writeLock().unlock();
+            }
+        }
+
+        if (close) {
+            U.interrupt(flushWorker);
+
+            try {
+                U.join(flushWorker);
+            }
+            catch (IgniteInterruptedCheckedException ignore) {
+                // No-op.
+            }
+
+            entries.clear();
+        }
+    }
+
+    /**
+     * Add new log entry.
+     *
+     * @param entry Entry.
+     */
+    @SuppressWarnings("SignalWithoutCorrespondingAwait")
+    private void addEntry(Entry entry) {
+        assert entry != null;
+
+        rwLock.readLock().lock();
+
+        try {
+            entries.add(entry);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+
+        if (cnt.incrementAndGet() >= batchSize) {
+            if (flushLock.tryLock()) {
+                try {
+                    flushCond.signalAll();
+                }
+                finally {
+                    flushLock.unlock();
+                }
+            }
+        }
+    }
+
+    /**
+     * Logged entry.
+     */
+    private class Entry {
+        /** Thread ID. */
+        private final long threadId;
+
+        /** Timestamp. */
+        private final long ts;
+
+        /** Event type. */
+        private final int type;
+
+        /** File/dir path. */
+        private final String path;
+
+        /** Path mode. */
+        private IgniteFsMode mode;
+
+        /** Stream ID. */
+        private final long streamId;
+
+        /** Buffer size. Available only for OPEN_IN/OPEN_OUT events */
+        private final int bufSize;
+
+        /** Length of data available to read. Available only for OPEN_IN event. */
+        private final long dataLen;
+
+        /** Append flag. Available only for OPEN_OUT event. */
+        private final Boolean append;
+
+        /** Overwrite flag. Available only for OPEN_OUT event. */
+        private final Boolean overwrite;
+
+        /** Replication. Available only for OPEN_OUT event. */
+        private final int replication;
+
+        /** Block size. Available only for OPEN_OUT event. */
+        private final long blockSize;
+
+        /** Position of data being randomly read or seek. Available only for RANDOM_READ or SEEK events. */
+        private final long pos;
+
+        /** Length of data being randomly read. Available only for RANDOM_READ event. */
+        private final int readLen;
+
+        /** Amount of skipped bytes. Available only for SKIP event. */
+        private final long skipCnt;
+
+        /** Read limit. Available only for MARK event. */
+        private final long readLimit;
+
+        /** User time. Available only for CLOSE_IN/CLOSE_OUT events. */
+        private final long userTime;
+
+        /** System time (either read or write). Available only for CLOSE_IN/CLOSE_OUT events. */
+        private final long sysTime;
+
+        /** Total amount of read or written bytes. Available only for CLOSE_IN/CLOSE_OUT events.*/
+        private final long total;
+
+        /** Destination path. Available only for RENAME event. */
+        private final String destPath;
+
+        /** Recursive flag. Available only for DELETE event. */
+        private final Boolean recursive;
+
+        /** Directory listing. Available only for LIST event. */
+        private final String[] list;
+
+        /**
+         * Constructor.
+         *
+         * @param type Event type.
+         * @param path Path.
+         * @param mode Path mode.
+         * @param streamId Stream ID.
+         * @param bufSize Buffer size.
+         * @param dataLen Data length.
+         * @param append Append flag.
+         * @param overwrite Overwrite flag.
+         * @param replication Replication.
+         * @param blockSize Block size.
+         * @param pos Position.
+         * @param readLen Read length.
+         * @param skipCnt Skip count.
+         * @param readLimit Read limit.
+         * @param userTime User time.
+         * @param sysTime System time.
+         * @param total Read or written bytes.
+         * @param destPath Destination path.
+         * @param recursive Recursive flag.
+         * @param list Listed directories.
+         */
+        Entry(int type, String path, IgniteFsMode mode, Long streamId, Integer bufSize, Long dataLen, Boolean append,
+            Boolean overwrite, Integer replication, Long blockSize, Long pos, Integer readLen, Long skipCnt,
+            Long readLimit, Long userTime, Long sysTime, Long total, String destPath, Boolean recursive,
+            String[] list) {
+            threadId = Thread.currentThread().getId();
+            ts = U.currentTimeMillis();
+
+            this.type = type;
+            this.path = path;
+            this.mode = mode;
+            this.streamId = streamId != null ? streamId : -1;
+            this.bufSize = bufSize != null ? bufSize : -1;
+            this.dataLen = dataLen != null ? dataLen : -1;
+            this.append = append;
+            this.overwrite = overwrite;
+            this.replication = replication != null ? replication : -1;
+            this.blockSize = blockSize != null ? blockSize : -1;
+            this.pos = pos != null ? pos : -1;
+            this.readLen = readLen != null ? readLen : -1;
+            this.skipCnt = skipCnt != null ? skipCnt : -1;
+            this.readLimit = readLimit != null ? readLimit : -1;
+            this.userTime = userTime != null ? userTime : -1;
+            this.sysTime = sysTime != null ? sysTime : -1;
+            this.total = total != null ? total : -1;
+            this.destPath = destPath;
+            this.recursive = recursive;
+            this.list = list;
+        }
+
+        /**
+         * Return suitable representation of long value.
+         *
+         * @param val Value.
+         * @return String representation.
+         */
+        private String string(int val) {
+            return val != -1 ? String.valueOf(val) : "";
+        }
+
+        /**
+         * Return suitable representation of long value.
+         *
+         * @param val Value.
+         * @return String representation.
+         */
+        private String string(long val) {
+            return val != -1 ? String.valueOf(val) : "";
+        }
+
+        /**
+         * Return suitable representation of the object.
+         *
+         * @param val Object.
+         * @return String representation.
+         */
+        private String string(Object val) {
+            if (val == null)
+                return "";
+            else if (val instanceof Boolean)
+                return ((Boolean) val) ? "1" : "0";
+            else if (val instanceof String)
+                return ((String)val).replace(';', '~');
+            else if (val instanceof String[]) {
+                String[] val0 = (String[])val;
+
+                SB buf = new SB();
+
+                boolean first = true;
+
+                for (String str : val0) {
+                    if (first)
+                        first = false;
+                    else
+                        buf.a(DELIM_FIELD_VAL);
+
+                    buf.a(str.replace(';', '~'));
+                }
+
+                return buf.toString();
+            }
+            else
+                return val.toString();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            SB res = new SB();
+
+            res.a(ts).a(DELIM_FIELD).a(threadId).a(DELIM_FIELD).a(pid).a(DELIM_FIELD).a(type).a(DELIM_FIELD)
+                .a(string(path)).a(DELIM_FIELD).a(string(mode)).a(DELIM_FIELD).a(string(streamId)).a(DELIM_FIELD)
+                .a(string(bufSize)).a(DELIM_FIELD).a(string(dataLen)).a(DELIM_FIELD).a(string(append)).a(DELIM_FIELD)
+                .a(string(overwrite)).a(DELIM_FIELD).a(string(replication)).a(DELIM_FIELD).a(string(blockSize))
+                .a(DELIM_FIELD).a(string(pos)).a(DELIM_FIELD).a(string(readLen)).a(DELIM_FIELD).a(string(skipCnt))
+                .a(DELIM_FIELD).a(string(readLimit)).a(DELIM_FIELD).a(string(userTime)).a(DELIM_FIELD)
+                .a(string(sysTime)).a(DELIM_FIELD).a(string(total)).a(DELIM_FIELD).a(string(destPath)).a(DELIM_FIELD)
+                .a(string(recursive)).a(DELIM_FIELD).a(string(list));
+
+            return res.toString();
+        }
+    }
+
+    /**
+     * Data flush worker.
+     */
+    private class FlushWorker implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            Thread t = Thread.currentThread();
+
+            // We clear interrupted flag here in order to let the final flush proceed normally with IO operations.
+            while (!Thread.interrupted()) {
+                flushLock.lock();
+
+                try {
+                    while (cnt.get() < batchSize && !t.isInterrupted()) {
+                        try {
+                            U.await(flushCond, 1000L, TimeUnit.MILLISECONDS);
+                        }
+                        catch (IgniteInterruptedCheckedException ignore) {
+                            t.interrupt();
+
+                            break;
+                        }
+                    }
+                }
+                finally {
+                    flushLock.unlock();
+                }
+
+                if (!t.isInterrupted())
+                    flush();
+            }
+
+            // Flush remaining entries.
+            flush();
+        }
+
+        /**
+         * Flush buffered entries to disk.
+         */
+        @SuppressWarnings("TooBroadScope")
+        private void flush() {
+            Collection<Entry> entries0;
+
+            rwLock.writeLock().lock();
+
+            try {
+                entries0 = entries;
+
+                entries = new ConcurrentLinkedDeque8<>();
+            }
+            finally {
+                rwLock.writeLock().unlock();
+            }
+
+            // We could lost some increments here, but this is not critical if the new batch will exceed maximum
+            // size by several items.
+            cnt.set(0);
+
+            if (!entries0.isEmpty()) {
+                boolean addHdr = !file.exists();
+
+                FileOutputStream fos = null;
+                OutputStreamWriter osw = null;
+                BufferedWriter bw = null;
+
+                try {
+                    fos = new FileOutputStream(file, true);
+                    osw = new OutputStreamWriter(fos);
+                    bw = new BufferedWriter(osw);
+
+                    if (addHdr)
+                        bw.write(HDR + U.nl());
+
+                    for (Entry entry : entries0)
+                        bw.write(entry + U.nl());
+                }
+                catch (IOException e) {
+                    U.error(null, "Failed to flush logged entries to a disk due to an IO exception.", e);
+                }
+                finally {
+                    U.closeQuiet(bw);
+                    U.closeQuiet(osw);
+                    U.closeQuiet(fos);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMarshaller.java
new file mode 100644
index 0000000..f0679bc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMarshaller.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import org.apache.ignite.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.fs.common.IgfsIpcCommand.*;
+
+/**
+ * Implementation of GGFS client message marshaller.
+ */
+public class IgfsMarshaller {
+    /** Packet header size. */
+    public static final int HEADER_SIZE = 24;
+
+    /**
+     * Creates new header with given request ID and command.
+     *
+     * @param reqId Request ID.
+     * @param cmd Command.
+     * @return Created header.
+     */
+    public static byte[] createHeader(long reqId, IgfsIpcCommand cmd) {
+        assert cmd != null;
+
+        byte[] hdr = new byte[HEADER_SIZE];
+
+        U.longToBytes(reqId, hdr, 0);
+
+        U.intToBytes(cmd.ordinal(), hdr, 8);
+
+        return hdr;
+    }
+
+    /**
+     * Creates new header with given request ID and command.
+     *
+     * @param reqId Request ID.
+     * @param cmd Command.
+     * @return Created header.
+     */
+    public static byte[] fillHeader(byte[] hdr, long reqId, IgfsIpcCommand cmd) {
+        assert cmd != null;
+
+        Arrays.fill(hdr, (byte)0);
+
+        U.longToBytes(reqId, hdr, 0);
+
+        U.intToBytes(cmd.ordinal(), hdr, 8);
+
+        return hdr;
+    }
+
+    /**
+     * @param msg Message.
+     * @param hdr Message header.
+     * @param out Output.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void marshall(IgfsMessage msg, byte[] hdr, ObjectOutput out) throws IgniteCheckedException {
+        assert hdr != null;
+        assert hdr.length == HEADER_SIZE;
+
+        try {
+            switch (msg.command()) {
+                case HANDSHAKE: {
+                    out.write(hdr);
+
+                    IgfsHandshakeRequest req = (IgfsHandshakeRequest)msg;
+
+                    U.writeString(out, req.gridName());
+                    U.writeString(out, req.ggfsName());
+                    U.writeString(out, req.logDirectory());
+
+                    break;
+                }
+                case STATUS: {
+                    out.write(hdr);
+
+                    break;
+                }
+
+                case EXISTS:
+                case INFO:
+                case PATH_SUMMARY:
+                case UPDATE:
+                case RENAME:
+                case DELETE:
+                case MAKE_DIRECTORIES:
+                case LIST_PATHS:
+                case LIST_FILES:
+                case AFFINITY:
+                case SET_TIMES:
+                case OPEN_READ:
+                case OPEN_APPEND:
+                case OPEN_CREATE: {
+                    out.write(hdr);
+
+                    IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+
+                    writePath(out, req.path());
+                    writePath(out, req.destinationPath());
+                    out.writeBoolean(req.flag());
+                    out.writeBoolean(req.colocate());
+                    U.writeStringMap(out, req.properties());
+
+                    // Minor optimization.
+                    if (msg.command() == AFFINITY) {
+                        out.writeLong(req.start());
+                        out.writeLong(req.length());
+                    }
+                    else if (msg.command() == OPEN_CREATE) {
+                        out.writeInt(req.replication());
+                        out.writeLong(req.blockSize());
+                    }
+                    else if (msg.command() == SET_TIMES) {
+                        out.writeLong(req.accessTime());
+                        out.writeLong(req.modificationTime());
+                    }
+                    else if (msg.command() == OPEN_READ && req.flag())
+                        out.writeInt(req.sequentialReadsBeforePrefetch());
+
+                    break;
+                }
+
+                case CLOSE:
+                case READ_BLOCK:
+                case WRITE_BLOCK: {
+                    assert msg.command() != WRITE_BLOCK : "WRITE_BLOCK should be marshalled manually.";
+
+                    IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg;
+
+                    U.longToBytes(req.streamId(), hdr, 12);
+
+                    if (msg.command() == READ_BLOCK)
+                        U.intToBytes(req.length(), hdr, 20);
+
+                    out.write(hdr);
+
+                    if (msg.command() == READ_BLOCK)
+                        out.writeLong(req.position());
+
+                    break;
+                }
+
+                case CONTROL_RESPONSE: {
+                    out.write(hdr);
+
+                    IgfsControlResponse res = (IgfsControlResponse)msg;
+
+                    res.writeExternal(out);
+
+                    break;
+                }
+
+                default: {
+                    assert false : "Invalid command: " + msg.command();
+
+                    throw new IllegalArgumentException("Failed to marshal message (invalid command): " +
+                        msg.command());
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to send message to GGFS data node (is data node up and running?)", e);
+        }
+    }
+
+    /**
+     * @param cmd Command.
+     * @param hdr Header.
+     * @param in Input.
+     * @return Message.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsMessage unmarshall(IgfsIpcCommand cmd, byte[] hdr, ObjectInput in) throws IgniteCheckedException {
+        assert hdr != null;
+        assert hdr.length == HEADER_SIZE;
+
+        try {
+            IgfsMessage msg;
+
+            switch (cmd) {
+                case HANDSHAKE: {
+                    IgfsHandshakeRequest req = new IgfsHandshakeRequest();
+
+                    req.gridName(U.readString(in));
+                    req.ggfsName(U.readString(in));
+                    req.logDirectory(U.readString(in));
+
+                    msg = req;
+
+                    break;
+                }
+
+                case STATUS: {
+                    msg = new IgfsStatusRequest();
+
+                    break;
+                }
+
+                case EXISTS:
+                case INFO:
+                case PATH_SUMMARY:
+                case UPDATE:
+                case RENAME:
+                case DELETE:
+                case MAKE_DIRECTORIES:
+                case LIST_PATHS:
+                case LIST_FILES:
+                case SET_TIMES:
+                case AFFINITY:
+                case OPEN_READ:
+                case OPEN_APPEND:
+                case OPEN_CREATE: {
+                    IgfsPathControlRequest req = new IgfsPathControlRequest();
+
+                    req.path(readPath(in));
+                    req.destinationPath(readPath(in));
+                    req.flag(in.readBoolean());
+                    req.colocate(in.readBoolean());
+                    req.properties(U.readStringMap(in));
+
+                    // Minor optimization.
+                    if (cmd == AFFINITY) {
+                        req.start(in.readLong());
+                        req.length(in.readLong());
+                    }
+                    else if (cmd == OPEN_CREATE) {
+                        req.replication(in.readInt());
+                        req.blockSize(in.readLong());
+                    }
+                    else if (cmd == SET_TIMES) {
+                        req.accessTime(in.readLong());
+                        req.modificationTime(in.readLong());
+                    }
+                    else if (cmd == OPEN_READ && req.flag())
+                        req.sequentialReadsBeforePrefetch(in.readInt());
+
+                    msg = req;
+
+                    break;
+                }
+
+                case CLOSE:
+                case READ_BLOCK:
+                case WRITE_BLOCK: {
+                    IgfsStreamControlRequest req = new IgfsStreamControlRequest();
+
+                    long streamId = U.bytesToLong(hdr, 12);
+
+                    req.streamId(streamId);
+                    req.length(U.bytesToInt(hdr, 20));
+
+                    if (cmd == READ_BLOCK)
+                        req.position(in.readLong());
+
+                    msg = req;
+
+                    break;
+                }
+
+                case CONTROL_RESPONSE: {
+                    IgfsControlResponse res = new IgfsControlResponse();
+
+                    res.readExternal(in);
+
+                    msg = res;
+
+                    break;
+                }
+
+                default: {
+                    assert false : "Invalid command: " + cmd;
+
+                    throw new IllegalArgumentException("Failed to unmarshal message (invalid command): " + cmd);
+                }
+            }
+
+            assert msg != null;
+
+            msg.command(cmd);
+
+            return msg;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException("Failed to unmarshal client message: " + cmd, e);
+        }
+    }
+
+    /**
+     * Writes GGFS path to given data output. Can write {@code null} values.
+     *
+     * @param out Data output.
+     * @param path Path to write.
+     * @throws IOException If write failed.
+     */
+    private void writePath(ObjectOutput out, @Nullable IgniteFsPath path) throws IOException {
+        out.writeBoolean(path != null);
+
+        if (path != null)
+            path.writeExternal(out);
+    }
+
+    /**
+     * Reads GGFS path from data input that was written by {@link #writePath(ObjectOutput, org.apache.ignite.ignitefs.IgniteFsPath)}
+     * method.
+     *
+     * @param in Data input.
+     * @return Written path or {@code null}.
+     */
+    @Nullable private IgniteFsPath readPath(ObjectInput in) throws IOException {
+        if(in.readBoolean()) {
+            IgniteFsPath path = new IgniteFsPath();
+
+            path.readExternal(in);
+
+            return path;
+        }
+
+        return null;
+    }
+
+    /**
+     * Writes string to output.
+     *
+     * @param out Data output.
+     * @param str String.
+     * @throws IOException If write failed.
+     */
+    private void writeString(DataOutput out, @Nullable String str) throws IOException {
+        out.writeBoolean(str != null);
+
+        if (str != null)
+            out.writeUTF(str);
+    }
+
+    /**
+     * Reads string from input.
+     *
+     * @param in Data input.
+     * @return Read string.
+     * @throws IOException If read failed.
+     */
+    @Nullable private String readString(DataInput in) throws IOException {
+        boolean hasStr = in.readBoolean();
+
+        if (hasStr)
+            return in.readUTF();
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMessage.java
new file mode 100644
index 0000000..8bd6666
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+/**
+ * Abstract class for all messages sent between GGFS client (Hadoop File System implementation) and
+ * GGFS server (Ignite data node).
+ */
+public abstract class IgfsMessage {
+    /** GGFS command. */
+    private IgfsIpcCommand cmd;
+
+    /**
+     * @return Command.
+     */
+    public IgfsIpcCommand command() {
+        return cmd;
+    }
+
+    /**
+     * @param cmd Command.
+     */
+    public void command(IgfsIpcCommand cmd) {
+        this.cmd = cmd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsPathControlRequest.java
new file mode 100644
index 0000000..de6505b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsPathControlRequest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Single path command message. This is a plain bean.
+ */
+public class IgfsPathControlRequest extends IgfsMessage {
+    /** Main path. */
+    private IgniteFsPath path;
+
+    /** Second path, rename command. */
+    private IgniteFsPath destPath;
+
+    /** Boolean flag, meaning depends on command. */
+    private boolean flag;
+
+    /** Boolean flag which controls whether file will be colocated on single node. */
+    private boolean colocate;
+
+    /** Properties. */
+    private Map<String, String> props;
+
+    /** Sequential reads before prefetch. */
+    private int seqReadsBeforePrefetch;
+
+    /** Start pos for affinity command. */
+    private long start;
+
+    /** Length for affinity code. */
+    private long len;
+
+    /** Hadoop replication factor. */
+    private int replication;
+
+    /** Hadoop block size. */
+    private long blockSize;
+
+    /** Last access time. */
+    private long accessTime;
+
+    /** Last modification time. */
+    private long modificationTime;
+
+    /**
+     * @param path Path.
+     */
+    public void path(IgniteFsPath path) {
+        this.path = path;
+    }
+
+    /**
+     * @param destPath Destination path (rename only).
+     */
+    public void destinationPath(IgniteFsPath destPath) {
+        this.destPath = destPath;
+    }
+
+    /**
+     * @param flag Flag value. Meaning depends on command.
+     */
+    public void flag(boolean flag) {
+        this.flag = flag;
+    }
+
+    /**
+     * @param colocate Colocate control flag value.
+     */
+    public void colocate(boolean colocate) {
+        this.colocate = colocate;
+    }
+
+    /**
+     * @param replication Hadoop replication factor.
+     */
+    public void replication(int replication) {
+        this.replication = replication;
+    }
+
+    /**
+     * @param blockSize Hadoop block size.
+     */
+    public void blockSize(long blockSize) {
+        this.blockSize = blockSize;
+    }
+
+    /**
+     * @param props Properties map.
+     */
+    public void properties(@Nullable Map<String, String> props) {
+        this.props = props;
+    }
+
+    /**
+     * @param seqReadsBeforePrefetch Sequential reads before prefetch.
+     */
+    public void sequentialReadsBeforePrefetch(int seqReadsBeforePrefetch) {
+        this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
+    }
+
+    /**
+     * @param start Start position (affinity command only).
+     */
+    public void start(long start) {
+        this.start = start;
+    }
+
+    /**
+     * @param len Length (affinity command only).
+     */
+    public void length(long len) {
+        this.len = len;
+    }
+
+    /**
+     * @param accessTime Last access time.
+     */
+    public void accessTime(long accessTime) {
+        this.accessTime = accessTime;
+    }
+
+    /**
+     * @param modificationTime Last modification time.
+     */
+    public void modificationTime(long modificationTime) {
+        this.modificationTime = modificationTime;
+    }
+
+    /**
+     * @return Path.
+     */
+    public IgniteFsPath path() {
+        return path;
+    }
+
+    /**
+     * @return Destination path (rename only).
+     */
+    public IgniteFsPath destinationPath() {
+        return destPath;
+    }
+
+    /**
+     * @return Flag value (meaning depends on command).
+     */
+    public boolean flag() {
+        return flag;
+    }
+
+    /**
+     * @return Colocate control flag value.
+     */
+    public boolean colocate() {
+        return colocate;
+    }
+
+    /**
+     * @return Hadoop replication factor.
+     */
+    public int replication() {
+        return replication;
+    }
+
+    /**
+     * @return Hadoop block size.
+     */
+    public long blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * @return Properties.
+     */
+    public Map<String, String> properties() {
+        return props;
+    }
+
+    /**
+     * @return Sequential reads before prefetch.
+     */
+    public int sequentialReadsBeforePrefetch() {
+        return seqReadsBeforePrefetch;
+    }
+
+    /**
+     * @return Start position (affinity command only).
+     */
+    public long start() {
+        return start;
+    }
+
+    /**
+     * @return Length (affinity command only).
+     */
+    public long length() {
+        return len;
+    }
+
+    /**
+     * @return Last access time.
+     */
+    public long accessTime() {
+        return accessTime;
+    }
+
+    /**
+     * @return Last modification time.
+     */
+    public long modificationTime() {
+        return modificationTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsPathControlRequest.class, this, "cmd", command());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStatusRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStatusRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStatusRequest.java
new file mode 100644
index 0000000..6f1c74a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStatusRequest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import static org.apache.ignite.internal.fs.common.IgfsIpcCommand.*;
+
+/**
+ * GGFS status (total/used/free space) request.
+ */
+public class IgfsStatusRequest extends IgfsMessage {
+    /** {@inheritDoc} */
+    @Override public IgfsIpcCommand command() {
+        return STATUS;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void command(IgfsIpcCommand cmd) {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStreamControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStreamControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStreamControlRequest.java
new file mode 100644
index 0000000..1a61172
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/fs/common/IgfsStreamControlRequest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.fs.common;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * Read block request.
+ */
+public class IgfsStreamControlRequest extends IgfsMessage {
+    /** Stream id. */
+    private long streamId;
+
+    /** Data. */
+    @GridToStringExclude
+    private byte[] data;
+
+    /** Read position. */
+    private long pos;
+
+    /** Length to read. */
+    private int len;
+
+    /**
+     * @return Stream ID.
+     */
+    public long streamId() {
+        return streamId;
+    }
+
+    /**
+     * @param streamId Stream ID.
+     */
+    public void streamId(long streamId) {
+        this.streamId = streamId;
+    }
+
+    /**
+     * @return Data.
+     */
+    public byte[] data() {
+        return data;
+    }
+
+    /**
+     * @param data Data.
+     */
+    public void data(byte[] data) {
+        this.data = data;
+    }
+
+    /**
+     * @return Position.
+     */
+    public long position() {
+        return pos;
+    }
+
+    /**
+     * @param pos Position.
+     */
+    public void position(long pos) {
+        this.pos = pos;
+    }
+
+    /**
+     * @return Length.
+     */
+    public int length() {
+        return len;
+    }
+
+    /**
+     * @param len Length.
+     */
+    public void length(int len) {
+        this.len = len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsStreamControlRequest.class, this, "cmd", command(),
+            "dataLen", data == null ? 0 : data.length);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b20d898b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java
index 76bdabe..ea5fb99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java
@@ -99,16 +99,16 @@ class IgfsIpcHandler implements IgfsServerHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<GridGgfsMessage> handleAsync(final IgfsClientSession ses,
-        final GridGgfsMessage msg, DataInput in) {
+    @Override public IgniteInternalFuture<IgfsMessage> handleAsync(final IgfsClientSession ses,
+        final IgfsMessage msg, DataInput in) {
         try {
             // Even if will be closed right after this call, response write error will be ignored.
             if (stopping)
                 return null;
 
-            final GridGgfsIpcCommand cmd = msg.command();
+            final IgfsIpcCommand cmd = msg.command();
 
-            IgniteInternalFuture<GridGgfsMessage> fut;
+            IgniteInternalFuture<IgfsMessage> fut;
 
             switch (cmd) {
                 // Execute not-blocking command synchronously in worker thread.
@@ -116,7 +116,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
                 case MAKE_DIRECTORIES:
                 case LIST_FILES:
                 case LIST_PATHS: {
-                    GridGgfsMessage res = execute(ses, cmd, msg, in);
+                    IgfsMessage res = execute(ses, cmd, msg, in);
 
                     fut = res == null ? null : new GridFinishedFuture<>(ctx, res);
 
@@ -125,8 +125,8 @@ class IgfsIpcHandler implements IgfsServerHandler {
 
                 // Execute command asynchronously in user's pool.
                 default: {
-                    fut = ctx.closure().callLocalSafe(new GridPlainCallable<GridGgfsMessage>() {
-                        @Override public GridGgfsMessage call() throws Exception {
+                    fut = ctx.closure().callLocalSafe(new GridPlainCallable<IgfsMessage>() {
+                        @Override public IgfsMessage call() throws Exception {
                             // No need to pass data input for non-write-block commands.
                             return execute(ses, cmd, msg, null);
                         }
@@ -152,12 +152,12 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Command execution result.
      * @throws Exception If failed.
      */
-    private GridGgfsMessage execute(IgfsClientSession ses, GridGgfsIpcCommand cmd, GridGgfsMessage msg,
+    private IgfsMessage execute(IgfsClientSession ses, IgfsIpcCommand cmd, IgfsMessage msg,
         @Nullable DataInput in)
         throws Exception {
         switch (cmd) {
             case HANDSHAKE:
-                return processHandshakeRequest((GridGgfsHandshakeRequest)msg);
+                return processHandshakeRequest((IgfsHandshakeRequest)msg);
 
             case STATUS:
                 return processStatusRequest();
@@ -195,7 +195,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Response message.
      * @throws IgniteCheckedException In case of handshake failure.
      */
-    private GridGgfsMessage processHandshakeRequest(GridGgfsHandshakeRequest req) throws IgniteCheckedException {
+    private IgfsMessage processHandshakeRequest(IgfsHandshakeRequest req) throws IgniteCheckedException {
         if (!F.eq(ctx.gridName(), req.gridName()))
             throw new IgniteCheckedException("Failed to perform handshake because actual Grid name differs from expected " +
                 "[expected=" + req.gridName() + ", actual=" + ctx.gridName() + ']');
@@ -204,7 +204,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
             throw new IgniteCheckedException("Failed to perform handshake because actual GGFS name differs from expected " +
                 "[expected=" + req.ggfsName() + ", actual=" + ggfs.name() + ']');
 
-        GridGgfsControlResponse res = new GridGgfsControlResponse();
+        IgfsControlResponse res = new IgfsControlResponse();
 
         ggfs.clientLogDirectory(req.logDirectory());
 
@@ -222,10 +222,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Status response.
      * @throws IgniteCheckedException If failed.
      */
-    private GridGgfsMessage processStatusRequest() throws IgniteCheckedException {
+    private IgfsMessage processStatusRequest() throws IgniteCheckedException {
         IgfsStatus status = ggfs.globalSpace();
 
-        GridGgfsControlResponse res = new GridGgfsControlResponse();
+        IgfsControlResponse res = new IgfsControlResponse();
 
         res.status(status);
 
@@ -241,14 +241,14 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Response message.
      * @throws IgniteCheckedException If failed.
      */
-    private GridGgfsMessage processPathControlRequest(IgfsClientSession ses, GridGgfsIpcCommand cmd,
-        GridGgfsMessage msg) throws IgniteCheckedException {
-        GridGgfsPathControlRequest req = (GridGgfsPathControlRequest)msg;
+    private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+        IgfsMessage msg) throws IgniteCheckedException {
+        IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
         if (log.isDebugEnabled())
             log.debug("Processing path control request [ggfsName=" + ggfs.name() + ", req=" + req + ']');
 
-        GridGgfsControlResponse res = new GridGgfsControlResponse();
+        IgfsControlResponse res = new IgfsControlResponse();
 
         try {
             switch (cmd) {
@@ -396,13 +396,13 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    private GridGgfsMessage processStreamControlRequest(IgfsClientSession ses, GridGgfsIpcCommand cmd,
-        GridGgfsMessage msg, DataInput in) throws IgniteCheckedException, IOException {
-        GridGgfsStreamControlRequest req = (GridGgfsStreamControlRequest)msg;
+    private IgfsMessage processStreamControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+        IgfsMessage msg, DataInput in) throws IgniteCheckedException, IOException {
+        IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg;
 
         Long rsrcId = req.streamId();
 
-        GridGgfsControlResponse resp = new GridGgfsControlResponse();
+        IgfsControlResponse resp = new IgfsControlResponse();
 
         switch (cmd) {
             case CLOSE: {
@@ -513,7 +513,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Affinity key that maps on local node by the time this method is called if replication factor
      *      is {@code 0}, {@code null} otherwise.
      */
-    @Nullable private IgniteUuid affinityKey(GridGgfsPathControlRequest req) {
+    @Nullable private IgniteUuid affinityKey(IgfsPathControlRequest req) {
         // Do not generate affinity key for replicated or near-only cache.
         if (!req.colocate()) {
             if (log.isDebugEnabled())


Mime
View raw message