ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] incubator-ignite git commit: # ignite-117 : rename all 'ggshem' on 'igniteshmem'
Date Wed, 28 Jan 2015 16:34:09 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-117 50f31e6e4 -> ae3ba8c9d


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/ipc/shmem/igniteshmem/org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.cpp
----------------------------------------------------------------------
diff --git a/ipc/shmem/igniteshmem/org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.cpp
b/ipc/shmem/igniteshmem/org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.cpp
new file mode 100644
index 0000000..eaa13fd
--- /dev/null
+++ b/ipc/shmem/igniteshmem/org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.cpp
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils.h"
+
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <cerrno>
+#include <sys/types.h>
+#include <sys/ipc.h>
+#include <sys/shm.h>
+#include <sys/sem.h>
+#include <sys/stat.h>
+#include <string>
+#include <iostream>
+#include <time.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <signal.h>
+
+using namespace std;
+
+/** IgniteCheckedException JNI class name. */
+const char* GRID_EXCEPTION = "org/apache/ignite/IgniteCheckedException";
+
+/** GridIpcSharedMemoryOperationTimedoutException JNI class name. */
+const char* OP_TIMEDOUT_EXCEPTION = "org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryOperationTimedoutException";
+
+/** GridIpcOutOfSystemResourcesException JNI class name. */
+const char* OUT_OF_RSRCS_EXCEPTION = "org/apache/ignite/internal/util/ipc/shmem/GridIpcOutOfSystemResourcesException";
+
+/** Global flag for enabling debug logging. */
+static bool __GG_DEBUG = false;
+
+/** Read semaphore ID. */
+#define SEM_READ 0
+
+/** Write semaphore ID. */
+#define SEM_WRITE 1
+
+/**
+ * Logging macro.
+ *
+ * @param m Logging message with optional formatting symbols.
+ * @param varagrs Formatting arguments.
+ */
+#define GG_LOG_DEBUG(m, ...) {\
+    if(__GG_DEBUG)\
+        log(__FILE__, __LINE__, __FUNCTION__, m, __VA_ARGS__);\
+}
+
+/** Buffer size for current time string. */
+#define TIME_NOW_BUF_SIZE 1024
+
+/** Buffer size for debug message. */
+#define FORMAT_LOG_BUF_SIZE 4096
+
+/**
+ * @return Current time string in format: year-month-day hour:minute:second.
+ */
+static string timeNow() {
+    timeval tv;
+    tm lt;
+
+    char timebuf[TIME_NOW_BUF_SIZE];
+
+    gettimeofday(&tv, 0);
+
+    time_t now = tv.tv_sec;
+    localtime_r(&now, &lt);
+
+    // Clone the format used by log4j ISO8601DateFormat,
+    // specifically: "yyyy-MM-dd HH:mm:ss.SSS"
+    size_t len = strftime(timebuf, TIME_NOW_BUF_SIZE, "%Y-%m-%d %H:%M:%S", &lt);
+
+    snprintf(timebuf + len, TIME_NOW_BUF_SIZE - len, ".%03d", (int) (tv.tv_usec / 1000));
+
+    return string(timebuf);
+}
+
+/**
+ * Writes debug message to standard output, if global debug flag is enabled.
+ *
+ * @param file Source file, from which the message originates.
+ * @param line Source line, from which the message originates.
+ * @param funcName Name of the function, from which the message originates.
+ * @param format Message string with optional formatting symbols.
+ * @param varargs Formatting arguments.
+ */
+static void log(const char* file, int line, const char* funcName, const char* format, ...)
{
+    static pid_t pid = getpid();
+
+    char msgbuf[FORMAT_LOG_BUF_SIZE];
+    va_list va;
+
+    va_start(va, format);
+    vsnprintf(msgbuf, FORMAT_LOG_BUF_SIZE - 1, format, va);
+    va_end(va);
+
+    cout << timeNow() << " pid:" << pid << " " << file <<
":" << funcName << ":" << line << ": " << msgbuf << endl;
+
+    flush(cout);
+}
+
+/** Lock operation on semaphore #0. */
+struct sembuf op_lock0[2] = {
+    0, 0, 0, // Wait until semaphore #0 becomes 0.
+    0, 1, 0  // Then increment semaphore #0 by 1.
+};
+
+/** Lock operation on semaphore #1. */
+struct sembuf op_lock1[2] = {
+    1, 0, 0, // Wait until semaphore #1 becomes 0.
+    1, 1, 0  // Then increment semaphore #1 by 1.
+};
+
+/**
+ * Data offset in shared memory buffer (the memory segment preceding data
+ * is used for IPC data).
+ */
+#define BUF_OFFSET 64
+
+/**
+ * IPC data, that is used for inter-process communication.
+ */
+typedef struct {
+    /** Number of parties that have closed the connection (0, 1, or 2). */
+    int closedCnt;
+
+    /** Shared memory segment ID. */
+    int shmId;
+
+    /** Semaphore set ID. */
+    int semId;
+
+    /** Shared memory segment size. */
+    int size;
+
+    /** Closed flag. */
+    volatile bool closed;
+
+    /** Read position. */
+    volatile unsigned int readCnt;
+
+    /** Write position (should be always >= readCnt). */
+    volatile unsigned int writeCnt;
+
+    /** Flag, indicating that reader is waiting on semaphore. */
+    volatile bool readBlocked;
+
+    /** Flag, indicating that writer is waiting on semaphore. */
+    volatile bool writeBlocked;
+} T_IpcData;
+
+/**
+ * Calculates unread bytes count, given IPC data pointer.
+ *
+ * @param ipcData IPC data pointer.
+ * @param fetchWriteCnt True to fetch write count or read it normally.
+ * @param fetchReadCnt True to fetch read count or read it normally..
+ * @return Unread bytes count.
+ */
+static unsigned int getUnreadCount(T_IpcData *ipcData, bool fetchWriteCnt, bool fetchReadCnt)
{
+    unsigned int writeCnt = fetchWriteCnt ? __sync_fetch_and_add(&ipcData->writeCnt,
0) : ipcData->writeCnt;
+    unsigned int readCnt = fetchReadCnt ? __sync_fetch_and_add(&ipcData->readCnt,
0) : ipcData->readCnt;
+
+    unsigned int unreadCnt = writeCnt - readCnt;
+
+    if (unreadCnt < 0) {
+        GG_LOG_DEBUG("Unread count failed [writeCnt=%u, readCnt=%u]", writeCnt, readCnt);
+
+        *((char *) 0) = 5;
+    }
+
+    return unreadCnt;
+}
+
+/**
+ * Throws exception in Java code.
+ *
+ * @param env JNI environment.
+ * @param clsName Exception class full name (slashed notation).
+ */
+static void throwException(JNIEnv* env, const char* clsName) {
+    // We assume that 512 bytes will be enough.
+    char msg[512];
+
+    ::sprintf(msg, "%s (error code: %d).", ::strerror(errno), errno);
+
+    env->ThrowNew(env->FindClass(clsName), msg);
+}
+
+/**
+ * Throws exception in Java code according to current
+ * errno value.
+ *
+ * @param env JNI environment.
+ */
+static void throwExceptionByErrno(JNIEnv* env) {
+    switch (errno) {
+    case ENOMEM:
+    case EMFILE:
+    case ENOSPC:
+        throwException(env, OUT_OF_RSRCS_EXCEPTION);
+        break;
+
+    default:
+        throwException(env, GRID_EXCEPTION);
+        break;
+    }
+}
+
+/**
+ * Initializes semaphore.
+ *
+ * @param env JNI environment.
+ * @param semId Semaphore set ID.
+ * @param semNum Semaphore number in semaphore set.
+ */
+static bool semInit(JNIEnv* env, int semId, int semNum) {
+    struct sembuf sb;
+    memset(&sb, 0, sizeof(sb));
+
+    // Initialize the semaphore.
+    sb.sem_op = 1;
+    sb.sem_num = semNum;
+
+    if (::semop(semId, &sb, 1) == -1) {
+        GG_LOG_DEBUG("Semaphore init failed [semId=%d, semNum=%s, errno=%d]", semId,
+                semNum == SEM_READ ? "SEM_READ" : "SEM_WRITE", errno);
+
+        throwException(env, GRID_EXCEPTION);
+
+        return false;
+    }
+
+    return true;
+}
+
+/**
+ * Waits on semaphore until another process has signaled or the semaphore
+ * has been removed.
+ *
+ * @param env JNI environment.
+ * @param semId Semaphore set ID.
+ * @param semNum Semaphore number in semaphore set.
+ * @param timeout Timeout for wait operation, if supported on current platform.
+ * @param ipcData IPC data pointer.
+ * @see semNotify()
+ */
+static void semWait(JNIEnv * env, int semId, int semNum, int timeout, T_IpcData *ipcData)
{
+    while (1) {
+        int ret;
+#ifdef HAVE_SEMTIMEDOP
+        _STRUCT_TIMESPEC timeout0 = {
+            0, timeout * 1000
+        };
+        ret = semtimedop(semId, semNum == 0 ? op_lock0 : op_lock1, 2, timeout > 0 ? &timeout0
: NULL);
+#else
+        ret = semop(semId, semNum == 0 ? op_lock0 : op_lock1, 2);
+#endif
+        if (ret == 0)
+            return;
+
+        if (errno == EIDRM || errno == EINVAL) { // Semaphore was removed while waiting.
+            if (!ipcData->closed) {
+                GG_LOG_DEBUG("Semaphore removed, but the space is not closed [semId=%d]",
semId);
+
+                ipcData->closed = true;
+            }
+
+            return;
+        }
+
+        GG_LOG_DEBUG("Semaphore wait failed [semId=%d, semNum=%s, errno=%d]", semId,
+                semNum == SEM_READ ? "SEM_READ" : "SEM_WRITE", errno);
+
+        if (errno == EINTR) {
+            // spin again
+        }
+        else if (errno == EAGAIN) {
+            throwException(env, OP_TIMEDOUT_EXCEPTION);
+
+            return;
+        }
+        else {
+            throwException(env, GRID_EXCEPTION);
+        }
+    }
+}
+
+/**
+ * Notifies the semaphore to signal other process waiting on this semaphore
+ * to resume execution.
+ *
+ * @param env JNI environment.
+ * @param semId Semaphore set ID.
+ * @param semNum Semaphore number in semaphore set.
+ * @param ipcData IPC data pointer.
+ * @see semWait()
+ */
+static void semNotify(JNIEnv * env, int semId, int semNum, T_IpcData *ipcData) {
+    if (::semctl(semId, semNum, SETVAL, 0) == -1) {
+        if (errno == EIDRM || errno == EINVAL) {
+            if (!ipcData->closed) {
+                GG_LOG_DEBUG("Semaphore removed, but the space is not closed [semId=%d]",
semId);
+
+                ipcData->closed = true;
+            }
+
+            return;
+        }
+
+        GG_LOG_DEBUG("Semaphore wait failed [semId=%d, semNum=%s, errno=%d]", semId,
+                semNum == SEM_READ ? "SEM_READ" : "SEM_WRITE", errno);
+
+        throwException(env, GRID_EXCEPTION);
+    }
+}
+
+/**
+ * Allocates shared memory segment and semaphores for inter-process communication (JNI method).
+ *
+ * @param env JNI environment.
+ * @param jTokFileName Token file name for allocating resources.
+ * @param size Shared memory segment size in bytes.
+ * @param debug Debug flag, which modifies the global debug flag. This parameter is expected
+ *              to be always the same during application lifetime.
+ */
+jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_allocateSystemResources(
+        JNIEnv * env, jclass, jstring jTokFileName, jint size, jboolean debug) {
+    int key = -1;
+    int shmId = -1;
+    int semId = -1;
+    jboolean isCopy = false;
+
+    // Copy to STL string and release.
+    const char* tokFileName0 = env->GetStringUTFChars(jTokFileName, &isCopy);
+
+    string tokFileName(tokFileName0);
+
+    env->ReleaseStringUTFChars(jTokFileName, tokFileName0);
+
+    // Set global debug flag.
+    __GG_DEBUG = debug;
+
+    // Get system token, used in IPC.
+    if ((key = ::ftok(tokFileName.c_str(), 'G')) == -1) {
+        throwException(env, GRID_EXCEPTION);
+
+        return 0;
+    }
+
+    // Get shared memory descriptor (create shared memory segment if absent).
+    if ((shmId = ::shmget(key, size + BUF_OFFSET, 0666 | IPC_CREAT)) == -1) {
+        throwExceptionByErrno(env);
+
+        return 0;
+    }
+
+    void* data = ::shmat(shmId, (void *) 0, 0);
+
+#ifndef __APPLE__
+    // Shared memory segment will be deleted upon last process detach.
+    ::shmctl(shmId, IPC_RMID, NULL);
+#else
+    GG_LOG_DEBUG("Will not mark shared memory region for deletion (will be removed on close):
%d", shmId);
+#endif
+
+    if ((ptrdiff_t) data == -1) {
+        // Exception will be thrown on return.
+        throwExceptionByErrno(env);
+
+        return 0;
+    }
+
+    T_IpcData *ipcData = (T_IpcData*) data;
+
+    // Allocate semaphores for native synchronization.
+    if ((semId = ::semget(key, 2, 0666 | IPC_CREAT)) == -1) {
+        // Exception will be thrown on return.
+        throwExceptionByErrno(env);
+
+        // Cleanup ignoring possible errors.
+        ::shmdt(ipcData);
+
+        return 0;
+    }
+
+    // Initialize SEM_READ and SEM_WRITE to 1.
+    if (!semInit(env, semId, SEM_READ) || !semInit(env, semId, SEM_WRITE)) {
+        // Exception will be thrown on return.
+        throwException(env, GRID_EXCEPTION);
+
+        // Cleanup ignoring possible errors.
+        ::semctl(semId, 0, IPC_RMID);
+        ::shmdt(ipcData);
+
+        return 0;
+    }
+
+    // Initialize data structure.
+    memset(ipcData, 0, sizeof(*ipcData));
+    ipcData->shmId = shmId;
+    ipcData->semId = semId;
+    ipcData->size = size;
+
+    return (jlong) (((char*) data) + BUF_OFFSET);
+}
+
+/**
+ * Attaches to an existing shared memory segment (JNI method).
+ *
+ * @param env JNI environment.
+ * @param shmId Shared memory segment ID.
+ * @param debug Debug flag, which modifies the global debug flag. This parameter is expected
+ *              to be always the same during application lifetime.
+ */
+jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_attach(JNIEnv *
env, jclass,
+        jint shmId, jboolean debug) {
+    // Set global debug flag.
+    __GG_DEBUG = debug;
+
+    void* data = ::shmat(shmId, (void *) 0, 0);
+
+    GG_LOG_DEBUG("Attaching to shmId: %d", shmId);
+
+    if ((ptrdiff_t) data == -1) {
+        // Exception will be thrown on return.
+        throwExceptionByErrno(env);
+
+        return 0;
+    }
+
+    T_IpcData *ipcData = (T_IpcData*) data;
+
+    return (jlong) (((char*) data) + BUF_OFFSET);
+}
+
+/**
+ * Shuts down inter-process communication (JNI method).
+ *
+ * @param env JNI environment.
+ * @param buf Data buffer pointer in shared memory segment.
+ */
+void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_ipcClose(JNIEnv
*env, jclass, jlong buf) {
+    T_IpcData *ipcData = (T_IpcData*) (((char *) buf) - BUF_OFFSET);
+
+    // Set closed flag to true using memory barrier.
+    // This is to ensure the flag is set BEFORE we notify
+    // the semaphores (i.e. no reordering will happen).
+    __sync_fetch_and_add(&ipcData->closed, 1);
+
+    // Remove semaphore.
+    if (::semctl(ipcData->semId, 0, IPC_RMID) == -1) {
+        if (__GG_DEBUG)
+            cerr << "Failed to remove semaphore: " << errno << ": " <<
strerror(errno) << endl << flush;
+
+        if (errno == EPERM) { // Operation not permitted (no rights).
+            // Signal both reader and writer (because we don't know who we are).
+            // The other side will remove the semaphore.
+            semNotify(env, ipcData->semId, SEM_READ, ipcData);
+            semNotify(env, ipcData->semId, SEM_WRITE, ipcData);
+        }
+    }
+}
+
+/**
+ * Detaches from shared memory segment and removes the token file.
+ *
+ * @param env JNI environment.
+ * @param jTokFileName Token file name for allocating resources.
+ * @param buf Data buffer pointer in shared memory segment.
+ * @param force Force flag for forcing resources removal.
+ *
+ */
+void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_freeSystemResources__Ljava_lang_String_2JZ(
+        JNIEnv* env, jclass, jstring jTokFileName, jlong buf, jboolean force) {
+    T_IpcData *ipcData = (T_IpcData*) (((char *) buf) - BUF_OFFSET);
+
+    if (__sync_bool_compare_and_swap(&ipcData->closedCnt, 0, 1) && !force)
{
+        ::shmdt(ipcData);
+
+        return;
+    }
+
+#ifdef __APPLE__
+    int shmId = ipcData->shmId;
+#endif
+
+    // Detach from shared memory (shared memory segment will be deleted upon last detach).
+    if (::shmdt(ipcData) == -1) {
+        // If error occurred, then return.
+        return;
+    }
+
+#ifdef __APPLE__
+    GG_LOG_DEBUG("Deleting shared memory region: %d", shmId);
+
+    ::shmctl(shmId, IPC_RMID, NULL);
+#endif
+
+    jboolean isCopy = false;
+
+    // Copy to STL string and release.
+    const char* tokFileName0 = env->GetStringUTFChars(jTokFileName, &isCopy);
+
+    string tokFileName(tokFileName0);
+
+    env->ReleaseStringUTFChars(jTokFileName, tokFileName0);
+
+    ::remove(tokFileName.c_str());
+}
+
+/**
+ * Removes semaphores and shared memory segment.
+ *
+ * @param env JNI environment.
+ * @param jTokFileName Token file name for allocating resources.
+ * @param size Shared memory segment size in bytes.
+ */
+void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_freeSystemResources__Ljava_lang_String_2I(
+        JNIEnv* env, jclass, jstring jTokFileName, jint size) {
+    int key = -1;
+    int shmId = -1;
+    int semId = -1;
+    jboolean isCopy = false;
+
+    // Copy to STL string and release.
+    const char* tokFileName0 = env->GetStringUTFChars(jTokFileName, &isCopy);
+
+    string tokFileName(tokFileName0);
+
+    env->ReleaseStringUTFChars(jTokFileName, tokFileName0);
+
+    // Get system token, used in IPC.
+    if ((key = ::ftok(tokFileName.c_str(), 'G')) == -1) {
+        return;
+    }
+
+    // Get semaphores for native synchronization (no create).
+    if ((semId = ::semget(key, 2, 0666)) > 0) {
+        // Remove semaphores if present.
+        ::semctl((int) semId, 0, IPC_RMID);
+    }
+
+    // Get shared memory descriptor (no create).
+    if ((shmId = ::shmget(key, size, 0666)) == -1) {
+        // This means that shared memory segment was not created or was removed.
+        // No point to continue, as semaphores do not exist as well.
+        return;
+    }
+
+    // Remove shared memory segment (ignoring possible errors).
+    ::shmctl((int) shmId, IPC_RMID, NULL);
+}
+
+/**
+ * Read-write operations for shared memory segments.
+ */
+class RW {
+public:
+    /**
+     * Copies data from shared memory to the destination buffer.
+     *
+     * @param env JNI environment.
+     * @param dest Destination buffer.
+     * @param dOffset Destination buffer offset.
+     * @param len Number of bytes to copy.
+     * @param src Source pointer in shared memory segment.
+     */
+    static void FromShMem(JNIEnv *env, jbyteArray dest, jlong dOffset, jlong len, void *src)
{
+        env->SetByteArrayRegion(dest, dOffset, len, (jbyte*) src);
+    }
+
+    /**
+     * Copies data from shared memory to the destination Java object.
+     *
+     * @param env JNI environment.
+     * @param dest Destination Java object.
+     * @param dOffset Destination object offset.
+     * @param len Number of bytes to copy.
+     * @param src Source pointer in shared memory segment.
+     */
+    static void FromShMem(JNIEnv *env, jobject dest, jlong dOffset, jlong len, void *src)
{
+        char *destAddr = ((char *) env->GetDirectBufferAddress(dest)) + dOffset;
+        memcpy((void*) destAddr, src, len);
+    }
+
+    /**
+     * Copies data from buffer to shared memory.
+     *
+     * @param env JNI environment.
+     * @param src Source buffer.
+     * @param sOffset Source buffer offset.
+     * @param len Number of bytes to copy.
+     * @param dest Destination pointer.
+     */
+    static void ToShMem(JNIEnv *env, jbyteArray src, jlong sOffset, jlong len, void *dest)
{
+        env->GetByteArrayRegion(src, sOffset, len, (jbyte*) dest);
+    }
+
+    /**
+     * Copies data from Java object to shared memory.
+     *
+     * @param env JNI environment.
+     * @param src Source Java object.
+     * @param sOffset Source Java object offset.
+     * @param len Number of bytes to copy.
+     * @param dest Destination pointer.
+     */
+    static void ToShMem(JNIEnv *env, jobject src, jlong sOffset, jlong len, void *dest) {
+        char *srcAddr = ((char *) env->GetDirectBufferAddress(src)) + sOffset;
+        memcpy(dest, (void*) srcAddr, len);
+    }
+};
+
+/**
+ * Helper method for copying data from shared memory.
+ *
+ * @param env JNI environment.
+ * @param shMemPtr Data pointer in shared memory segment.
+ * @param dest Destination object to copy data to.
+ * @param dOffset Destination object write offset.
+ * @param len Number of bytes to copy.
+ * @param timeout Operation timeout in milliseconds.
+ * @param <T> Destination object type.
+ */
+template<class T>
+jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_ReadShMem(JNIEnv
*env, jclass, jlong shMemPtr,
+        T dest, jlong dOffset, jlong len, jlong timeout) {
+    T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);
+
+    unsigned int unreadCnt = getUnreadCount(ipcData, true, false);
+
+    while (unreadCnt == 0) {
+        if (unreadCnt == 0 && ipcData->closed) {
+            return -1;
+        }
+
+        // signal the other party, if it's blocked
+        if (ipcData->writeBlocked) {
+            GG_LOG_DEBUG("Before write semaphore notification [semId=%d]", ipcData->semId);
+
+            semNotify(env, ipcData->semId, SEM_WRITE, ipcData);
+        }
+
+        GG_LOG_DEBUG("Before read semaphore wait [semId=%d]", ipcData->semId);
+
+        ipcData->readBlocked = 1;
+        semWait(env, ipcData->semId, SEM_READ, timeout, ipcData);
+        ipcData->readBlocked = 0;
+
+        unreadCnt = getUnreadCount(ipcData, true, false);
+    }
+
+    int bytesRead = 0;
+
+    while (unreadCnt > 0 && bytesRead < len) {
+        int pos = ipcData->readCnt % ipcData->size;
+        int len0 = (ipcData->size - pos < unreadCnt) ? ipcData->size - pos : unreadCnt;
+
+        if (len0 > len - bytesRead) {
+            len0 = len - bytesRead;
+        }
+
+        RW::FromShMem(env, dest, dOffset + bytesRead, len0, (void*) (shMemPtr + pos));
+
+        __sync_add_and_fetch(&ipcData->readCnt, len0);
+
+        GG_LOG_DEBUG("Updated read count [readCnt=%d]", ipcData->readCnt);
+
+        bytesRead += len0;
+
+        GG_LOG_DEBUG("Before write semaphore notification [semId=%d]", ipcData->semId);
+
+        semNotify(env, ipcData->semId, SEM_WRITE, ipcData);
+
+        unreadCnt = getUnreadCount(ipcData, true, false);
+    }
+
+    return bytesRead;
+}
+
+/**
+ * Helper method for copying data to shared memory.
+ *
+ * @param env JNI environment.
+ * @param shMemPtr Data pointer in shared memory segment.
+ * @param src Source object to copy data from.
+ * @param dOffset Destination object read offset.
+ * @param len Number of bytes to copy.
+ * @param timeout Operation timeout in milliseconds.
+ * @param <T> Source object type.
+ */
+template<class T>
+void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_WriteShMem(JNIEnv
*env, jclass,
+        jlong shMemPtr, T src, jlong sOffset, jlong len, jlong timeout) {
+    T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);
+
+    int bytesWritten = 0;
+
+    while (bytesWritten < len) {
+        // Wait for reader.
+        unsigned int unreadCnt = getUnreadCount(ipcData, false, true);
+        int pos = ipcData->writeCnt % ipcData->size;
+
+        while (unreadCnt == ipcData->size) {
+            if (ipcData->closed) {
+                env->ThrowNew(env->FindClass(GRID_EXCEPTION), "Shared memory segment
has been closed.");
+
+                return;
+            }
+
+            // signal the other party, if it's blocked
+            if (ipcData->readBlocked) {
+                semNotify(env, ipcData->semId, SEM_READ, ipcData);
+            }
+
+            ipcData->writeBlocked = 1;
+            semWait(env, ipcData->semId, SEM_WRITE, timeout, ipcData);
+            ipcData->writeBlocked = 0;
+
+            unreadCnt = getUnreadCount(ipcData, false, true);
+        }
+
+        int len0 = ipcData->size - ((pos > unreadCnt) ? pos : unreadCnt);
+
+        if (len0 > len - bytesWritten) {
+            len0 = len - bytesWritten;
+        }
+
+        if (ipcData->closed) {
+            env->ThrowNew(env->FindClass(GRID_EXCEPTION), "Shared memory segment has
been closed");
+
+            return;
+        }
+
+        RW::ToShMem(env, src, sOffset + bytesWritten, len0, (void*) (shMemPtr + pos));
+
+        __sync_add_and_fetch(&ipcData->writeCnt, len0);
+
+        GG_LOG_DEBUG("Updated write count [readCnt=%d]", ipcData->readCnt);
+
+        bytesWritten += len0;
+
+        semNotify(env, ipcData->semId, SEM_READ, ipcData);
+    }
+}
+
+/**
+ * Copies data from Java byte array to shared memory (JNI method).
+ *
+ * @param env JNI environment.
+ * @param clsName Java class name.
+ * @param shMemPtr Data pointer in shared memory segment.
+ * @param src Source Java byte array.
+ * @param dOffset Source Java byte array offset.
+ * @param len Number of bytes to copy.
+ * @param timeout Operation timeout in milliseconds.
+ */
+void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_writeSharedMemory(JNIEnv
*env, jclass clsName,
+        jlong shMemPtr, jbyteArray src, jlong sOffset, jlong len, jlong timeout) {
+    Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_WriteShMem<jbyteArray>(env,
clsName, shMemPtr,
+            src, sOffset, len, timeout);
+}
+
+/**
+ * Copies data from Java byte array to shared memory (JNI method).
+ *
+ * @param env JNI environment.
+ * @param clsName Java class name.
+ * @param shMemPtr Data pointer in shared memory segment.
+ * @param src Source Java object.
+ * @param dOffset Source Java object offset.
+ * @param len Number of bytes to copy.
+ * @param timeout Operation timeout in milliseconds.
+ */
+void Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_writeSharedMemoryByteBuffer(JNIEnv
*env,
+        jclass clsName, jlong shMemPtr, jobject src, jlong sOffset, jlong len, jlong timeout)
{
+    Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_WriteShMem<jobject>(env,
clsName, shMemPtr,
+            src, sOffset, len, timeout);
+}
+
+/**
+ * Copies data from shared memory to Java byte array (JNI method).
+ *
+ * @param env JNI environment.
+ * @param clsName Java class name.
+ * @param shMemPtr Data pointer in shared memory segment.
+ * @param dest Destination Java byte array.
+ * @param dOffset Destination Java byte array offset.
+ * @param size Number of bytes to copy.
+ * @param timeout Operation timeout in milliseconds.
+ */
+jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_readSharedMemory(JNIEnv
*env, jclass clsName,
+        jlong shMemPtr, jbyteArray dest, jlong dOffset, jlong size, jlong timeout) {
+    return Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_ReadShMem<jbyteArray>(env,
clsName, shMemPtr,
+            dest, dOffset, size, timeout);
+}
+
+/**
+ * Copies data from shared memory to Java object (JNI method).
+ *
+ * @param env JNI environment.
+ * @param clsName Java class name.
+ * @param shMemPtr Data pointer in shared memory segment.
+ * @param dest Destination Java object.
+ * @param dOffset Destination Java object offset.
+ * @param size Number of bytes to copy.
+ * @param timeout Operation timeout in milliseconds.
+ */
+jlong Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_readSharedMemoryByteBuffer(JNIEnv
*env,
+        jclass clsName, jlong shMemPtr, jobject dest, jlong dOffset, jlong size, jlong timeout)
{
+    return Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_ReadShMem<jobject>(env,
clsName, shMemPtr,
+            dest, dOffset, size, timeout);
+}
+
+/**
+ * Gets the number of unread bytes in shared memory segment (JNI method).
+ *
+ * @param shMemPtr Data pointer in shared memory segment.
+ * @return Number of uneread bytes.
+ */
+jint Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_unreadCount(JNIEnv*,
jclass, jlong shMemPtr) {
+    T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);
+
+    return getUnreadCount(ipcData, true, true);
+}
+
+/**
+ * Checks if the counterpart is alive (JNI method).
+ *
+ * @param pid Process ID of the counterpart.
+ * @return true if couterpart is alive, false otherwise.
+ */
+jboolean Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_alive(JNIEnv*,
jclass, jint pid) {
+    int res = kill((int) pid, 0);
+
+    // Return true if signal was sent or there is no permission to send signal to this process.
+    // If kill failed with error and errno is ESRCH or EINVAL, process is considered to be
dead.
+    return res == 0 || errno == EPERM;
+}
+
+/**
+ * Gets the shared memory segment ID for a given shared memory data pointer (JNI method).
+ *
+ * @param shMemPtr Shared memory data pointer.
+ * @return Shared memory ID.
+ */
+jint Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_sharedMemoryId(JNIEnv*,
jclass, jlong shMemPtr) {
+    T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);
+
+    return ipcData->shmId;
+}
+
+/**
+ * Gets the semaphore set ID for a given shared memory data pointer (JNI method).
+ *
+ * @param shMemPtr Shared memory data pointer.
+ * @return Semaphore set ID.
+ */
+jint Java_org_apache_ignite_internal_util_ipc_shmem_IpcSharedMemoryUtils_semaphoreId(JNIEnv*,
jclass, jlong shMemPtr) {
+    T_IpcData *ipcData = (T_IpcData*) (((char *) shMemPtr) - BUF_OFFSET);
+
+    return ipcData->semId;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/ipc/shmem/readme.txt
----------------------------------------------------------------------
diff --git a/ipc/shmem/readme.txt b/ipc/shmem/readme.txt
index ebc64c8..59e3b4c 100644
--- a/ipc/shmem/readme.txt
+++ b/ipc/shmem/readme.txt
@@ -26,7 +26,7 @@ Usage with GridGain
 -------------------
 
 Copy compiled library to folder that already listed in 'java.library.path'
-with name in form: 'libggshmem-<gridgain-version>.<extention>'.
+with name in form: 'libigniteshmem-<gridgain-version>.<extention>'.
 Note: Grid should be restarted.
 
 **************************************************************************************

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so b/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so
index d39a83e..7dd28f0 100755
Binary files a/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so and b/modules/core/src/main/java/META-INF/native/linux64/libggshmem.so
differ

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
index ef310ec..4ad95d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
@@ -36,10 +36,10 @@ public class GridIpcSharedMemoryNativeLoader {
     private static volatile boolean loaded;
 
     /** Library name base. */
-    private static final String LIB_NAME_BASE = "ggshmem";
+    private static final String LIB_NAME_BASE = "igniteshmem";
 
     /** Lock file path. */
-    private static final File LOCK_FILE = new File(System.getProperty("java.io.tmpdir"),
"ggshmem.lock");
+    private static final File LOCK_FILE = new File(System.getProperty("java.io.tmpdir"),
"igniteshmem.lock");
 
     /** Library name. */
     static final String LIB_NAME = LIB_NAME_BASE + "-" + GridProductImpl.VER;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3ba8c9/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java
index 8fb7c6a..ec0566e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtils.java
@@ -185,7 +185,7 @@ public class IpcSharedMemoryUtils {
      * @return Wrapping grid exception.
      */
     static IgniteCheckedException linkError(UnsatisfiedLinkError e) {
-        return new IgniteCheckedException("Linkage error due to possible native library,
libggshmem.so, " +
+        return new IgniteCheckedException("Linkage error due to possible native library,
libigniteshmem.so, " +
             "version mismatch (stop all grid nodes, clean up your '/tmp' folder, and try
again).", e);
     }
 


Mime
View raw message