ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/76] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 18:03:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
new file mode 100644
index 0000000..7ff573a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.ipc.loopback.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.thread.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.spi.IgnitePortProtocol.*;
+
+/**
+ * GGFS server. Handles requests passed from GGFS clients.
+ */
+public class IgfsServer {
+    /** GGFS context. */
+    private final IgfsContext ggfsCtx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** GGFS marshaller. */
+    private final IgfsMarshaller marsh;
+
+    /** Endpoint configuration. */
+    private final Map<String,String> endpointCfg;
+
+    /** Server endpoint. */
+    private IpcServerEndpoint srvEndpoint;
+
+    /** Server message handler. */
+    private IgfsServerHandler hnd;
+
+    /** Accept worker. */
+    private AcceptWorker acceptWorker;
+
+    /** Started client workers. */
+    private ConcurrentLinkedDeque8<ClientWorker> clientWorkers = new ConcurrentLinkedDeque8<>();
+
+    /** Flag indicating if this a management endpoint. */
+    private final boolean mgmt;
+
+    /**
+     * Constructs ggfs server manager.
+     * @param ggfsCtx GGFS context.
+     * @param endpointCfg Endpoint configuration to start.
+     * @param mgmt Management flag - if true, server is intended to be started for Visor.
+     */
+    public IgfsServer(IgfsContext ggfsCtx, Map<String, String> endpointCfg, boolean mgmt) {
+        assert ggfsCtx != null;
+        assert endpointCfg != null;
+
+        this.endpointCfg = endpointCfg;
+        this.ggfsCtx = ggfsCtx;
+        this.mgmt = mgmt;
+
+        log = ggfsCtx.kernalContext().log(IgfsServer.class);
+
+        marsh = new IgfsMarshaller();
+    }
+
+    /**
+     * Starts this server.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start() throws IgniteCheckedException {
+        srvEndpoint = IpcServerEndpointDeserializer.deserialize(endpointCfg);
+
+        if (U.isWindows() && srvEndpoint instanceof IpcSharedMemoryServerEndpoint)
+            throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.class.getSimpleName() +
+                " should not be configured on Windows (configure " +
+                IpcServerTcpEndpoint.class.getSimpleName() + ")");
+
+        if (srvEndpoint instanceof IpcServerTcpEndpoint) {
+            IpcServerTcpEndpoint srvEndpoint0 = (IpcServerTcpEndpoint)srvEndpoint;
+
+            srvEndpoint0.setManagement(mgmt);
+
+            if (srvEndpoint0.getHost() == null) {
+                if (mgmt) {
+                    String locHostName = ggfsCtx.kernalContext().config().getLocalHost();
+
+                    try {
+                        srvEndpoint0.setHost(U.resolveLocalHost(locHostName).getHostAddress());
+                    }
+                    catch (IOException e) {
+                        throw new IgniteCheckedException("Failed to resolve local host: " + locHostName, e);
+                    }
+                }
+                else
+                    // Bind non-management endpoint to 127.0.0.1 by default.
+                    srvEndpoint0.setHost("127.0.0.1");
+            }
+        }
+
+        ggfsCtx.kernalContext().resource().injectGeneric(srvEndpoint);
+
+        srvEndpoint.start();
+
+        // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
+        if (srvEndpoint.getPort() >= 0)
+            ggfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
+
+        hnd = new IgfsIpcHandler(ggfsCtx);
+
+        // Start client accept worker.
+        acceptWorker = new AcceptWorker();
+    }
+
+    /**
+     * Callback that is invoked when kernal is ready.
+     */
+    public void onKernalStart() {
+        // Accept connections only when grid is ready.
+        if (srvEndpoint != null)
+            new IgniteThread(acceptWorker).start();
+    }
+
+    /**
+     * Stops this server.
+     *
+     * @param cancel Cancel flag.
+     */
+    public void stop(boolean cancel) {
+        // Skip if did not start.
+        if (srvEndpoint == null)
+            return;
+
+        // Stop accepting new client connections.
+        U.cancel(acceptWorker);
+
+        U.join(acceptWorker, log);
+
+        // Stop server handler, no more requests on existing connections will be processed.
+        try {
+            hnd.stop();
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to stop GGFS server handler (will close client connections anyway).", e);
+        }
+
+        // Stop existing client connections.
+        for (ClientWorker worker : clientWorkers)
+            U.cancel(worker);
+
+        U.join(clientWorkers, log);
+
+        // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered.
+        if (srvEndpoint.getPort() >= 0)
+            ggfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
+
+        try {
+            ggfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to cleanup server endpoint.", e);
+        }
+    }
+
+    /**
+     * Gets IPC server endpoint.
+     *
+     * @return IPC server endpoint.
+     */
+    public IpcServerEndpoint getIpcServerEndpoint() {
+        return srvEndpoint;
+    }
+
+    /**
+     * Client reader thread.
+     */
+    private class ClientWorker extends GridWorker {
+        /** Connected client endpoint. */
+        private IpcEndpoint endpoint;
+
+        /** Data output stream. */
+        private final IgfsDataOutputStream out;
+
+        /** Client session object. */
+        private IgfsClientSession ses;
+
+        /** Queue node for fast unlink. */
+        private ConcurrentLinkedDeque8.Node<ClientWorker> node;
+
+        /**
+         * Creates client worker.
+         *
+         * @param idx Worker index for worker thread naming.
+         * @param endpoint Connected client endpoint.
+         * @throws IgniteCheckedException If endpoint output stream cannot be obtained.
+         */
+        protected ClientWorker(IpcEndpoint endpoint, int idx) throws IgniteCheckedException {
+            super(ggfsCtx.kernalContext().gridName(), "ggfs-client-worker-" + idx, log);
+
+            this.endpoint = endpoint;
+
+            ses = new IgfsClientSession();
+
+            out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            try {
+                IgfsDataInputStream dis = new IgfsDataInputStream(endpoint.inputStream());
+
+                byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE];
+
+                boolean first = true;
+
+                while (!Thread.currentThread().isInterrupted()) {
+                    dis.readFully(hdr);
+
+                    final long reqId = U.bytesToLong(hdr, 0);
+
+                    int ordinal = U.bytesToInt(hdr, 8);
+
+                    if (first) { // First message must be HANDSHAKE.
+                        if (reqId != 0 || ordinal != IgfsIpcCommand.HANDSHAKE.ordinal()) {
+                            U.warn(log, "Handshake failed.");
+
+                            return;
+                        }
+
+                        first = false;
+                    }
+
+                    final IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(ordinal);
+
+                    IgfsMessage msg = marsh.unmarshall(cmd, hdr, dis);
+
+                    IgniteInternalFuture<IgfsMessage> fut = hnd.handleAsync(ses, msg, dis);
+
+                    // If fut is null, no response is required.
+                    if (fut != null) {
+                        if (fut.isDone()) {
+                            IgfsMessage res;
+
+                            try {
+                                res = fut.get();
+                            }
+                            catch (IgniteCheckedException e) {
+                                res = new IgfsControlResponse();
+
+                                ((IgfsControlResponse)res).error(e);
+                            }
+
+                            try {
+                                synchronized (out) {
+                                    // Reuse header.
+                                    IgfsMarshaller.fillHeader(hdr, reqId, res.command());
+
+                                    marsh.marshall(res, hdr, out);
+
+                                    out.flush();
+                                }
+                            }
+                            catch (IOException | IgniteCheckedException e) {
+                                shutdown0(e);
+                            }
+                        }
+                        else {
+                            fut.listenAsync(new CIX1<IgniteInternalFuture<IgfsMessage>>() {
+                                @Override public void applyx(IgniteInternalFuture<IgfsMessage> fut) {
+                                    IgfsMessage res;
+
+                                    try {
+                                        res = fut.get();
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        res = new IgfsControlResponse();
+
+                                        ((IgfsControlResponse)res).error(e);
+                                    }
+
+                                    try {
+                                        synchronized (out) {
+                                            byte[] hdr = IgfsMarshaller.createHeader(reqId, res.command());
+
+                                            marsh.marshall(res, hdr, out);
+
+                                            out.flush();
+                                        }
+                                    }
+                                    catch (IOException | IgniteCheckedException e) {
+                                        shutdown0(e);
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+            }
+            catch (EOFException ignored) {
+                // Client closed connection.
+            }
+            catch (IgniteCheckedException | IOException e) {
+                if (!isCancelled())
+                    U.error(log, "Failed to read data from client (will close connection)", e);
+            }
+            finally {
+                onFinished();
+            }
+        }
+
+        /**
+         * @param node Node in queue for this worker.
+         */
+        public void node(ConcurrentLinkedDeque8.Node<ClientWorker> node) {
+            this.node = node;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            super.cancel();
+
+            shutdown0(null);
+        }
+
+        /**
+         * @param e Optional exception occurred while stopping this
+         */
+        private void shutdown0(@Nullable Throwable e) {
+            if (!isCancelled()) {
+                if (e != null)
+                    U.error(log, "Stopping client reader due to exception: " + endpoint, e);
+            }
+
+            U.closeQuiet(out);
+
+            endpoint.close();
+        }
+
+        /**
+         * Final resource cleanup.
+         */
+        private void onFinished() {
+            // Second close is no-op, if closed manually.
+            U.closeQuiet(out);
+
+            endpoint.close();
+
+            // Finally, remove from queue.
+            if (clientWorkers.unlinkx(node))
+                hnd.onClosed(ses);
+        }
+    }
+
+    /**
+     * Accept worker.
+     */
+    private class AcceptWorker extends GridWorker {
+        /** Accept index. */
+        private int acceptCnt;
+
+        /**
+         * Creates accept worker.
+         */
+        protected AcceptWorker() {
+            super(ggfsCtx.kernalContext().gridName(), "ggfs-accept-worker", log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    IpcEndpoint client = srvEndpoint.accept();
+
+                    if (log.isDebugEnabled())
+                        log.debug("GGFS client connected [ggfsName=" + ggfsCtx.kernalContext().gridName() +
+                            ", client=" + client + ']');
+
+                    ClientWorker worker = new ClientWorker(client, acceptCnt++);
+
+                    IgniteThread workerThread = new IgniteThread(worker);
+
+                    ConcurrentLinkedDeque8.Node<ClientWorker> node = clientWorkers.addx(worker);
+
+                    worker.node(node);
+
+                    workerThread.start();
+                }
+            }
+            catch (IgniteCheckedException e) {
+                if (!isCancelled())
+                    U.error(log, "Failed to accept client IPC connection (will shutdown accept thread).", e);
+            }
+            finally {
+                srvEndpoint.close();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            super.cancel();
+
+            srvEndpoint.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerHandler.java
new file mode 100644
index 0000000..f3870ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * GGFS server message handler. Server component that is plugged in to the server implementation
+ * to handle incoming messages asynchronously.
+ */
+public interface IgfsServerHandler {
+    /**
+     * Asynchronously handles incoming message.
+     *
+     * @param ses Client session.
+     * @param msg Message to process.
+     * @param in Data input. Stream to read from in case if this is a WRITE_BLOCK message.
+     * @return Future that will be completed when response is ready or {@code null} if no
+     *      response is required.
+     */
+    @Nullable public IgniteInternalFuture<IgfsMessage> handleAsync(IgfsClientSession ses,
+        IgfsMessage msg, DataInput in);
+
+    /**
+     * Handles handles client close events.
+     *
+     * @param ses Session that was closed.
+     */
+    public void onClosed(IgfsClientSession ses);
+
+    /**
+     * Stops handling of incoming requests. No server commands will be handled anymore.
+     *
+     * @throws IgniteCheckedException If error occurred.
+     */
+    public void stop() throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
new file mode 100644
index 0000000..cf99401
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.configuration.IgfsConfiguration.*;
+
+/**
+ * GGFS server manager.
+ */
+public class IgfsServerManager extends IgfsManager {
+    /** IPC server rebind interval. */
+    private static final long REBIND_INTERVAL = 3000;
+
+    /** Collection of servers to maintain. */
+    private Collection<IgfsServer> srvrs;
+
+    /** Server port binders. */
+    private BindWorker bindWorker;
+
+    /** Kernal start latch. */
+    private CountDownLatch kernalStartLatch = new CountDownLatch(1);
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        IgfsConfiguration ggfsCfg = igfsCtx.configuration();
+        Map<String,String> cfg = ggfsCfg.getIpcEndpointConfiguration();
+
+        if (F.isEmpty(cfg)) {
+            // Set default configuration.
+            cfg = new HashMap<>();
+
+            cfg.put("type", U.isWindows() ? "tcp" : "shmem");
+            cfg.put("port", String.valueOf(DFLT_IPC_PORT));
+        }
+
+        if (ggfsCfg.isIpcEndpointEnabled())
+            bind(cfg, /*management*/false);
+
+        if (ggfsCfg.getManagementPort() >= 0) {
+            cfg = new HashMap<>();
+
+            cfg.put("type", "tcp");
+            cfg.put("port", String.valueOf(ggfsCfg.getManagementPort()));
+
+            bind(cfg, /*management*/true);
+        }
+
+        if (bindWorker != null)
+            new IgniteThread(bindWorker).start();
+    }
+
+    /**
+     * Tries to start server endpoint with specified configuration. If failed, will print warning and start a thread
+     * that will try to periodically start this endpoint.
+     *
+     * @param endpointCfg Endpoint configuration to start.
+     * @param mgmt {@code True} if endpoint is management.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws IgniteCheckedException {
+        if (srvrs == null)
+            srvrs = new ConcurrentLinkedQueue<>();
+
+        IgfsServer ipcSrv = new IgfsServer(igfsCtx, endpointCfg, mgmt);
+
+        try {
+            ipcSrv.start();
+
+            srvrs.add(ipcSrv);
+        }
+        catch (IpcEndpointBindException ignored) {
+            int port = ipcSrv.getIpcServerEndpoint().getPort();
+
+            String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : "";
+
+            U.warn(log, "Failed to start GGFS " + (mgmt ? "management " : "") + "endpoint " +
+                "(will retry every " + (REBIND_INTERVAL / 1000) + "s)." +
+                portMsg);
+
+            if (bindWorker == null)
+                bindWorker = new BindWorker();
+
+            bindWorker.addConfiguration(endpointCfg, mgmt);
+        }
+    }
+
+    /**
+     * @return Collection of active endpoints.
+     */
+    public Collection<IpcServerEndpoint> endpoints() {
+        return F.viewReadOnly(srvrs, new C1<IgfsServer, IpcServerEndpoint>() {
+            @Override public IpcServerEndpoint apply(IgfsServer e) {
+                return e.getIpcServerEndpoint();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStart0() throws IgniteCheckedException {
+        if (!F.isEmpty(srvrs)) {
+            for (IgfsServer srv : srvrs)
+                srv.onKernalStart();
+        }
+
+        kernalStartLatch.countDown();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        // Safety.
+        kernalStartLatch.countDown();
+
+        if (bindWorker != null) {
+            bindWorker.cancel();
+
+            U.join(bindWorker, log);
+        }
+
+        if (!F.isEmpty(srvrs)) {
+            for (IgfsServer srv : srvrs)
+                srv.stop(cancel);
+        }
+    }
+
+    /**
+     * Bind worker.
+     */
+    @SuppressWarnings("BusyWait")
+    private class BindWorker extends GridWorker {
+        /** Configurations to bind. */
+        private Collection<IgniteBiTuple<Map<String, String>, Boolean>> bindCfgs = new LinkedList<>();
+
+        /**
+         * Constructor.
+         */
+        private BindWorker() {
+            super(igfsCtx.kernalContext().gridName(), "bind-worker", igfsCtx.kernalContext().log());
+        }
+
+        /**
+         * Adds configuration to bind on. Should not be called after thread start.
+         *
+         * @param cfg Configuration.
+         * @param mgmt Management flag.
+         */
+        public void addConfiguration(Map<String, String> cfg, boolean mgmt) {
+            bindCfgs.add(F.t(cfg, mgmt));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            kernalStartLatch.await();
+
+            while (!isCancelled()) {
+                Thread.sleep(REBIND_INTERVAL);
+
+                Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it = bindCfgs.iterator();
+
+                while (it.hasNext()) {
+                    IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next();
+
+                    IgfsServer ipcSrv = new IgfsServer(igfsCtx, cfg.get1(), cfg.get2());
+
+                    try {
+                        ipcSrv.start();
+
+                        ipcSrv.onKernalStart();
+
+                        srvrs.add(ipcSrv);
+
+                        it.remove();
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (GridWorker.log.isDebugEnabled())
+                            GridWorker.log.debug("Failed to bind GGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']');
+                    }
+                }
+
+                if (bindCfgs.isEmpty())
+                    break;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsStatus.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsStatus.java
new file mode 100644
index 0000000..65ce0ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsStatus.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import java.io.*;
+
+/**
+ * GGFS response for status request.
+ */
+public class IgfsStatus implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Total space size. */
+    private long spaceTotal;
+
+    /** Used space in GGFS. */
+    private long spaceUsed;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsStatus() {
+        // No-op.
+    }
+
+    /**
+     * @param spaceUsed Used space in GGFS.
+     * @param spaceTotal Total space available in GGFS.
+     */
+    public IgfsStatus(long spaceUsed, long spaceTotal) {
+        this.spaceUsed = spaceUsed;
+        this.spaceTotal = spaceTotal;
+    }
+
+    /**
+     * @return Total space available in GGFS.
+     */
+    public long spaceTotal() {
+        return spaceTotal;
+    }
+
+    /**
+     * @return Used space in GGFS.
+     */
+    public long spaceUsed() {
+        return spaceUsed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(spaceUsed);
+        out.writeLong(spaceTotal);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        spaceUsed = in.readLong();
+        spaceTotal = in.readLong();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java
new file mode 100644
index 0000000..9f45205
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Basic sync message.
+ */
+public class IgfsSyncMessage extends IgfsCommunicationMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Coordinator node order. */
+    private long order;
+
+    /** Response flag. */
+    private boolean res;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsSyncMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param order Node order.
+     * @param res Response flag.
+     */
+    public IgfsSyncMessage(long order, boolean res) {
+        this.order = order;
+        this.res = res;
+    }
+
+    /**
+     * @return Coordinator node order.
+     */
+    public long order() {
+        return order;
+    }
+
+    /**
+     * @return {@code True} if response message.
+     */
+    public boolean response() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsSyncMessage.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public MessageAdapter clone() {
+        IgfsSyncMessage _clone = new IgfsSyncMessage();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(MessageAdapter _msg) {
+        super.clone0(_msg);
+
+        IgfsSyncMessage _clone = (IgfsSyncMessage)_msg;
+
+        _clone.order = order;
+        _clone.res = res;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!typeWritten) {
+            if (!writer.writeByte(null, directType()))
+                return false;
+
+            typeWritten = true;
+        }
+
+        switch (state) {
+            case 0:
+                if (!writer.writeLong("order", order))
+                    return false;
+
+                state++;
+
+            case 1:
+                if (!writer.writeBoolean("res", res))
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        reader.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (state) {
+            case 0:
+                order = reader.readLong("order");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 1:
+                res = reader.readBoolean("res");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 71;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsTaskArgsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsTaskArgsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsTaskArgsImpl.java
new file mode 100644
index 0000000..222fed5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsTaskArgsImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * GGFS task arguments implementation.
+ */
+public class IgfsTaskArgsImpl<T> implements IgfsTaskArgs<T>,  Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** GGFS name. */
+    private String ggfsName;
+
+    /** Paths. */
+    private Collection<IgfsPath> paths;
+
+    /** Record resolver. */
+    private IgfsRecordResolver recRslvr;
+
+    /** Skip non existent files flag. */
+    private boolean skipNonExistentFiles;
+
+    /** Maximum range length. */
+    private long maxRangeLen;
+
+    /** User argument. */
+    private T usrArg;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public IgfsTaskArgsImpl() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ggfsName GGFS name.
+     * @param paths Paths.
+     * @param recRslvr Record resolver.
+     * @param skipNonExistentFiles Skip non existent files flag.
+     * @param maxRangeLen Maximum range length.
+     * @param usrArg User argument.
+     */
+    public IgfsTaskArgsImpl(String ggfsName, Collection<IgfsPath> paths, IgfsRecordResolver recRslvr,
+        boolean skipNonExistentFiles, long maxRangeLen, T usrArg) {
+        this.ggfsName = ggfsName;
+        this.paths = paths;
+        this.recRslvr = recRslvr;
+        this.skipNonExistentFiles = skipNonExistentFiles;
+        this.maxRangeLen = maxRangeLen;
+        this.usrArg = usrArg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String ggfsName() {
+        return ggfsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> paths() {
+        return paths;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsRecordResolver recordResolver() {
+        return recRslvr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipNonExistentFiles() {
+        return skipNonExistentFiles;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long maxRangeLength() {
+        return maxRangeLen;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T userArgument() {
+        return usrArg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsTaskArgsImpl.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, ggfsName);
+        U.writeCollection(out, paths);
+
+        out.writeObject(recRslvr);
+        out.writeBoolean(skipNonExistentFiles);
+        out.writeLong(maxRangeLen);
+        out.writeObject(usrArg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ggfsName = U.readString(in);
+        paths = U.readCollection(in);
+
+        recRslvr = (IgfsRecordResolver)in.readObject();
+        skipNonExistentFiles = in.readBoolean();
+        maxRangeLen = in.readLong();
+        usrArg = (T)in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThread.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThread.java
new file mode 100644
index 0000000..533277f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThread.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+
+/**
+ * GGFS ad-hoc thread.
+ */
+public abstract class IgfsThread extends Thread {
+    /**
+     * Creates {@code GGFS} add-hoc thread.
+     */
+    protected IgfsThread() {
+        super("ggfs-worker");
+    }
+
+    /**
+     * Creates {@code GGFS} add-hoc thread.
+     *
+     * @param name Thread name.
+     */
+    protected IgfsThread(String name) {
+        super(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void run() {
+        try {
+            body();
+        }
+        catch (InterruptedException ignore) {
+            interrupt();
+        }
+        // Catch all.
+        catch (Throwable e) {
+            X.error("Failed to execute GGFS ad-hoc thread: " + e.getMessage());
+
+            e.printStackTrace();
+        }
+        finally {
+            try {
+                cleanup();
+            }
+            // Catch all.
+            catch (Throwable e) {
+                X.error("Failed to clean up GGFS ad-hoc thread: " + e.getMessage());
+
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Thread body.
+     *
+     * @throws InterruptedException If interrupted.
+     */
+    protected abstract void body() throws InterruptedException;
+
+    /**
+     * Cleanup.
+     */
+    protected void cleanup() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/package.html
new file mode 100644
index 0000000..60e49f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Contains high performance file system processer.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java
index d855ead..4229988 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.visor.ggfs;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.visor.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index ca8c257..f1b067f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.ipc.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.visor.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index a0e4871..057fa52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@ -24,7 +24,7 @@ import org.apache.ignite.cache.eviction.lru.*;
 import org.apache.ignite.cache.eviction.random.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
-import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.visor.event.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
index da54743..360e5eb 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
@@ -21,7 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.discovery.tcp.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java
index 97373a9..fbaa9ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.igfs;
 
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.discovery.tcp.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
index 9cfdd75..a8b2901 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.igfs;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.testframework.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheIgfsPerBlockLruEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheIgfsPerBlockLruEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheIgfsPerBlockLruEvictionPolicySelfTest.java
deleted file mode 100644
index 90a8172..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheIgfsPerBlockLruEvictionPolicySelfTest.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.eviction.ignitefs.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.igfs.IgfsMode.*;
-
-/**
- * Tests for GGFS per-block LR eviction policy.
- */
-@SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
-public class GridCacheIgfsPerBlockLruEvictionPolicySelfTest extends IgfsCommonAbstractTest {
-    /** Primary GGFS name. */
-    private static final String GGFS_PRIMARY = "ggfs-primary";
-
-    /** Primary GGFS name. */
-    private static final String GGFS_SECONDARY = "ggfs-secondary";
-
-    /** Secondary file system REST endpoint configuration map. */
-    private static final Map<String, String> SECONDARY_REST_CFG = new HashMap<String, String>() {{
-        put("type", "tcp");
-        put("port", "11500");
-    }};
-
-    /** File working in PRIMARY mode. */
-    public static final IgfsPath FILE = new IgfsPath("/file");
-
-    /** File working in DUAL mode. */
-    public static final IgfsPath FILE_RMT = new IgfsPath("/fileRemote");
-
-    /** Primary GGFS instances. */
-    private static IgfsImpl ggfsPrimary;
-
-    /** Secondary GGFS instance. */
-    private static IgniteFs secondaryFs;
-
-    /** Primary file system data cache. */
-    private static GridCacheAdapter<IgfsBlockKey, byte[]> dataCache;
-
-    /** Eviction policy */
-    private static CacheIgfsPerBlockLruEvictionPolicy evictPlc;
-
-    /**
-     * Start a grid with the primary file system.
-     *
-     * @throws Exception If failed.
-     */
-    private void startPrimary() throws Exception {
-        IgfsConfiguration ggfsCfg = new IgfsConfiguration();
-
-        ggfsCfg.setDataCacheName("dataCache");
-        ggfsCfg.setMetaCacheName("metaCache");
-        ggfsCfg.setName(GGFS_PRIMARY);
-        ggfsCfg.setBlockSize(512);
-        ggfsCfg.setDefaultMode(PRIMARY);
-        ggfsCfg.setPrefetchBlocks(1);
-        ggfsCfg.setSequentialReadsBeforePrefetch(Integer.MAX_VALUE);
-        ggfsCfg.setSecondaryFileSystem(secondaryFs);
-
-        Map<String, IgfsMode> pathModes = new HashMap<>();
-
-        pathModes.put(FILE_RMT.toString(), DUAL_SYNC);
-
-        ggfsCfg.setPathModes(pathModes);
-
-        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
-
-        dataCacheCfg.setName("dataCache");
-        dataCacheCfg.setCacheMode(PARTITIONED);
-        dataCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
-        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
-        evictPlc = new CacheIgfsPerBlockLruEvictionPolicy();
-
-        dataCacheCfg.setEvictionPolicy(evictPlc);
-        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
-        dataCacheCfg.setBackups(0);
-        dataCacheCfg.setQueryIndexEnabled(false);
-
-        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
-        metaCacheCfg.setName("metaCache");
-        metaCacheCfg.setCacheMode(REPLICATED);
-        metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
-        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        metaCacheCfg.setQueryIndexEnabled(false);
-        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        cfg.setGridName("grid-primary");
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
-        cfg.setDiscoverySpi(discoSpi);
-        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
-        cfg.setGgfsConfiguration(ggfsCfg);
-
-        cfg.setLocalHost("127.0.0.1");
-        cfg.setConnectorConfiguration(null);
-
-        Ignite g = G.start(cfg);
-
-        ggfsPrimary = (IgfsImpl)g.fileSystem(GGFS_PRIMARY);
-
-        dataCache = ggfsPrimary.context().kernalContext().cache().internalCache(
-            ggfsPrimary.context().configuration().getDataCacheName());
-    }
-
-    /**
-     * Start a grid with the secondary file system.
-     *
-     * @throws Exception If failed.
-     */
-    private void startSecondary() throws Exception {
-        IgfsConfiguration ggfsCfg = new IgfsConfiguration();
-
-        ggfsCfg.setDataCacheName("dataCache");
-        ggfsCfg.setMetaCacheName("metaCache");
-        ggfsCfg.setName(GGFS_SECONDARY);
-        ggfsCfg.setBlockSize(512);
-        ggfsCfg.setDefaultMode(PRIMARY);
-        ggfsCfg.setIpcEndpointConfiguration(SECONDARY_REST_CFG);
-
-        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
-
-        dataCacheCfg.setName("dataCache");
-        dataCacheCfg.setCacheMode(PARTITIONED);
-        dataCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
-        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
-        dataCacheCfg.setBackups(0);
-        dataCacheCfg.setQueryIndexEnabled(false);
-        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
-        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
-        metaCacheCfg.setName("metaCache");
-        metaCacheCfg.setCacheMode(REPLICATED);
-        metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
-        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        metaCacheCfg.setQueryIndexEnabled(false);
-        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        cfg.setGridName("grid-secondary");
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
-        cfg.setDiscoverySpi(discoSpi);
-        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
-        cfg.setGgfsConfiguration(ggfsCfg);
-
-        cfg.setLocalHost("127.0.0.1");
-        cfg.setConnectorConfiguration(null);
-
-        Ignite g = G.start(cfg);
-
-        secondaryFs = g.fileSystem(GGFS_SECONDARY);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        try {
-            // Cleanup.
-            ggfsPrimary.format();
-
-            while (!dataCache.isEmpty())
-                U.sleep(100);
-
-            checkEvictionPolicy(0, 0);
-        }
-        finally {
-            stopAllGrids(false);
-        }
-    }
-
-    /**
-     * Startup primary and secondary file systems.
-     *
-     * @throws Exception If failed.
-     */
-    private void start() throws Exception {
-        startSecondary();
-        startPrimary();
-
-        evictPlc.setMaxBlocks(0);
-        evictPlc.setMaxSize(0);
-        evictPlc.setExcludePaths(null);
-    }
-
-    /**
-     * Test how evictions are handled for a file working in PRIMARY mode.
-     *
-     * @throws Exception If failed.
-     */
-    public void testFilePrimary() throws Exception {
-        start();
-
-        // Create file in primary mode. It must not be propagated to eviction policy.
-        ggfsPrimary.create(FILE, true).close();
-
-        checkEvictionPolicy(0, 0);
-
-        int blockSize = ggfsPrimary.info(FILE).blockSize();
-
-        append(FILE, blockSize);
-
-        checkEvictionPolicy(0, 0);
-
-        read(FILE, 0, blockSize);
-
-        checkEvictionPolicy(0, 0);
-    }
-
-    /**
-     * Test how evictions are handled for a file working in PRIMARY mode.
-     *
-     * @throws Exception If failed.
-     */
-    public void testFileDual() throws Exception {
-        start();
-
-        ggfsPrimary.create(FILE_RMT, true).close();
-
-        checkEvictionPolicy(0, 0);
-
-        int blockSize = ggfsPrimary.info(FILE_RMT).blockSize();
-
-        // File write.
-        append(FILE_RMT, blockSize);
-
-        checkEvictionPolicy(1, blockSize);
-
-        // One more write.
-        append(FILE_RMT, blockSize);
-
-        checkEvictionPolicy(2, blockSize * 2);
-
-        // Read.
-        read(FILE_RMT, 0, blockSize);
-
-        checkEvictionPolicy(2, blockSize * 2);
-    }
-
-    /**
-     * Ensure that a DUAL mode file is not propagated to eviction policy
-     *
-     * @throws Exception If failed.
-     */
-    public void testFileDualExclusion() throws Exception {
-        start();
-
-        evictPlc.setExcludePaths(Collections.singleton(FILE_RMT.toString()));
-
-        // Create file in primary mode. It must not be propagated to eviction policy.
-        ggfsPrimary.create(FILE_RMT, true).close();
-
-        checkEvictionPolicy(0, 0);
-
-        int blockSize = ggfsPrimary.info(FILE_RMT).blockSize();
-
-        append(FILE_RMT, blockSize);
-
-        checkEvictionPolicy(0, 0);
-
-        read(FILE_RMT, 0, blockSize);
-
-        checkEvictionPolicy(0, 0);
-    }
-
-    /**
-     * Ensure that exception is thrown in case we are trying to rename file with one exclude setting to the file with
-     * another.
-     *
-     * @throws Exception If failed.
-     */
-    public void testRenameDifferentExcludeSettings() throws Exception {
-        start();
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                ggfsPrimary.rename(FILE, FILE_RMT);
-
-                return null;
-            }
-        }, IgfsInvalidPathException.class, "Cannot move file to a path with different eviction exclude setting " +
-            "(need to copy and remove)");
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                ggfsPrimary.rename(FILE_RMT, FILE);
-
-                return null;
-            }
-        }, IgfsInvalidPathException.class, "Cannot move file to a path with different eviction exclude setting " +
-            "(need to copy and remove)");
-    }
-
-    /**
-     * Test eviction caused by too much blocks.
-     *
-     * @throws Exception If failed.
-     */
-    public void testBlockCountEviction() throws Exception {
-        start();
-
-        int blockCnt = 3;
-
-        evictPlc.setMaxBlocks(blockCnt);
-
-        ggfsPrimary.create(FILE_RMT, true).close();
-
-        checkEvictionPolicy(0, 0);
-
-        int blockSize = ggfsPrimary.info(FILE_RMT).blockSize();
-
-        // Write blocks up to the limit.
-        append(FILE_RMT, blockSize * blockCnt);
-
-        checkEvictionPolicy(blockCnt, blockCnt * blockSize);
-
-        // Write one more block what should cause eviction.
-        append(FILE_RMT, blockSize);
-
-        checkEvictionPolicy(blockCnt, blockCnt * blockSize);
-
-        // Read the first block.
-        read(FILE_RMT, 0, blockSize);
-
-        checkEvictionPolicy(blockCnt, blockCnt * blockSize);
-        checkMetrics(1, 1);
-    }
-
-    /**
-     * Test eviction caused by too big data size.
-     *
-     * @throws Exception If failed.
-     */
-    public void testDataSizeEviction() throws Exception {
-        start();
-
-        ggfsPrimary.create(FILE_RMT, true).close();
-
-        int blockCnt = 3;
-        int blockSize = ggfsPrimary.info(FILE_RMT).blockSize();
-
-        evictPlc.setMaxSize(blockSize * blockCnt);
-
-        // Write blocks up to the limit.
-        append(FILE_RMT, blockSize * blockCnt);
-
-        checkEvictionPolicy(blockCnt, blockCnt * blockSize);
-
-        // Reset metrics.
-        ggfsPrimary.resetMetrics();
-
-        // Read the first block what should cause reordering.
-        read(FILE_RMT, 0, blockSize);
-
-        checkMetrics(1, 0);
-        checkEvictionPolicy(blockCnt, blockCnt * blockSize);
-
-        // Write one more block what should cause eviction of the block 2.
-        append(FILE_RMT, blockSize);
-
-        checkEvictionPolicy(blockCnt, blockCnt * blockSize);
-
-        // Read the first block.
-        read(FILE_RMT, 0, blockSize);
-
-        checkMetrics(2, 0);
-        checkEvictionPolicy(blockCnt, blockCnt * blockSize);
-
-        // Read the second block (which was evicted).
-        read(FILE_RMT, blockSize, blockSize);
-
-        checkMetrics(3, 1);
-        checkEvictionPolicy(blockCnt, blockCnt * blockSize);
-    }
-
-    /**
-     * Read some data from the given file with the given offset.
-     *
-     * @param path File path.
-     * @param off Offset.
-     * @param len Length.
-     * @throws Exception If failed.
-     */
-    private void read(IgfsPath path, int off, int len) throws Exception {
-        IgfsInputStream is = ggfsPrimary.open(path);
-
-        is.readFully(off, new byte[len]);
-
-        is.close();
-    }
-
-    /**
-     * Append some data to the given file.
-     *
-     * @param path File path.
-     * @param len Data length.
-     * @throws Exception If failed.
-     */
-    private void append(IgfsPath path, int len) throws Exception {
-        IgfsOutputStream os = ggfsPrimary.append(path, false);
-
-        os.write(new byte[len]);
-
-        os.close();
-    }
-
-    /**
-     * Check metrics counters.
-     *
-     * @param blocksRead Expected blocks read.
-     * @param blocksReadRmt Expected blocks read remote.
-     * @throws Exception If failed.
-     */
-    public void checkMetrics(final long blocksRead, final long blocksReadRmt) throws Exception {
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                IgfsMetrics metrics = ggfsPrimary.metrics();
-
-                return metrics.blocksReadTotal() == blocksRead && metrics.blocksReadRemote() == blocksReadRmt;
-            }
-        }, 5000) : "Unexpected metrics [expectedBlocksReadTotal=" + blocksRead + ", actualBlocksReadTotal=" +
-            ggfsPrimary.metrics().blocksReadTotal() + ", expectedBlocksReadRemote=" + blocksReadRmt +
-            ", actualBlocksReadRemote=" + ggfsPrimary.metrics().blocksReadRemote() + ']';
-    }
-
-    /**
-     * Check eviction policy state.
-     *
-     * @param curBlocks Current blocks.
-     * @param curBytes Current bytes.
-     */
-    private void checkEvictionPolicy(final int curBlocks, final long curBytes) throws IgniteInterruptedCheckedException {
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return evictPlc.getCurrentBlocks() == curBlocks && evictPlc.getCurrentSize() == curBytes;
-            }
-        }, 5000) : "Unexpected counts [expectedBlocks=" + curBlocks + ", actualBlocks=" + evictPlc.getCurrentBlocks() +
-            ", expectedBytes=" + curBytes + ", currentBytes=" + curBytes + ']';
-    }
-}


Mime
View raw message