ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [24/50] [abbrv] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/sprint-1' into ignite-67
Date Sat, 31 Jan 2015 03:55:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 0000000,d22f2c9..679fec1
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@@ -1,0 -1,707 +1,707 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.ipc.shmem;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.resources.*;
+ import org.apache.ignite.thread.*;
+ import org.apache.ignite.internal.processors.resource.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.ipc.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.net.*;
+ import java.nio.channels.*;
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+ 
+ /**
+  * Server shared memory IPC endpoint.
+  */
+ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint {
+     /** Troubleshooting public wiki page. */
+     public static final String TROUBLESHOOTING_URL = "http://bit.ly/GridGain-Troubleshooting";
+ 
+     /** IPC error message. */
+     public static final String OUT_OF_RESOURCES_MSG = "Failed to allocate shared memory
segment " +
+         "(for troubleshooting see " + TROUBLESHOOTING_URL + ')';
+ 
+     /** Default endpoint port number. */
+     public static final int DFLT_IPC_PORT = 10500;
+ 
+     /** Default shared memory space in bytes. */
+     public static final int DFLT_SPACE_SIZE = 256 * 1024;
+ 
+     /**
 -     * Default token directory. Note that this path is relative to {@code GRIDGAIN_HOME/work}
folder
 -     * if {@code GRIDGAIN_HOME} system or environment variable specified, otherwise it is
relative to
++     * Default token directory. Note that this path is relative to {@code IGNITE_HOME/work}
folder
++     * if {@code IGNITE_HOME} system or environment variable specified, otherwise it is
relative to
+      * {@code work} folder under system {@code java.io.tmpdir} folder.
+      *
+      * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory()
+      */
+     public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem";
+ 
+     /**
+      * Shared memory token file name prefix.
+      *
+      * Token files are created and stored in the following manner: [tokDirPath]/[nodeId]-[current
+      * PID]/gg-shmem-space-[auto_idx]-[other_party_pid]-[size]
+      */
+     public static final String TOKEN_FILE_NAME = "gg-shmem-space-";
+ 
+     /** Default lock file name. */
+     private static final String LOCK_FILE_NAME = "lock.file";
+ 
+     /** GC frequency. */
+     private static final long GC_FREQ = 10000;
+ 
+     /** ID generator. */
+     private static final AtomicLong tokIdxGen = new AtomicLong();
+ 
+     /** Port to bind socket to. */
+     private int port = DFLT_IPC_PORT;
+ 
+     /** Token directory path. */
+     private String tokDirPath = DFLT_TOKEN_DIR_PATH;
+ 
+     /** Space size. */
+     private int size = DFLT_SPACE_SIZE;
+ 
+     /** Server socket. */
+     @GridToStringExclude
+     private ServerSocket srvSock;
+ 
+     /** Token directory. */
+     private File tokDir;
+ 
+     /** Logger. */
+     @IgniteLoggerResource
+     private IgniteLogger log;
+ 
+     /** Local node ID. */
+     private UUID locNodeId;
+ 
+     /** Grid name. */
+     private String gridName;
+ 
+     /** Flag allowing not to print out of resources warning. */
+     private boolean omitOutOfResourcesWarn;
+ 
+     /** GC worker. */
+     private GridWorker gcWorker;
+ 
+     /** Pid of the current process. */
+     private int pid;
+ 
+     /** Closed flag. */
+     private volatile boolean closed;
+ 
+     /** Client endpoints (shared memory spaces) opened by this server endpoint. */
+     private final Collection<IpcSharedMemoryClientEndpoint> endpoints =
+         new GridConcurrentHashSet<>();
+ 
+     /** Use this constructor when dependencies could be injected with {@link GridResourceProcessor#injectGeneric(Object)}.
*/
+     public IpcSharedMemoryServerEndpoint() {
+         // No-op.
+     }
+ 
+     /**
+      * Constructor to set dependencies explicitly.
+      *
+      * @param log Log.
+      * @param locNodeId Node id.
+      * @param gridName Grid name.
+      */
+     public IpcSharedMemoryServerEndpoint(IgniteLogger log, UUID locNodeId, String gridName)
{
+         this.log = log;
+         this.locNodeId = locNodeId;
+         this.gridName = gridName;
+     }
+ 
+     /** @param omitOutOfResourcesWarn If {@code true}, out of resources warning will not
be printed by server. */
+     public void omitOutOfResourcesWarning(boolean omitOutOfResourcesWarn) {
+         this.omitOutOfResourcesWarn = omitOutOfResourcesWarn;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Invokes the native loader, obtains the current PID, validates the configured space size,
+      * token directory path and port, resolves the token directory
+      * {@code [tokDirPath]/[nodeId]-[pid]}, binds the server socket to {@code 127.0.0.1} and
+      * starts the GC worker thread.
+      *
+      * @throws IgniteCheckedException If the PID cannot be obtained, a configuration value is
+      *      illegal or the socket cannot be bound (raised as {@link IpcEndpointBindException}).
+      */
+     @Override public void start() throws IgniteCheckedException {
+         IpcSharedMemoryNativeLoader.load();
+ 
+         pid = IpcSharedMemoryUtils.pid();
+ 
+         if (pid == -1)
+             throw new IpcEndpointBindException("Failed to get PID of the current process.");
+ 
+         if (size <= 0)
+             throw new IpcEndpointBindException("Space size should be positive: " + size);
+ 
+         String tokDirPath = this.tokDirPath;
+ 
+         if (F.isEmpty(tokDirPath))
+             throw new IpcEndpointBindException("Token directory path is empty.");
+ 
+         // Reuse the PID validated above instead of querying it a second time.
+         tokDirPath = tokDirPath + '/' + locNodeId.toString() + '-' + pid;
+ 
+         tokDir = U.resolveWorkDirectory(tokDirPath, false);
+ 
+         // 0xffff (65535) is itself a valid TCP port, so only values above it are rejected.
+         if (port <= 0 || port > 0xffff)
+             throw new IpcEndpointBindException("Port value is illegal: " + port);
+ 
+         try {
+             srvSock = new ServerSocket();
+ 
+             // Always bind to loopback.
+             srvSock.bind(new InetSocketAddress("127.0.0.1", port));
+         }
+         catch (IOException e) {
+             // Although empty socket constructor never throws exception, close it just in case.
+             U.closeQuiet(srvSock);
+ 
+             throw new IpcEndpointBindException("Failed to bind shared memory IPC endpoint (is port already " +
+                 "in use?): " + port, e);
+         }
+ 
+         gcWorker = new GcWorker(gridName, "ipc-shmem-gc", log);
+ 
+         new IgniteThread(gcWorker).start();
+ 
+         if (log.isInfoEnabled())
+             log.info("IPC shared memory server endpoint started [port=" + port +
+                 ", tokDir=" + tokDir.getAbsolutePath() + ']');
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Loops until the current thread is interrupted. For each connection accepted on the server
+      * socket it reads an {@link IpcSharedMemoryInitRequest}, creates two token files, opens an
+      * in-space and an out-space over them, replies with an {@link IpcSharedMemoryInitResponse}
+      * and reads a boolean acknowledgement from the client. The resulting endpoint is recorded in
+      * {@code endpoints} and returned. The handshake socket is always closed before this method
+      * returns or moves on to the next connection.
+      * <p>
+      * Failures while processing one connection are logged (and, where possible, reported to the
+      * client through {@code sendErrorResponse}); the loop then waits for the next connection.
+      * <p>
+      * NOTE(review): when the client acknowledges with {@code false}, {@code err} becomes
+      * {@code true} and both spaces are force-closed in the {@code finally} block, yet the
+      * endpoint is still registered and returned - confirm that this is intended.
+      *
+      * @throws IgniteCheckedException If the socket accept itself fails while the thread is not
+      *      interrupted, or (as {@code IgniteInterruptedException}) if the thread is interrupted.
+      */
+     @SuppressWarnings("ErrorNotRethrown")
+     @Override public IpcEndpoint accept() throws IgniteCheckedException {
+         while (!Thread.currentThread().isInterrupted()) {
+             Socket sock = null;
+ 
+             boolean accepted = false;
+ 
+             try {
+                 sock = srvSock.accept();
+ 
+                 accepted = true;
+ 
+                 InputStream inputStream = sock.getInputStream();
+                 ObjectInputStream in = new ObjectInputStream(inputStream);
+ 
+                 ObjectOutputStream out = new ObjectOutputStream(sock.getOutputStream());
+ 
+                 IpcSharedMemorySpace inSpace = null;
+ 
+                 IpcSharedMemorySpace outSpace = null;
+ 
+                 boolean err = true;
+ 
+                 try {
+                     IpcSharedMemoryInitRequest req = (IpcSharedMemoryInitRequest)in.readObject();
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Processing request: " + req);
+ 
+                     IgnitePair<String> p = inOutToken(req.pid(), size);
+ 
+                     String file1 = p.get1();
+                     String file2 = p.get2();
+ 
+                     assert file1 != null;
+                     assert file2 != null;
+ 
+                     // Create tokens.
+                     new File(file1).createNewFile();
+                     new File(file2).createNewFile();
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Created token files: " + p);
+ 
+                     inSpace = new IpcSharedMemorySpace(
+                         file1,
+                         req.pid(),
+                         pid,
+                         size,
+                         true,
+                         log);
+ 
+                     outSpace = new IpcSharedMemorySpace(
+                         file2,
+                         pid,
+                         req.pid(),
+                         size,
+                         false,
+                         log);
+ 
+                     IpcSharedMemoryClientEndpoint ret = new IpcSharedMemoryClientEndpoint(inSpace,
outSpace,
+                         log);
+ 
+                     out.writeObject(new IpcSharedMemoryInitResponse(file2, outSpace.sharedMemoryId(),
+                         file1, inSpace.sharedMemoryId(), pid, size));
+ 
+                     err = !in.readBoolean(); // Client acknowledgement: false keeps the error flag set.
+ 
+                     endpoints.add(ret);
+ 
+                     return ret;
+                 }
+                 catch (UnsatisfiedLinkError e) {
+                     throw IpcSharedMemoryUtils.linkError(e);
+                 }
+                 catch (IOException e) {
+                     if (log.isDebugEnabled())
+                         log.debug("Failed to process incoming connection " +
+                             "(was connection closed by another party):" + e.getMessage());
+                 }
+                 catch (ClassNotFoundException e) {
+                     U.error(log, "Failed to process incoming connection.", e);
+                 }
+                 catch (ClassCastException e) {
+                     String msg = "Failed to process incoming connection (most probably,
shared memory " +
+                         "rest endpoint has been configured by mistake).";
+ 
+                     LT.warn(log, null, msg);
+ 
+                     sendErrorResponse(out, e);
+                 }
+                 catch (IpcOutOfSystemResourcesException e) {
+                     if (!omitOutOfResourcesWarn)
+                         LT.warn(log, null, OUT_OF_RESOURCES_MSG);
+ 
+                     sendErrorResponse(out, e);
+                 }
+                 catch (IgniteCheckedException e) {
+                     LT.error(log, e, "Failed to process incoming shared memory connection.");
+ 
+                     sendErrorResponse(out, e);
+                 }
+                 finally {
+                     // Exception has been thrown, need to free system resources.
+                     if (err) {
+                         if (inSpace != null)
+                             inSpace.forceClose();
+ 
+                         // Safety.
+                         if (outSpace != null)
+                             outSpace.forceClose();
+                     }
+                 }
+             }
+             catch (IOException e) {
+                 if (!Thread.currentThread().isInterrupted() && !accepted)
+                     throw new IgniteCheckedException("Failed to accept incoming connection.",
e);
+ 
+                 if (!closed)
+                     LT.error(log, null, "Failed to process incoming shared memory connection:
" + e.getMessage());
+             }
+             finally {
+                 U.closeQuiet(sock);
+             }
+         } // while
+ 
+         throw new IgniteInterruptedException("Socket accept was interrupted.");
+     }
+ 
+     /**
+      * Resource injection callback. Takes the grid name and the local node ID from the supplied
+      * {@link Ignite} instance, or resets both to {@code null} when no instance is supplied.
+      *
+      * @param ignite Ignite instance, or {@code null} to clean up.
+      */
+     @IgniteInstanceResource
+     private void injectResources(Ignite ignite){
+         if (ignite == null) {
+             // Cleanup resources.
+             gridName = null;
+             locNodeId = null;
+ 
+             return;
+         }
+ 
+         // Inject resources.
+         gridName = ignite.name();
+         locNodeId = ignite.configuration().getNodeId();
+     }
+ 
+     /**
+      * Reports a failed handshake to the client by writing an error response to the stream.
+      * A failure to write the response is logged and otherwise ignored.
+      *
+      * @param out Output stream.
+      * @param err Error cause.
+      */
+     private void sendErrorResponse(ObjectOutput out, Exception err) {
+         try {
+             IpcSharedMemoryInitResponse errRes = new IpcSharedMemoryInitResponse(err);
+ 
+             out.writeObject(errRes);
+         }
+         catch (IOException ioEx) {
+             U.error(log, "Failed to send error response to client.", ioEx);
+         }
+     }
+ 
+     /**
+      * Builds absolute paths of the two token files for a new connection.
+      * <p>
+      * Two consecutive indexes are reserved from the shared index generator in one atomic step,
+      * so the pair is unique among all calls using that generator. File names follow the pattern
+      * {@code gg-shmem-space-[idx]-[pid]-[size]} inside the token directory.
+      *
+      * @param pid PID of the other party.
+      * @param size Size of the space.
+      * @return Token pair.
+      */
+     private IgnitePair<String> inOutToken(int pid, int size) {
+         // Atomic equivalent of the former get/compareAndSet retry loop.
+         long idx = tokIdxGen.getAndAdd(2);
+ 
+         String suffix = "-" + pid + "-" + size;
+ 
+         return F.pair(
+             new File(tokDir, TOKEN_FILE_NAME + idx + suffix).getAbsolutePath(),
+             new File(tokDir, TOKEN_FILE_NAME + (idx + 1) + suffix).getAbsolutePath());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int getPort() {
+         return port;
+     }
+ 
+     /** {@inheritDoc} */
+     @Nullable @Override public String getHost() {
+         return null;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      *
+      * @return {@code false} as shared memory endpoints can not be used for management.
+      */
+     @Override public boolean isManagement() {
+         return false;
+     }
+ 
+     /**
+      * Sets port endpoint will be bound to.
+      *
+      * @param port Port number.
+      */
+     public void setPort(int port) {
+         this.port = port;
+     }
+ 
+     /**
+      * Gets token directory path.
+      *
+      * @return Token directory path.
+      */
+     public String getTokenDirectoryPath() {
+         return tokDirPath;
+     }
+ 
+     /**
+      * Sets token directory path.
+      *
+      * @param tokDirPath Token directory path.
+      */
+     public void setTokenDirectoryPath(String tokDirPath) {
+         this.tokDirPath = tokDirPath;
+     }
+ 
+     /**
+      * Gets size of shared memory spaces that are created by the endpoint.
+      *
+      * @return Size of shared memory space.
+      */
+     public int getSize() {
+         return size;
+     }
+ 
+     /**
+      * Sets size of shared memory spaces that are created by the endpoint.
+      *
+      * @param size Size of shared memory space.
+      */
+     public void setSize(int size) {
+         this.size = size;
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Marks the endpoint closed, closes the server socket and, if the GC worker was started,
+      * cancels it and waits for it to finish. The caller's interrupted status is preserved.
+      */
+     @Override public void close() {
+         closed = true;
+ 
+         U.closeQuiet(srvSock);
+ 
+         GridWorker worker = gcWorker;
+ 
+         if (worker == null)
+             return;
+ 
+         U.cancel(worker);
+ 
+         // This method may be called from already interrupted thread: clear the flag
+         // so the join below can complete, and restore it afterwards.
+         boolean wasInterrupted = Thread.interrupted();
+ 
+         try {
+             U.join(worker);
+         }
+         catch (IgniteInterruptedException ex) {
+             U.warn(log, "Interrupted when stopping GC worker.", ex);
+         }
+         finally {
+             if (wasInterrupted)
+                 Thread.currentThread().interrupt();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(IpcSharedMemoryServerEndpoint.class, this);
+     }
+ 
+     /**
+      * Sets configuration properties from the map.
+      * <p>
+      * Recognized keys are {@code port}, {@code size} and {@code tokenDirectoryPath}. The keys
+      * {@code type}, {@code host} and {@code management} are accepted and ignored. Any other
+      * key is rejected.
+      *
+      * @param endpointCfg Map of properties.
+      * @throws IgniteCheckedException If invalid property name or value.
+      */
+     public void setupConfiguration(Map<String, String> endpointCfg) throws IgniteCheckedException {
+         for (Map.Entry<String, String> e : endpointCfg.entrySet()) {
+             String key = e.getKey();
+             String val = e.getValue();
+ 
+             try {
+                 switch (key) {
+                     case "type":
+                     case "host":
+                     case "management":
+                         // Ignore these properties.
+                         break;
+ 
+                     case "port":
+                         setPort(Integer.parseInt(val));
+                         break;
+ 
+                     case "size":
+                         setSize(Integer.parseInt(val));
+                         break;
+ 
+                     case "tokenDirectoryPath":
+                         setTokenDirectoryPath(val);
+                         break;
+ 
+                     default:
+                         throw new IgniteCheckedException("Invalid property '" + key + "' of " +
+                             getClass().getSimpleName());
+                 }
+             }
+             // Only runtime failures (e.g. NumberFormatException) indicate a bad value;
+             // JVM errors propagate untouched instead of being reported as "Invalid value".
+             catch (RuntimeException ex) {
+                 throw new IgniteCheckedException("Invalid value '" + val + "' of the property '" + key + "' in " +
+                         getClass().getSimpleName(), ex);
+             }
+         }
+     }
+ 
+     /**
+      *
+      */
+     private class GcWorker extends GridWorker {
+         /**
+          * @param gridName Grid name.
+          * @param name Name.
+          * @param log Log.
+          */
+         protected GcWorker(@Nullable String gridName, String name, IgniteLogger log) {
+             super(gridName, name, log);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void body() throws InterruptedException, IgniteInterruptedException
{
+             if (log.isDebugEnabled())
+                 log.debug("GC worker started.");
+ 
+             File workTokDir = tokDir.getParentFile();
+ 
+             assert workTokDir != null;
+ 
+             while (!isCancelled()) {
+                 U.sleep(GC_FREQ);
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Starting GC iteration.");
+ 
+                 RandomAccessFile lockFile = null;
+ 
+                 FileLock lock = null;
+ 
+                 try {
+                     lockFile = new RandomAccessFile(new File(workTokDir, LOCK_FILE_NAME),
"rw");
+ 
+                     lock = lockFile.getChannel().lock();
+ 
+                     if (lock != null)
+                         processTokenDirectory(workTokDir);
+                     else if (log.isDebugEnabled())
+                         log.debug("Token directory is being processed concurrently: " +
workTokDir.getAbsolutePath());
+                 }
+                 catch (OverlappingFileLockException ignored) {
+                     if (log.isDebugEnabled())
+                         log.debug("Token directory is being processed concurrently: " +
workTokDir.getAbsolutePath());
+                 }
+                 catch (IOException e) {
+                     U.error(log, "Failed to process directory: " + workTokDir.getAbsolutePath(),
e);
+                 }
+                 finally {
+                     U.releaseQuiet(lock);
+                     U.closeQuiet(lockFile);
+                 }
+ 
+                 // Process spaces created by this endpoint.
+                 if (log.isDebugEnabled())
+                     log.debug("Processing local spaces.");
+ 
+                 for (IpcSharedMemoryClientEndpoint e : endpoints) {
+                     if (log.isDebugEnabled())
+                         log.debug("Processing endpoint: " + e);
+ 
+                     if (!e.checkOtherPartyAlive()) {
+                         endpoints.remove(e);
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Removed endpoint: " + e);
+                     }
+                 }
+             }
+         }
+ 
+         /**
+          * Scans the common token directory and releases resources left behind by dead processes.
+          * <p>
+          * Every sub-directory other than this endpoint's own is expected to be named so that the
+          * text after the last {@code '-'} is the PID of its owner. When that process is not alive,
+          * each token file inside whose other-party process is also not alive has its system
+          * resources freed and is deleted. The sub-directory is removed once all of its entries
+          * are gone.
+          *
+          * @param workTokDir Token directory (common for multiple nodes).
+          */
+         private void processTokenDirectory(File workTokDir) {
+             File[] nodeDirs = workTokDir.listFiles();
+ 
+             // listFiles() returns null when the path is not a directory or an I/O error occurs.
+             if (nodeDirs == null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Failed to list token directory: " + workTokDir.getAbsolutePath());
+ 
+                 return;
+             }
+ 
+             for (File f : nodeDirs) {
+                 if (!f.isDirectory()) {
+                     if (!f.getName().equals(LOCK_FILE_NAME)) {
+                         if (log.isDebugEnabled())
+                             log.debug("Unexpected file: " + f.getName());
+                     }
+ 
+                     continue;
+                 }
+ 
+                 if (f.equals(tokDir)) {
+                     if (log.isDebugEnabled())
+                         log.debug("Skipping own token directory: " + tokDir.getName());
+ 
+                     continue;
+                 }
+ 
+                 String name = f.getName();
+ 
+                 int nodePid;
+ 
+                 try {
+                     nodePid = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1));
+                 }
+                 catch (NumberFormatException ignored) {
+                     if (log.isDebugEnabled())
+                         log.debug("Failed to parse file name: " + name);
+ 
+                     continue;
+                 }
+ 
+                 // Is process alive?
+                 if (IpcSharedMemoryUtils.alive(nodePid)) {
+                     if (log.isDebugEnabled())
+                         log.debug("Skipping alive node: " + nodePid);
+ 
+                     continue;
+                 }
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Possibly stale token folder: " + f);
+ 
+                 // Process each token under stale token folder.
+                 File[] shmemToks = f.listFiles();
+ 
+                 if (shmemToks == null)
+                     // Although this is strange, but is reproducible sometimes on linux.
+                     return;
+ 
+                 int rmvCnt = 0;
+ 
+                 try {
+                     for (File f0 : shmemToks) {
+                         if (log.isDebugEnabled())
+                             log.debug("Processing token file: " + f0.getName());
+ 
+                         if (f0.isDirectory()) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Unexpected directory: " + f0.getName());
+ 
+                             // A directory is never a token file; do not try to free or delete it.
+                             continue;
+                         }
+ 
+                         // Token file format: gg-shmem-space-[auto_idx]-[other_party_pid]-[size]
+                         String[] toks = f0.getName().split("-");
+ 
+                         if (toks.length != 6) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Unrecognized token file: " + f0.getName());
+ 
+                             continue;
+                         }
+ 
+                         int otherPid;
+                         int spaceSize;
+ 
+                         try {
+                             otherPid = Integer.parseInt(toks[4]);
+                             spaceSize = Integer.parseInt(toks[5]);
+                         }
+                         catch (NumberFormatException ignored) {
+                             // Report the token file that failed to parse, not its parent folder.
+                             if (log.isDebugEnabled())
+                                 log.debug("Failed to parse file name: " + f0.getName());
+ 
+                             continue;
+                         }
+ 
+                         if (IpcSharedMemoryUtils.alive(otherPid)) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Skipping alive process: " + otherPid);
+ 
+                             continue;
+                         }
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Possibly stale token file: " + f0);
+ 
+                         IpcSharedMemoryUtils.freeSystemResources(f0.getAbsolutePath(), spaceSize);
+ 
+                         if (f0.delete()) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Deleted file: " + f0.getName());
+ 
+                             rmvCnt++;
+                         }
+                         else if (!f0.exists()) {
+                             if (log.isDebugEnabled())
+                                 log.debug("File has been concurrently deleted: " + f0.getName());
+ 
+                             rmvCnt++;
+                         }
+                         else if (log.isDebugEnabled())
+                             log.debug("Failed to delete file: " + f0.getName());
+                     }
+                 }
+                 finally {
+                     // Assuming that no new files can appear, since the process that owned
+                     // this folder is no longer alive.
+                     if (rmvCnt == shmemToks.length) {
+                         U.delete(f);
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Deleted empty token directory: " + f.getName());
+                     }
+                 }
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java
index 0000000,ba4be48..249d995
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpace.java
@@@ -1,0 -1,374 +1,374 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.ipc.shmem;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ import java.util.concurrent.atomic.*;
+ import java.util.concurrent.locks.*;
+ 
+ import static org.apache.ignite.IgniteSystemProperties.*;
+ 
+ /**
+  *
+  */
+ @SuppressWarnings({"PointlessBooleanExpression", "ConstantConditions"})
+ public class IpcSharedMemorySpace implements Closeable {
+     /** Debug flag (enable for testing). */
 -    private static final boolean DEBUG = Boolean.getBoolean(GG_IPC_SHMEM_SPACE_DEBUG);
++    private static final boolean DEBUG = Boolean.getBoolean(IGNITE_IPC_SHMEM_SPACE_DEBUG);
+ 
+     /** Shared memory segment size (operable). */
+     private final int opSize;
+ 
+     /** Shared memory native pointer. */
+     private final long shmemPtr;
+ 
+     /** Shared memory ID. */
+     private final int shmemId;
+ 
+     /** Semaphore set ID. */
+     private final int semId;
+ 
+     /** Local closed flag. */
+     private final AtomicBoolean closed = new AtomicBoolean();
+ 
+     /** {@code True} if space has been closed. */
+     private final boolean isReader;
+ 
+     /** Lock to protect readers and writers from concurrent close. */
+     private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ 
+     /** */
+     private final int writerPid;
+ 
+     /** */
+     private final int readerPid;
+ 
+     /** */
+     private final String tokFileName;
+ 
+     /** */
+     private final IgniteLogger log;
+ 
+     /**
+      * This will allocate system resources for the space.
+      *
+      * @param tokFileName Token filename.
+      * @param writerPid Writer PID.
+      * @param readerPid Reader PID.
+      * @param size Size in bytes.
+      * @param reader {@code True} if reader.
+      * @param parent Parent logger.
+      * @throws IgniteCheckedException If failed.
+      */
+     public IpcSharedMemorySpace(String tokFileName, int writerPid, int readerPid, int size, boolean reader,
+                                 IgniteLogger parent) throws IgniteCheckedException {
+         assert size > 0 : "Size cannot be less than 1 byte";
+ 
+         log = parent.getLogger(IpcSharedMemorySpace.class);
+ 
+         opSize = size;
+ 
+         shmemPtr = IpcSharedMemoryUtils.allocateSystemResources(tokFileName, size, DEBUG && log.isDebugEnabled());
+ 
+         shmemId = IpcSharedMemoryUtils.sharedMemoryId(shmemPtr);
+         semId = IpcSharedMemoryUtils.semaphoreId(shmemPtr);
+ 
+         isReader = reader;
+ 
+         this.tokFileName = tokFileName;
+         this.readerPid = readerPid;
+         this.writerPid = writerPid;
+ 
+         if (DEBUG && log.isDebugEnabled())
+             log.debug("Shared memory space has been created: " + this);
+     }
+ 
+     /**
+      * This should be called in order to attach to already allocated system resources.
+      *
+      * @param tokFileName Token file name (for proper cleanup).
+      * @param writerPid Writer PID.
+      * @param readerPid Reader PID.
+      * @param size Size.
+      * @param reader Reader flag.
+      * @param shmemId Shared memory ID.
+      * @param parent Logger.
+      * @throws IgniteCheckedException If failed.
+      */
+     public IpcSharedMemorySpace(String tokFileName, int writerPid, int readerPid, int size, boolean reader,
+                                 int shmemId, IgniteLogger parent) throws IgniteCheckedException {
+         assert size > 0 : "Size cannot be less than 1 byte";
+ 
+         log = parent.getLogger(IpcSharedMemorySpace.class);
+ 
+         opSize = size;
+         isReader = reader;
+         this.shmemId = shmemId;
+         this.writerPid = writerPid;
+         this.readerPid = readerPid;
+         this.tokFileName = tokFileName;
+ 
+         shmemPtr = IpcSharedMemoryUtils.attach(shmemId, DEBUG && log.isDebugEnabled());
+ 
+         semId = IpcSharedMemoryUtils.semaphoreId(shmemPtr);
+     }
+ 
+     /**
+      * @param buf Buffer.
+      * @param off Offset.
+      * @param len Length.
+      * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever).
+      * @throws IgniteCheckedException If space has been closed.
+      * @throws IpcSharedMemoryOperationTimedoutException If operation times out.
+      */
+     public void write(byte[] buf, int off, int len, long timeout) throws IgniteCheckedException,
+             IpcSharedMemoryOperationTimedoutException {
+         assert buf != null;
+         assert len > 0;
+         assert buf.length >= off + len;
+         assert timeout >= 0;
+ 
+         assert !isReader;
+ 
+         lock.readLock().lock();
+ 
+         try {
+             if (closed.get())
+                 throw new IgniteCheckedException("Shared memory segment has been closed: " + this);
+ 
+             IpcSharedMemoryUtils.writeSharedMemory(shmemPtr, buf, off, len, timeout);
+         }
+         finally {
+             lock.readLock().unlock();
+         }
+     }
+ 
+     /**
+      * @param buf Buffer.
+      * @param off Offset.
+      * @param len Length.
+      * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever).
+      * @throws IgniteCheckedException If space has been closed.
+      * @throws IpcSharedMemoryOperationTimedoutException If operation times out.
+      */
+     public void write(ByteBuffer buf, int off, int len, long timeout) throws IgniteCheckedException,
+             IpcSharedMemoryOperationTimedoutException {
+         assert buf != null;
+         assert len > 0;
+         assert buf.limit() >= off + len;
+         assert timeout >= 0;
+         assert !isReader;
+ 
+         lock.readLock().lock();
+ 
+         try {
+             if (closed.get())
+                 throw new IgniteCheckedException("Shared memory segment has been closed: " + this);
+ 
+             IpcSharedMemoryUtils.writeSharedMemoryByteBuffer(shmemPtr, buf, off, len, timeout);
+         }
+         finally {
+             lock.readLock().unlock();
+         }
+     }
+ 
+     /**
+      * Blocks until at least 1 byte is read.
+      *
+      * @param buf Buffer.
+      * @param off Offset.
+      * @param len Length.
+      * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever).
+      * @return Read bytes count.
+      * @throws IgniteCheckedException If space has been closed.
+      * @throws IpcSharedMemoryOperationTimedoutException If operation times out.
+      */
+     public int read(byte[] buf, int off, int len, long timeout) throws IgniteCheckedException,
+             IpcSharedMemoryOperationTimedoutException {
+         assert buf != null;
+         assert len > 0;
+         assert buf.length >= off + len;
+ 
+         assert isReader;
+ 
+         lock.readLock().lock();
+ 
+         try {
+             if (closed.get())
+                 throw new IgniteCheckedException("Shared memory segment has been closed: " + this);
+ 
+             return (int) IpcSharedMemoryUtils.readSharedMemory(shmemPtr, buf, off, len, timeout);
+         }
+         finally {
+             lock.readLock().unlock();
+         }
+     }
+ 
+     /**
+      * Blocks until at least 1 byte is read.
+      *
+      * @param buf Buffer.
+      * @param off Offset.
+      * @param len Length.
+      * @param timeout Operation timeout in milliseconds ({@code 0} to wait forever).
+      * @return Read bytes count.
+      * @throws IgniteCheckedException If space has been closed.
+      * @throws IpcSharedMemoryOperationTimedoutException If operation times out.
+      */
+     public int read(ByteBuffer buf, int off, int len, long timeout) throws IgniteCheckedException,
+             IpcSharedMemoryOperationTimedoutException {
+         assert buf != null;
+         assert len > 0;
+         assert buf.capacity() >= off + len;
+         assert isReader;
+ 
+         lock.readLock().lock();
+ 
+         try {
+             if (closed.get())
+                 throw new IgniteCheckedException("Shared memory segment has been closed: " + this);
+ 
+             return (int) IpcSharedMemoryUtils.readSharedMemoryByteBuffer(shmemPtr, buf, off, len, timeout);
+         }
+         finally {
+             lock.readLock().unlock();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void close() {
+         close0(false);
+     }
+ 
+     /**
+      * Forcibly closes the space and frees all system resources.
+      * <p>
+      * This method should be called with caution as it may result to the other-party
+      * process crash. It is intended to call when there was an IO error during handshake
+      * and other party has not yet attached to the space.
+      */
+     public void forceClose() {
+         close0(true);
+     }
+ 
+     /**
+      * @return Shared memory ID.
+      */
+     public int sharedMemoryId() {
+         return shmemId;
+     }
+ 
+     /**
+      * @return Semaphore set ID.
+      */
+     public int semaphoreId() {
+         return semId;
+     }
+ 
+     /**
+      * @param force {@code True} to close the space.
+      */
+     private void close0(boolean force) {
+         if (!closed.compareAndSet(false, true))
+             return;
+ 
+         IpcSharedMemoryUtils.ipcClose(shmemPtr);
+ 
+         // Wait all readers and writes to leave critical section.
+         lock.writeLock().lock();
+ 
+         try {
+             IpcSharedMemoryUtils.freeSystemResources(tokFileName, shmemPtr, force);
+         }
+         finally {
+             lock.writeLock().unlock();
+         }
+ 
+         if (DEBUG && log.isDebugEnabled())
+             log.debug("Shared memory space has been closed: " + this);
+     }
+ 
+     /**
+      * @return Bytes available for read.
+      * @throws IgniteCheckedException If failed.
+      */
+     public int unreadCount() throws IgniteCheckedException {
+         lock.readLock().lock();
+ 
+         try {
+             if (closed.get())
+                 throw new IgniteCheckedException("Shared memory segment has been closed: " + this);
+ 
+             return IpcSharedMemoryUtils.unreadCount(shmemPtr);
+         }
+         finally {
+             lock.readLock().unlock();
+         }
+     }
+ 
+     /**
+      * @return Shared memory pointer.
+      */
+     public long sharedMemPointer() {
+         return shmemPtr;
+     }
+ 
+     /**
+      * @return Reader PID.
+      */
+     public int readerPid() {
+         return readerPid;
+     }
+ 
+     /**
+      * @return Writer PID.
+      */
+     public int writerPid() {
+         return writerPid;
+     }
+ 
+     /**
+      * @return Vis-a-vis PID.
+      */
+     public int otherPartyPid() {
+         return isReader ? writerPid : readerPid;
+     }
+ 
+     /**
+      * @return Token file name used to create shared memory space.
+      */
+     public String tokenFileName() {
+         return tokFileName;
+     }
+ 
+     /**
+      * @return Space size.
+      */
+     public int size() {
+         return opSize;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(IpcSharedMemorySpace.class, this, "closed", closed.get());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/IgniteOptimizedMarshaller.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithSpecifiedWorkDirectorySelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedGridGainHomeSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedGridGainHomeSelfTest.java
index 095bd2d,a56f8b2..8fae702
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedGridGainHomeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/GridStartupWithUndefinedGridGainHomeSelfTest.java
@@@ -29,10 -29,10 +29,10 @@@ import org.apache.ignite.internal.util.
  import org.apache.ignite.testframework.junits.common.*;
  
  import static org.apache.ignite.IgniteSystemProperties.*;
- import static org.apache.ignite.internal.util.GridUtils.*;
+ import static org.apache.ignite.internal.util.IgniteUtils.*;
  
  /**
 - * Checks that node can be started without operations with undefined GRIDGAIN_HOME.
 + * Checks that node can be started without operations with undefined IGNITE_HOME.
   * <p>
   * Notes:
   * 1. The test intentionally extends JUnit {@link TestCase} class to make the test

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/deploy/VisorDeployCommand.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/start/VisorStartCommand.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac05bb9c/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
----------------------------------------------------------------------


Mime
View raw message