celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [1/2] celix git commit: CELIX-408: Major refactoring of the pubsub serializer design and usage. example of udpmc and zmq seesm to be working. combi and zmq multicast are not stable yet.
Date Tue, 11 Apr 2017 19:24:30 GMT
Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-408_runtime 97df926c2 -> 7efe4331a


http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/src/dyn_msg_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/pubsub/pubsub_common/public/src/dyn_msg_utils.c
deleted file mode 100644
index 1c6bbcd..0000000
--- a/pubsub/pubsub_common/public/src/dyn_msg_utils.c
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.c
- *
- *  \date       Nov 11, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <dirent.h>
-
-#include "utils.h"
-#include "dyn_message.h"
-
-#include "dyn_msg_utils.h"
-
-#define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
-
-static char * getMsgDescriptionDir(bundle_pt bundle);
-static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
-
-
-unsigned int uintHash(const void * uintNum) {
-	return *((unsigned int*)uintNum);
-}
-
-int uintEquals(const void * uintNum, const void * toCompare) {
-	return ( (*((unsigned int*)uintNum)) == (*((unsigned int*)toCompare)) );
-}
-
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
-
-	char *root = NULL;
-	char *metaInfPath = NULL;
-
-	root = getMsgDescriptionDir(bundle);
-
-	if(root != NULL){
-		asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
-
-		addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
-		addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
-
-		free(metaInfPath);
-		free(root);
-	}
-}
-
-void emptyMsgTypesMap(hash_map_pt msgTypesMap)
-{
-	hash_map_iterator_pt iter = hashMapIterator_create(msgTypesMap);
-
-	while(hashMapIterator_hasNext(iter)){
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		dynMessage_destroy( ((dyn_message_type *) hashMapEntry_getValue(entry)) );
-	}
-	hashMap_clear(msgTypesMap, true, false);
-	hashMapIterator_destroy(iter);
-}
-
-static char * getMsgDescriptionDir(bundle_pt bundle)
-{
-	char *root = NULL;
-
-	bool isSystemBundle = false;
-	bundle_isSystemBundle(bundle, &isSystemBundle);
-
-	if(isSystemBundle == true) {
-		bundle_context_pt context;
-		bundle_getContext(bundle, &context);
-
-		const char *prop = NULL;
-
-		bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
-
-		if(prop != NULL) {
-			root = strdup(prop);
-		} else {
-			root = getcwd(NULL, 0);
-		}
-	} else {
-		bundle_getEntry(bundle, ".", &root);
-	}
-
-	return root;
-}
-
-
-static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap)
-{
-	char path[128];
-	struct dirent *entry = NULL;
-	DIR *dir = opendir(root);
-
-	if(dir) {
-		entry = readdir(dir);
-	}
-
-	while (entry != NULL) {
-
-		if (strstr(entry->d_name, ".descriptor") != NULL) {
-
-			printf("DMU: Parsing entry '%s'\n", entry->d_name);
-
-			memset(path,0,128);
-			snprintf(path, 128, "%s/%s", root, entry->d_name);
-			FILE *stream = fopen(path,"r");
-
-			if (stream != NULL){
-				dyn_message_type* msgType = NULL;
-
-				int rc = dynMessage_parse(stream, &msgType);
-				if (rc == 0 && msgType!=NULL) {
-
-					char* msgName = NULL;
-					dynMessage_getName(msgType,&msgName);
-
-					if(msgName!=NULL){
-						unsigned int* msgId = malloc(sizeof(unsigned int));
-						*msgId = utils_stringHash(msgName);
-						hashMap_put(msgTypesMap,msgId,msgType);
-					}
-
-				}
-				else{
-					printf("DMU: cannot parse message from descriptor %s\n.",path);
-				}
-				fclose(stream);
-			}else{
-				printf("DMU: cannot open descriptor file %s\n.",path);
-			}
-
-		}
-		entry = readdir(dir);
-	}
-
-	if(dir) {
-		closedir(dir);
-	}
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt
index 61b1cd9..a5798a4 100644
--- a/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -32,7 +32,6 @@ add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
     SOURCES
     	private/src/ps_activator.c
     	private/src/pubsub_serializer_impl.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
index f026300..5299808 100644
--- a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
+++ b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
@@ -34,32 +34,25 @@
 
 #include "pubsub_serializer.h"
 
-struct _pubsub_message_type {	/* _dyn_message_type */
-	struct namvals_head header;
-	struct namvals_head annotations;
-	struct types_head types;
-	dyn_type *msgType;
-	version_pt msgVersion;
-};
-
-struct pubsub_serializer {
+typedef struct pubsub_serializer {
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
-};
-
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer);
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer);
+} pubsub_serializer_t;
 
-/* Start of serializer specific functions */
-celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type
*msgType, const void *input, void **output, int *outputLen);
-celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type
*msgType, const void *input, void **output);
+typedef struct pubsub_msg_serializer_impl {
+    pubsub_msg_serializer_t msgSerializer;
+    dyn_message_type* dynMsg;
+} pubsub_msg_serializer_impl_t;
 
-void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt
bundle);
-void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer);
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer);
 
-version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type
*msgType);
-char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType,
void *msg);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt
bundle, pubsub_msg_serializer_map_t** out);
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, pubsub_msg_serializer_map_t*
map);
 
+/* Start of serializer specific functions */
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void*
msg, char** out, size_t *outLen);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const
char* input, size_t inputLen, void **out);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg);
 
 #endif /* PUBSUB_SERIALIZER_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/ps_activator.c b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
index b83c26b..e0e23d4 100644
--- a/pubsub/pubsub_serializer_json/private/src/ps_activator.c
+++ b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
@@ -32,8 +32,8 @@
 #include "pubsub_serializer_impl.h"
 
 struct activator {
-	pubsub_serializer_pt serializer;
-	pubsub_serializer_service_pt serializerService;
+	pubsub_serializer_t* serializer;
+	pubsub_serializer_service_t* serializerService;
 	service_registration_pt registration;
 };
 
@@ -56,24 +56,16 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void
**userData
 celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
 	celix_status_t status = CELIX_SUCCESS;
 	struct activator *activator = userData;
-	pubsub_serializer_service_pt pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
+	pubsub_serializer_service_t* pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
 
 	if (!pubsubSerializerSvc) {
 		status = CELIX_ENOMEM;
 	}
 	else{
-		pubsubSerializerSvc->serializer = activator->serializer;
-
-		pubsubSerializerSvc->serialize = pubsubSerializer_serialize;
-		pubsubSerializerSvc->deserialize = pubsubSerializer_deserialize;
-
-		pubsubSerializerSvc->fillMsgTypesMap = pubsubSerializer_fillMsgTypesMap;
-		pubsubSerializerSvc->emptyMsgTypesMap = pubsubSerializer_emptyMsgTypesMap;
-
-		pubsubSerializerSvc->getVersion = pubsubSerializer_getVersion;
-		pubsubSerializerSvc->getName = pubsubSerializer_getName;
-		pubsubSerializerSvc->freeMsg = pubsubSerializer_freeMsg;
+		pubsubSerializerSvc->handle = activator->serializer;
 
+		pubsubSerializerSvc->createSerializerMap = (void*)pubsubSerializer_createSerializerMap;
+		pubsubSerializerSvc->destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap;
 		activator->serializerService = pubsubSerializerSvc;
 
 		status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc,
NULL, &activator->registration);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index 60d5c98..2dd8258 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -26,30 +26,28 @@
 
 #include <stdio.h>
 #include <stdlib.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
 #include <string.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <inttypes.h>
 
-#include "constants.h"
 #include "utils.h"
 #include "hash_map.h"
-#include "array_list.h"
 #include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
+
 #include "log_helper.h"
-#include "log_service.h"
-#include "service_factory.h"
 
 #include "json_serializer.h"
-#include "dyn_msg_utils.h"
 
 #include "pubsub_serializer_impl.h"
 
-celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer)
{
+#define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
+
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle,
hash_map_pt msgTypesMap);
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
+
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t** serializer)
{
 	celix_status_t status = CELIX_SUCCESS;
 
 	*serializer = calloc(1, sizeof(**serializer));
@@ -70,7 +68,7 @@ celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_seriali
 	return status;
 }
 
-celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer) {
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	logHelper_stop(serializer->loghelper);
@@ -81,63 +79,197 @@ celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer)
{
 	return status;
 }
 
-celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type
*msgType, const void *input, void **output, int *outputLen){
-	celix_status_t status = CELIX_SUCCESS;
-
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt
bundle, pubsub_msg_serializer_map_t** out) {
+    celix_status_t status = CELIX_SUCCESS;
+    pubsub_msg_serializer_map_t* map = calloc(1, sizeof(*map));
+    if (status == CELIX_SUCCESS) {
+        map->bundle = bundle;
+        map->serializers = hashMap_create(NULL, NULL, NULL, NULL);
+        pubsubSerializer_fillMsgSerializerMap(map->serializers, bundle);
+    } else {
+        logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory
for msg map");
+        status = CELIX_ENOMEM;
+    }
 
-	char *jsonOutput = NULL;
-	int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
-	if (rc != 0){
-		status = CELIX_BUNDLE_EXCEPTION;
-	}
+    if (status == CELIX_SUCCESS) {
+        *out = map;
+    }
+    return status;
+}
 
-	*output = (void *) jsonOutput;
-	*outputLen = strlen(jsonOutput) + 1;
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, pubsub_msg_serializer_map_t*
map) {
+    celix_status_t status = CELIX_SUCCESS;
+    if (map == NULL) {
+        return status;
+    }
 
-	return status;
+    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+        pubsub_msg_serializer_impl_t* impl = msgSer->handle;
+        dynMessage_destroy(impl->dynMsg); //note msgSer->name and msgSer->version
owned by dynType
+        free(impl); //also contains the service struct.
+    }
+    hashMap_destroy(map->serializers, false, false);
+    free(map);
+    return status;
 }
 
-celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type
*msgType, const void *input, void **output){
-	celix_status_t status = CELIX_SUCCESS;
 
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void*
msg, char** out, size_t *outLen) {
+    celix_status_t status = CELIX_SUCCESS;
 
-	void *textOutput = NULL;
-	int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+    char *jsonOutput = NULL;
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+	int rc = jsonSerializer_serialize(dynType, msg, &jsonOutput);
 	if (rc != 0){
 		status = CELIX_BUNDLE_EXCEPTION;
 	}
-
-	*output = textOutput;
+    if (status == CELIX_SUCCESS) {
+        *out = jsonOutput;
+        *outLen = strlen(jsonOutput) + 1;
+    }
 
 	return status;
 }
 
-void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,
bundle_pt bundle){
-	fillMsgTypesMap(msgTypesMap, bundle);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const
char* input, size_t inputLen, void **out) {
+    celix_status_t status = CELIX_SUCCESS;
+    void *msg = NULL;
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+    int rc = jsonSerializer_deserialize(dynType, input, &msg);
+    if (rc != 0) {
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+    if (status == CELIX_SUCCESS) {
+        *out = msg;
+    }
+	return status;
 }
 
-void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap){
-	emptyMsgTypesMap(msgTypesMap);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg) {
+    dyn_type* dynType = NULL;
+    dynMessage_getMessageType(impl->dynMsg, &dynType);
+    if (dynType != NULL) {
+        dynType_free(dynType, msg);
+    }
 }
 
-version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type
*msgType){
-	version_pt msgVersion = NULL;
-	dynMessage_getVersion((dyn_message_type *) msgType, &msgVersion);
-	return msgVersion;
+
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle)
{
+    char* root = NULL;
+    char* metaInfPath = NULL;
+
+    root = pubsubSerializer_getMsgDescriptionDir(bundle);
+
+    if(root != NULL){
+        asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
+
+        pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
+        pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
+
+        free(metaInfPath);
+        free(root);
+    }
 }
 
-char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
-	char *name = NULL;
-	dynMessage_getName((dyn_message_type *) msgType, &name);
-	return name;
+static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
+{
+    char *root = NULL;
+
+    bool isSystemBundle = false;
+    bundle_isSystemBundle(bundle, &isSystemBundle);
+
+    if(isSystemBundle == true) {
+        bundle_context_pt context;
+        bundle_getContext(bundle, &context);
+
+        const char *prop = NULL;
+
+        bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+
+        if(prop != NULL) {
+            root = strdup(prop);
+        } else {
+            root = getcwd(NULL, 0);
+        }
+    } else {
+        bundle_getEntry(bundle, ".", &root);
+    }
+
+    return root;
 }
 
-void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType,
void *msg){
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
-	dynType_free(type, msg);
+
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle,
hash_map_pt msgSerializers)
+{
+    char path[128];
+    struct dirent *entry = NULL;
+    DIR *dir = opendir(root);
+
+    if(dir) {
+        entry = readdir(dir);
+    }
+
+    while (entry != NULL) {
+
+        if (strstr(entry->d_name, ".descriptor") != NULL) {
+
+            printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+            memset(path,0,128);
+            snprintf(path, 128, "%s/%s", root, entry->d_name);
+            FILE *stream = fopen(path,"r");
+
+            if (stream != NULL){
+                dyn_message_type* msgType = NULL;
+
+                int rc = dynMessage_parse(stream, &msgType);
+                if (rc == 0 && msgType != NULL) {
+
+                    char* msgName = NULL;
+                    dynMessage_getName(msgType,&msgName);
+
+                    version_pt msgVersion = NULL;
+                    dynMessage_getVersion(msgType, &msgVersion);
+
+                    unsigned int msgId = utils_stringHash(msgName);
+
+                    pubsub_msg_serializer_impl_t* impl = calloc(1, sizeof(*impl));
+                    impl->dynMsg = msgType;
+                    impl->msgSerializer.handle = impl;
+                    impl->msgSerializer.msgId = msgId;
+                    impl->msgSerializer.msgName = msgName;
+                    impl->msgSerializer.msgVersion = msgVersion;
+                    impl->msgSerializer.serialize = (void*) pubsubMsgSerializer_serialize;
+                    impl->msgSerializer.deserialize = (void*) pubsubMsgSerializer_deserialize;
+                    impl->msgSerializer.freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+
+                    bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
+                    if (clash) {
+                        printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
+                    } else if ( msgName != NULL && msgVersion != NULL &&
msgId != 0) {
+                        hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer);
+                    } else {
+                        printf("Error adding creating msg serializer\n");
+                    }
+
+                }
+                else{
+                    printf("DMU: cannot parse message from descriptor %s\n.",path);
+                }
+                fclose(stream);
+            }else{
+                printf("DMU: cannot open descriptor file %s\n.",path);
+            }
+
+        }
+        entry = readdir(dir);
+    }
+
+    if(dir) {
+        closedir(dir);
+    }
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 3826a25..36ea422 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -328,7 +328,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle,
servic
 	celix_status_t status = CELIX_SUCCESS;
 
 	pubsub_topology_manager_pt manager = handle;
-	pubsub_serializer_service_pt new_serializer = (pubsub_serializer_service_pt) service;
+	pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
 
 	celixThreadMutex_lock(&manager->serializerListLock);
 
@@ -360,7 +360,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle,
ser
 	celix_status_t status = CELIX_SUCCESS;
 
 	pubsub_topology_manager_pt manager = handle;
-	pubsub_serializer_service_pt new_serializer = (pubsub_serializer_service_pt) service;
+	pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
 
 	celixThreadMutex_lock(&manager->serializerListLock);
 
@@ -377,7 +377,7 @@ celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle,
ser
 
 	if (arrayList_size(manager->serializerList) > 0){
 		//there is another serializer available, change the admin so it is using another serializer
-		pubsub_serializer_service_pt replacing_serializer = (pubsub_serializer_service_pt) arrayList_get(manager->serializerList,0);
+		pubsub_serializer_service_t* replacing_serializer = (pubsub_serializer_service_t*) arrayList_get(manager->serializerList,0);
 
 		for(j=0; j<arrayList_size(manager->psaList); j++){
 			pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,j);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/test/msg_descriptors/msg.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/test/msg_descriptors/msg.descriptor b/pubsub/test/msg_descriptors/msg.descriptor
index 808644c..03b15ba 100644
--- a/pubsub/test/msg_descriptors/msg.descriptor
+++ b/pubsub/test/msg_descriptors/msg.descriptor
@@ -5,4 +5,4 @@ version=1.0.0
 :annotations
 :types
 :message
-{n seqnR}
\ No newline at end of file
+{n seqnR}

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/test/test/tst_activator.cpp
----------------------------------------------------------------------
diff --git a/pubsub/test/test/tst_activator.cpp b/pubsub/test/test/tst_activator.cpp
index 6d957a0..266f73e 100644
--- a/pubsub/test/test/tst_activator.cpp
+++ b/pubsub/test/test/tst_activator.cpp
@@ -145,13 +145,14 @@ TEST_GROUP(PUBSUB_INT_GROUP)
         //check if message are returned
         msg_t initMsg;
         initMsg.seqNr = 0;
+        int count = 0;
         for (int i = 0; i < TRIES; ++i) {
             pthread_mutex_lock(&g_act.mutex);
             g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &initMsg);
             pthread_mutex_unlock(&g_act.mutex);
             usleep(TIMEOUT);
             pthread_mutex_lock(&g_act.mutex);
-            int count = g_act.count;
+            count = g_act.count;
             pthread_mutex_unlock(&g_act.mutex);
             if (count > 0) {
                 break;
@@ -159,6 +160,7 @@ TEST_GROUP(PUBSUB_INT_GROUP)
                 printf("No return message received, waiting for a while\n");
             }
         }
+        CHECK(count > 0);
 	}
 
 	void teardown() {
@@ -167,7 +169,10 @@ TEST_GROUP(PUBSUB_INT_GROUP)
 };
 
 TEST(PUBSUB_INT_GROUP, sendRecvTest) {
-    g_act.count = 0;
+    pthread_mutex_lock(&g_act.mutex);
+    g_act.count = 0; //reset counter
+    pthread_mutex_unlock(&g_act.mutex);
+
     constexpr int COUNT = 50;
     msg_t msg;
     for (int i = 0; i < COUNT; ++i) {


Mime
View raw message