ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/10] incubator-ignite git commit: # ignite-496
Date Thu, 19 Mar 2015 14:57:18 GMT
# ignite-496


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5fa19eb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5fa19eb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5fa19eb6

Branch: refs/heads/ignite-496
Commit: 5fa19eb6c3227f72eec33c36904ddf1b76f5fca8
Parents: 97f0c03
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Mar 19 17:56:48 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Mar 19 17:56:48 2015 +0300

----------------------------------------------------------------------
 .../processors/interop/InteropCache.java        |  16 +++
 .../processors/interop/InteropTarget.java       |   4 +-
 .../interop/InteropTargetAdapter.java           |  50 ++++++++-
 .../processors/interop/InteropUtils.java        |   4 +-
 .../ignite-interop-api/ignite-interop-api.cpp   |  44 ++++++--
 .../ignite-interop-api/ignite-interop-api.h     |   8 ++
 .../src/ignite_cache.cpp                        | 102 ++++++++++++++++++-
 7 files changed, 213 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java
index 6b2e8e0..d17d7b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropCache.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.interop;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.interop.*;
 
@@ -67,4 +68,19 @@ public class InteropCache extends InteropTargetAdapter {
 
         marsh.writeObject(val, out);
     }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteInternalFuture<Void> inOutAsyncOp(int type,
+        InteropInputStream in,
+        InteropOutputStream out,
+        InteropMarshaller marsh)
+    {
+        Object key = marsh.readObject(in);
+
+        Object val = marsh.readObject(in);
+
+        log.info("Interop put async [key=" + key + ", val=" + val + ']');
+
+        return cache.putxAsync(key, val);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java
index bbb99bc..28a4794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTarget.java
@@ -46,9 +46,9 @@ public interface InteropTarget {
      * @param ptr Input data pointer.
      * @param len Input data length.
      * @param cb Callback address.
-     * @param cbArg Value passed to callback.
+     * @param cbData Value passed to callback.
      * @throws IgniteCheckedException If failed.
      */
-    public void inOutAsyncOp(int type, long ptr, int len, long cb, long cbArg)
+    public void inOutOpAsync(int type, long ptr, int len, long cb, long cbData)
         throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java
index 58f5f3e..7b29301 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropTargetAdapter.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.interop;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.interop.*;
 
 /**
@@ -87,6 +89,52 @@ public abstract class InteropTargetAdapter implements InteropTarget {
         throws IgniteCheckedException;
 
     /** {@inheritDoc} */
-    @Override public void inOutAsyncOp(int type, long ptr, int len, long cb, long cbArg)
throws IgniteCheckedException {
+    @Override public void inOutOpAsync(int type,
+        long ptr,
+        int len,
+        final long cb,
+        final long cbData)
+        throws IgniteCheckedException
+    {
+        InteropOffheapInputStream in = new InteropOffheapInputStream(ptr, len);
+
+        InteropMarshaller marsh = proc.marshaller();
+
+        InteropOffheapOutputStream out = new InteropOffheapOutputStream(1024);
+
+        IgniteInternalFuture<Void> fut = inOutAsyncOp(type, in, out, marsh);
+
+        fut.listen(new CI1<IgniteInternalFuture>() {
+            @Override public void apply(final IgniteInternalFuture fut) {
+                proc.context().closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            Thread.sleep(500);
+
+                            fut.get();
+
+                            InteropUtils.asyncCallback(cb, cbData, 1, 0);
+                        }
+                        catch (Throwable e) {
+                            e.printStackTrace();
+
+                            InteropUtils.asyncCallback(cb, cbData, -1, 0);
+                        }
+                    }
+                });
+            }
+        });
     }
+
+    /**
+     * @param type Type.
+     * @param in Input.
+     * @param out Output.
+     * @param marsh Marshaller.
+     * @return Future.
+     */
+    protected abstract IgniteInternalFuture<Void> inOutAsyncOp(int type,
+        InteropInputStream in,
+        InteropOutputStream out,
+        InteropMarshaller marsh);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java
index 73987e3..ed3c521 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/interop/InteropUtils.java
@@ -37,6 +37,8 @@ public class InteropUtils {
     /**
      * @param cb Callback address.
      * @param cbArg Value passed to callback.
+     * @param resType Result type.
+     * @param res Result.
      */
-    native public void asyncCallback(long cb, long cbArg);
+    native public static void asyncCallback(long cb, long cbArg, int resType, long res);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp
----------------------------------------------------------------------
diff --git a/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp
b/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp
index a2db741..a28cf2f 100644
--- a/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp
+++ b/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.cpp
@@ -61,6 +61,7 @@ JniMethod M_IGNITION_EX_START_WITH_CLO = JniMethod("startWithClosure", "(Ljava/l
 
 const char* C_INTEROP_UTILS = "org/apache/ignite/internal/processors/interop/InteropUtils";
 JniMethod M_INTEROP_UTILS_INTEROP = JniMethod("interop", "(Lorg/apache/ignite/Ignite;)Lorg/apache/ignite/internal/processors/interop/InteropProcessor;",
true);
+JniMethod M_INTEROP_UTILS_ASYNC_CALLBACK = JniMethod("asyncCallback", "(JJIJ)V", true);
 
 const char* C_INTEROP_PROCESSOR = "org/apache/ignite/internal/processors/interop/InteropProcessor";
 JniMethod M_INTEROP_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/interop/InteropTarget;",
false);
@@ -68,6 +69,7 @@ JniMethod M_INTEROP_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lo
 const char* C_INTEROP_TARGET = "org/apache/ignite/internal/processors/interop/InteropTargetAdapter";
 JniMethod M_INTEROP_TARGET_IN_OP = JniMethod("inOp", "(IJI)I", false);
 JniMethod M_INTEROP_TARGET_IN_OUT_OP = JniMethod("inOutOp", "(IJI)J", false);
+JniMethod M_INTEROP_TARGET_IN_OUT_OP_ASYNC = JniMethod("inOutOpAsync", "(IJIJJ)V", false);
 
 const char* C_INTEROP_CACHE = "org/apache/ignite/internal/processors/interop/InteropCache";
 
@@ -99,6 +101,12 @@ jmethodID FindMethod(JNIEnv* env, jclass cls, JniMethod mthd) {
 	return res;
 }
 
+JNIEXPORT void JNICALL JniAsyncCallback(JNIEnv *env, jclass cls, jlong cb, jlong cbData,
jint resType, jlong res) {
+	IgniteAsyncCallback cbPtr = (IgniteAsyncCallback)cb;
+	cbPtr((void*)cbData, resType, (void*)res);
+}
+
+
 /* Internal context initialization routine. */
 int ContextInit0(JavaVMInitArgs args, JavaVM** retJvm, JNIEnv** retEnv) {
 	// 1. Check if another JVM is already started.
@@ -186,6 +194,11 @@ int ContextInit0(JavaVMInitArgs args, JavaVM** retJvm, JNIEnv** retEnv)
{
 		if (!ctx->m_InteropUtils_interop)
 			return JNI_ERR;
 
+		ctx->m_InteropUtils_asyncCallback = FindMethod(env, ctx->c_InteropUtils, M_INTEROP_UTILS_ASYNC_CALLBACK);
+
+		if (!ctx->m_InteropUtils_asyncCallback)
+			return JNI_ERR;
+
 		ctx->c_InteropTarget = FindClass(env, C_INTEROP_TARGET);
 
 		if (!ctx->c_InteropTarget)
@@ -201,6 +214,11 @@ int ContextInit0(JavaVMInitArgs args, JavaVM** retJvm, JNIEnv** retEnv)
{
 		if (!ctx->m_InteropTarget_inOutOp)
 			return JNI_ERR;
 
+		ctx->m_InteropTarget_inOutOpAsync = FindMethod(env, ctx->c_InteropTarget, M_INTEROP_TARGET_IN_OUT_OP_ASYNC);
+
+		if (!ctx->m_InteropTarget_inOutOpAsync)
+			return JNI_ERR;
+
 		ctx->c_InteropCache = FindClass(env, C_INTEROP_CACHE);
 
 		if (!ctx->c_InteropCache)
@@ -208,22 +226,20 @@ int ContextInit0(JavaVMInitArgs args, JavaVM** retJvm, JNIEnv** retEnv)
{
 
 
 		// 4. Register natives.
-		/*
 		{
-			JNINativeMethod methods[22];
+			JNINativeMethod methods[1];
 
 			int idx = 0;
 
-			methods[idx].name = (char*)M_GRID_INTEROP_UTILS_ON_START.name;
-			methods[idx].signature = (char*)M_GRID_INTEROP_UTILS_ON_START.sign;
-			methods[idx++].fnPtr = JniOnStart;
+			methods[idx].name = (char*)M_INTEROP_UTILS_ASYNC_CALLBACK.name;
+			methods[idx].signature = (char*)M_INTEROP_UTILS_ASYNC_CALLBACK.sign;
+			methods[idx++].fnPtr = JniAsyncCallback;
 
-			res = env->RegisterNatives(ctx->c_GridInteropUtils, methods, idx);
+			res = env->RegisterNatives(ctx->c_InteropUtils, methods, idx);
 
 			if (res != JNI_OK)
 				return res;
 		}
-		*/
 
 		// JNI Env is only necessary for error handling, so we nullify to keep invariant.
 		*retEnv = NULL;
@@ -471,6 +487,16 @@ void* IgniteInteropAbstractTarget::inOutOp(jint type, void* ptr, jint
len) {
 	return (void*)res;
 }
 
+void IgniteInteropAbstractTarget::inOpAsync(jint type, void* ptr, jint len, IgniteAsyncCallback
cb, void* data) {
+	JNIEnv* env = Attach();
+
+	env->CallNonvirtualVoidMethod(this->obj, this->cls, Context()->m_InteropTarget_inOutOpAsync,
type, ptr, len, cb, data);
+
+	if (env->ExceptionCheck()) {
+		printError(env);
+	}
+}
+
 IGNITE_API_IMPORT_EXPORT IgniteInteropNode* StartNode() {
 	JNIEnv* env = Attach();
 
@@ -546,6 +572,10 @@ void IgniteInteropCache::put(void* ptr, jint len) {
 	this->inOp(0, ptr, len);
 }
 
+void IgniteInteropCache::putAsync(void* ptr, jint len, IgniteAsyncCallback cb, void* data)
{
+	this->inOpAsync(0, ptr, len, cb, data);
+}
+
 void* IgniteInteropCache::get(void* ptr, jint len) {
 	return this->inOutOp(0, ptr, len);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h
----------------------------------------------------------------------
diff --git a/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h
b/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h
index 3849a30..607ab16 100644
--- a/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h
+++ b/modules/interop/src/main/cpp/ignite-interop/ignite-interop-api/ignite-interop-api.h
@@ -54,6 +54,7 @@ struct JniContext {
 
 	jclass c_InteropUtils;
 	jmethodID m_InteropUtils_interop;
+	jmethodID m_InteropUtils_asyncCallback;
 
 	jclass c_InteropProcessor;
 	jmethodID m_InteropProcessor_cache;
@@ -61,6 +62,7 @@ struct JniContext {
 	jclass c_InteropTarget;
 	jmethodID m_InteropTarget_inOp;
 	jmethodID m_InteropTarget_inOutOp;
+	jmethodID m_InteropTarget_inOutOpAsync;
 
 	jclass c_InteropCache;
 };
@@ -291,6 +293,8 @@ private:
 	InteropByteBuffer bytes;
 };
 
+typedef void(*IgniteAsyncCallback)(void* data, int resType, void* res);
+
 class IGNITE_API_IMPORT_EXPORT IgniteInteropAbstractTarget {
 protected:
 	/** Target class for non-virtual invokes. */
@@ -309,6 +313,8 @@ public:
 	jint inOp(jint type, void* ptr, jint len);
 
 	void* inOutOp(jint type, void* ptr, jint len);
+
+	void inOpAsync(jint type, void* ptr, jint len, IgniteAsyncCallback cb, void* data);
 };
 
 class IGNITE_API_IMPORT_EXPORT IgniteInteropCache : public IgniteInteropAbstractTarget {
@@ -317,6 +323,8 @@ public:
 
 	void put(void* ptr, jint len);
 
+	void putAsync(void* ptr, jint len, IgniteAsyncCallback cb, void* data);
+
 	void* get(void* ptr, jint len);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fa19eb6/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp
----------------------------------------------------------------------
diff --git a/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp
b/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp
index 32a6f6d..c8b20a2 100644
--- a/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp
+++ b/modules/interop/src/main/cpp/ignite-interop/ignite-nodejs-prototype/src/ignite_cache.cpp
@@ -145,11 +145,15 @@ void AfterQueueWork(uv_work_t* req)
 }
 
 void freeCallback(uv_async_t *async) {
+	std::cout << "NodeJs free async handle";
+
 	delete async;
 }
 
 void AfterExecute(uv_async_t *async)
 {
+	std::cout << "NodeJs after execute";
+
 	Isolate* isolate = Isolate::GetCurrent();
 
 	HandleScope scope(isolate);
@@ -177,8 +181,10 @@ void AfterExecute(uv_async_t *async)
 		argv);
 }
 
-void igniteAsyncCallback(AsyncData* asyncData)
+void igniteAsyncCallback(AsyncData* asyncData, int resType, void* res)
 {
+	std::cout << "NodeJs async callback, resType=" << resType << "\n";
+
 	uv_async_send(asyncData->async);
 }
 
@@ -201,7 +207,8 @@ void IgniteCache::PutAsync(const FunctionCallbackInfo<Value>&
args) {
 	
 	AsyncData* asyncData = new AsyncData();
 
-	asyncData->cb = Persistent<Function>(isolate, cb);
+	// asyncData = Persistent<Function>(isolate, cb);
+	asyncData->cb.Reset(isolate, cb);
 
 	std::cout << "Put async key=" << key << ", val=" << val <<
"\n";
 
@@ -218,7 +225,7 @@ void IgniteCache::PutAsync(const FunctionCallbackInfo<Value>&
args) {
 
 	uv_async_init(uv_default_loop(), async, AfterExecute);
 
-	cache->cache->put(out.data(), out.size());
+	cache->cache->putAsync(out.data(), out.size(), (IgniteAsyncCallback)igniteAsyncCallback,
asyncData);
 
 	// uv_queue_work(uv_default_loop(), 0, QueueWorkNoop, (uv_after_work_cb)AfterQueueWork);
 
@@ -226,6 +233,32 @@ void IgniteCache::PutAsync(const FunctionCallbackInfo<Value>&
args) {
 	//args.GetReturnValue().Set(Number::New(isolate, obj->value_));
 }
 
+void printPropertis(Local<Value> prop) {
+	if (prop->IsObject()) {
+		Local<Object> obj = Local<Object>::Cast(prop);
+
+		std::cout << "Constructor: " << *v8::String::Utf8Value(obj->GetConstructorName())
<< "\n";
+
+		Local<Array> props = obj->GetPropertyNames();
+
+		std::cout << "Properties (" << props->Length() << "):\n";
+
+		for (int i = 0; i < props->Length(); i++) {
+			Local<Value> prop = props->Get(i);
+
+			Local<Value> val = obj->Get(prop);
+
+			if (!val->IsFunction())
+				std::cout << *v8::String::Utf8Value(prop->ToString()) << "=" <<
*v8::String::Utf8Value(val->ToString()) << "\n";
+
+			/*
+			std::string name(*v8::String::Utf8Value(prop->ToString()));
+
+			if (name != "global" && name != "EventEmitter")
+				printPropertis(val);*/
+		}
+	}
+}
 void IgniteCache::ObjectInfo(const FunctionCallbackInfo<Value>& args) {
 	Isolate* isolate = Isolate::GetCurrent();
 	HandleScope scope(isolate);
@@ -255,4 +288,65 @@ void IgniteCache::ObjectInfo(const FunctionCallbackInfo<Value>&
args) {
 			std::cout << *v8::String::Utf8Value(prop->ToString()) << "=" << *v8::String::Utf8Value(val->ToString())
<< "\n";
 		}
 	}
-}
\ No newline at end of file
+
+	Handle<v8::Object> global = isolate->GetCurrentContext()->Global();
+
+	Local<Object> process = Local<Object>::Cast(global->Get(String::NewFromUtf8(isolate,
"process")));
+
+	printPropertis(process);
+	/*
+	Handle<v8::Object> global = isolate->GetCurrentContext()->Global();
+
+	Local<Object> process = Local<Object>::Cast(global->Get(String::NewFromUtf8(isolate,
"process")));
+
+	Local<Object> module = Local<Object>::Cast(process->Get(String::NewFromUtf8(isolate,
"mainModule")));
+
+	Local<Object> children = Local<Object>::Cast(module->Get(String::NewFromUtf8(isolate,
"children")));
+
+	printPropertis(children);
+	*/
+	/*
+	Local<Object> obj = Local<Object>::Cast(args[0]);
+	
+	std::cout << "Constructor: " << *v8::String::Utf8Value(obj->GetConstructorName())
<< "\n";
+
+	Local<Array> props = obj->GetPropertyNames();
+
+	std::cout << "Properties (" << props->Length() << "):\n";
+
+	for (int i = 0; i < props->Length(); i++) {
+		Local<Value> prop = props->Get(i);
+
+		Local<Value> val = obj->Get(prop);
+
+		if (!val->IsFunction()) {
+			std::cout << *v8::String::Utf8Value(prop->ToString()) << "=" << *v8::String::Utf8Value(val->ToString())
<< "\n";
+		}
+	}
+
+	Local<v8::Context> context = obj->CreationContext();
+
+	Handle<v8::Object> global = context->Global();
+	
+	props = global->GetPropertyNames();
+
+	std::cout << "Global properties (" << props->Length() << "):\n";
+
+	for (int i = 0; i < props->Length(); i++) {
+		Local<Value> prop = props->Get(i);
+
+		Local<Value> val = obj->Get(prop);
+
+		//if (!val->IsFunction()) {
+			std::cout << *v8::String::Utf8Value(prop->ToString()) << "=" << *v8::String::Utf8Value(val->ToString())
<< "\n";
+		//}
+	}
+
+	Handle<v8::Value> value = global->Get(String::NewFromUtf8(isolate, "Apple"));
+
+	if (value->IsFunction())  {
+		std::cout << "Found function!";
+	}
+	else
+		std::cout << "Function not found!";*/
+}


Mime
View raw message