celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [16/19] celix git commit: CELIX-389: Refactors pubsub.
Date Mon, 06 Feb 2017 18:34:28 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
deleted file mode 100644
index e670899..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ /dev/null
@@ -1,732 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin_impl.c
- *
- *  \date       Sep 30, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include "pubsub_admin_impl.h"
-#include <zmq.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-#include <arpa/inet.h>
-#include <sys/socket.h>
-#include <netdb.h>
-
-#ifndef ANDROID
-#include <ifaddrs.h>
-#endif
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include "constants.h"
-#include "utils.h"
-#include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
-#include "log_helper.h"
-#include "log_service.h"
-#include "celix_threads.h"
-#include "service_factory.h"
-
-#include "topic_subscription.h"
-#include "pubsub_publish_service_private.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_utils.h"
-#include "subscriber.h"
-
-#define MAX_KEY_FOLDER_PATH_LENGTH 512
-/* Fallback address stored in admin->ipAddress when pubsubAdmin_create finds no configured or detected IP. */
-static const char *DEFAULT_IP = "127.0.0.1";
-/* Forward declarations of the file-local helpers defined further down. */
-static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip);
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score);
-/* Allocates a zeroed admin into *admin, creates its four hash maps and mutexes, resolves the IP address and port range from bundle-context properties and applies the ZMQ settings. Returns CELIX_ENOMEM when calloc fails, CELIX_SERVICE_EXCEPTION (before allocating) when built with BUILD_WITH_ZMQ_SECURITY and zsys_has_curve() is false, otherwise CELIX_SUCCESS. */
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
-	celix_status_t status = CELIX_SUCCESS;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (!zsys_has_curve()){
-		printf("PSA: zeromq curve unsupported\n");
-		return CELIX_SERVICE_EXCEPTION;
-	}
-#endif
-
-	*admin = calloc(1, sizeof(**admin));
-
-	if (!*admin) {
-		status = CELIX_ENOMEM;
-	}
-	else{
-
-		const char *ip = NULL;
-		char *detectedIp = NULL;
-		(*admin)->bundle_context= context;
-		(*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-		(*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-		(*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-		(*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-		/* One mutex per map created above. */
-		celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
-		celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
-		celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL);
-		celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
-
-		if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
-			logHelper_start((*admin)->loghelper);
-		}
-		/* An explicitly configured PSA_IP wins; interface detection below only runs when it is unset. */
-		bundleContext_getProperty(context,PSA_IP , &ip);
-
-#ifndef ANDROID
-		if (ip == NULL) {
-			const char *interface = NULL;
-
-			bundleContext_getProperty(context, PSA_ITF, &interface);
-			if (pubsubAdmin_getIpAddress(interface, &detectedIp) != CELIX_SUCCESS) {
-				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP adress for interface %s", interface);
-			}
-
-			ip = detectedIp;
-		}
-#endif
-
-		if (ip != NULL) {
-			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", ip);
-			(*admin)->ipAddress = strdup(ip);
-		}
-		else {
-			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: No IP address for service annunciation set. Using %s", DEFAULT_IP);
-			(*admin)->ipAddress = strdup(DEFAULT_IP);
-		}
-		/* admin->ipAddress is a strdup copy, so the detected string can be released here. */
-		if (detectedIp != NULL) {
-			free(detectedIp);
-		}
-        /* NOTE(review): the fallback strings passed below are the quoted macro names, not numeric values; if strtol receives them it stops at the first character and the *endptr checks substitute the numeric macros. */
-        const char* basePortStr = NULL;
-        const char* maxPortStr = NULL;
-        char* endptrBase = NULL;
-        char* endptrMax = NULL;
-        bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
-        bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
-        (*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
-        (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
-        if (*endptrBase != '\0') {
-            (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
-        }
-        if (*endptrMax != '\0') {
-            (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
-        }
-
-        printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
-
-        // Disable Signal Handling by CZMQ
-		setenv("ZSYS_SIGHANDLER", "false", true);
-		/* Optional PSA_NR_ZMQ_THREADS property: only applied when it parses to a value in the range 1..49. */
-		const char *nrZmqThreads = NULL;
-		bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads);
-
-		if(nrZmqThreads != NULL) {
-			char *endPtr = NULL;
-			unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10);
-			if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) {
-				zsys_set_io_threads(nrThreads);
-				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %d threads for ZMQ", nrThreads);
-				printf("PSA: Using %d threads for ZMQ\n", nrThreads);
-			}
-		}
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-		// Setup authenticator
-		zactor_t* auth = zactor_new (zauth, NULL);
-		zstr_sendx(auth, "VERBOSE", NULL);
-
-		// Load all public keys of subscribers into the application
-		// This step is done for authenticating subscribers
-		char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
-		char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
-		snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir);
-		zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
-		free(keys_bundle_dir);
-
-		(*admin)->zmq_auth = auth;
-#endif
-
-	}
-
-	return status;
-}
-
-/* Releases what admin owns: the IP string, the four hash maps (freeing the keys and array lists of the pending-subscription and external-publication maps), the mutexes, the log helper, the ZMQ authenticator in security builds, and finally admin itself. Always returns CELIX_SUCCESS. */
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
-{
-	celix_status_t status = CELIX_SUCCESS;
-
-	free(admin->ipAddress);
-	/* Keys and list values of this map are released one by one before the map itself is destroyed. */
-	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-	hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions);
-	while(hashMapIterator_hasNext(iter)){
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		free((char*)hashMapEntry_getKey(entry));
-		arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
-	}
-	hashMapIterator_destroy(iter);
-	hashMap_destroy(admin->pendingSubscriptions,false,false);
-	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-	/* NOTE(review): the subscriptions map is destroyed without iterating its entries; confirm keys and subscriptions are released elsewhere (e.g. via pubsubAdmin_closeAllSubscriptions). */
-	celixThreadMutex_lock(&admin->subscriptionsLock);
-	hashMap_destroy(admin->subscriptions,false,false);
-	celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-	celixThreadMutex_lock(&admin->localPublicationsLock);
-	hashMap_destroy(admin->localPublications,true,false);
-	celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-	celixThreadMutex_lock(&admin->externalPublicationsLock);
-	iter = hashMapIterator_create(admin->externalPublications);
-	while(hashMapIterator_hasNext(iter)){
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		free((char*)hashMapEntry_getKey(entry));
-		arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
-	}
-	hashMapIterator_destroy(iter);
-	hashMap_destroy(admin->externalPublications,false,false);
-	celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-	celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
-	celixThreadMutex_destroy(&admin->subscriptionsLock);
-	celixThreadMutex_destroy(&admin->localPublicationsLock);
-	celixThreadMutex_destroy(&admin->externalPublicationsLock);
-
-	logHelper_stop(admin->loghelper);
-
-	logHelper_destroy(&admin->loghelper);
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (admin->zmq_auth != NULL){
-		zactor_destroy(&(admin->zmq_auth));
-	}
-#endif
-
-	free(admin);
-
-	return status;
-}
-/* Creates the topic subscription for PUBSUB_ANY_SUB_TOPIC when admin->subscriptions has none yet, connects it to every known local and external publisher endpoint, adds subEP as subscriber, starts it and stores it in admin->subscriptions. Returns the accumulated status of the pubsub_topicSubscription* calls; when the subscription already exists nothing is done and CELIX_SUCCESS is returned. */
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&admin->subscriptionsLock);
-	/* subscriptionsLock stays held until the unlock at the end of this function. */
-	topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-
-	if(any_sub==NULL){
-
-		int i;
-
-		status += pubsub_topicSubscriptionCreate(admin->bundle_context, subEP, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, &any_sub);
-
-		if (status == CELIX_SUCCESS){
-
-			/* Connect all internal publishers */
-			celixThreadMutex_lock(&admin->localPublicationsLock);
-			hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications);
-			while(hashMapIterator_hasNext(lp_iter)){
-				service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter);
-				topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
-				array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
-
-				if(topic_publishers!=NULL){
-					for(i=0;i<arrayList_size(topic_publishers);i++){
-						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-						if(pubEP->endpoint !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
-						}
-					}
-				}
-			}
-			hashMapIterator_destroy(lp_iter);
-			celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-			/* Connect also all external publishers */
-			celixThreadMutex_lock(&admin->externalPublicationsLock);
-			hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications);
-			while(hashMapIterator_hasNext(extp_iter)){
-				array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
-				if(ext_pub_list!=NULL){
-					for(i=0;i<arrayList_size(ext_pub_list);i++){
-						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-						if(pubEP->endpoint !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
-						}
-					}
-				}
-			}
-			hashMapIterator_destroy(extp_iter);
-			celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-
-			pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
-
-			status += pubsub_topicSubscriptionStart(any_sub);
-
-		}
-		/* Stored only when every step above succeeded. NOTE(review): on failure a subscription created above is not destroyed in this function. */
-		if (status == CELIX_SUCCESS){
-			hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
-		}
-
-	}
-
-	celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-	return status;
-}
-/* Handles a new subscriber endpoint. PUBSUB_ANY_SUB_TOPIC is delegated to pubsubAdmin_addAnySubscription. Otherwise subEP is parked in the pending list when neither a local nor an external publication exists for its scope/topic key; else a topic subscription is created if needed, connected to the known publisher endpoints, started and stored. Returns the accumulated status of the calls made. */
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
-	celix_status_t status = CELIX_SUCCESS;
-
-	printf("PSA: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic);
-
-	if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
-		return pubsubAdmin_addAnySubscription(admin,subEP);
-	}
-
-	/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
-	celixThreadMutex_lock(&admin->localPublicationsLock);
-	celixThreadMutex_lock(&admin->externalPublicationsLock);
-	/* Both publication locks stay held until the end of the function. */
-    char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
-
-	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-
-	if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
-		celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-		pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
-		celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-	} else {
-		int i;
-		topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
-		/* NOTE(review): admin->subscriptions is read above without subscriptionsLock; the lock is only taken around hashMap_put below. */
-		if(subscription == NULL) {
-			status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP,subEP->scope, subEP->topic,&subscription);
-
-			if (status==CELIX_SUCCESS){
-
-				/* Try to connect internal publishers */
-				if(factory!=NULL){
-					topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
-					array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
-
-					if(topic_publishers!=NULL){
-						for(i=0;i<arrayList_size(topic_publishers);i++){
-							pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-							if(pubEP->endpoint !=NULL){
-								status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
-							}
-						}
-					}
-
-				}
-
-				/* Look also for external publishers */
-				if(ext_pub_list!=NULL){
-					for(i=0;i<arrayList_size(ext_pub_list);i++){
-						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-						if(pubEP->endpoint !=NULL){
-							status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
-						}
-					}
-				}
-
-				pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
-
-				status += pubsub_topicSubscriptionStart(subscription);
-
-			}
-
-			if(status==CELIX_SUCCESS){
-				celixThreadMutex_lock(&admin->subscriptionsLock);
-				hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
-				celixThreadMutex_unlock(&admin->subscriptionsLock);
-			}
-		}
-		/* The count is increased for new and existing subscriptions alike; pubsub_topicSubscriptionAddSubscriber is only called in the creation branch above. */
-		if (status == CELIX_SUCCESS){
-			pubsub_topicIncreaseNrSubscribers(subscription);
-		}
-	}
-
-    free(scope_topic);
-	celixThreadMutex_unlock(&admin->externalPublicationsLock);
-	celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-	return status;
-
-}
-/* Decrements the subscriber count of the subscription found under subEP->topic and, once that count is zero, removes subEP from it. Returns CELIX_ILLEGAL_STATE when no subscription is found, otherwise CELIX_SUCCESS or the status of pubsub_topicSubscriptionRemoveSubscriber. */
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
-	celix_status_t status = CELIX_SUCCESS;
-
-	printf("PSA: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
-
-	celixThreadMutex_lock(&admin->subscriptionsLock);
-	/* NOTE(review): the lookup key is the bare topic, while pubsubAdmin_addSubscription stores entries under createScopeTopicKey(scope, topic) - confirm the keys can match. */
-	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,subEP->topic);
-
-	if(sub!=NULL){
-		pubsub_topicDecreaseNrSubscribers(sub);
-		if(pubsub_topicGetNrSubscribers(sub) == 0) {
-			status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
-		}
-	}
-	else{
-		status = CELIX_ILLEGAL_STATE;
-	}
-
-	celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-	return status;
-
-}
-/* Registers a publisher endpoint. A publication of this framework (matching UUID) without an endpoint gets a topic publication created and started, or is added to the existing one; any other endpoint is appended to the external-publication list of its scope/topic key. Afterwards pending subscriptions for that key are re-submitted, and the matching subscription and the ANY subscription are asked to connect to pubEP->endpoint. Returns CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID property is missing. */
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
-    celix_status_t status = CELIX_SUCCESS;
-
-    printf("PSA: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic);
-
-    const char* fwUUID = NULL;
-    /* NOTE(review): scope_topic (freed at the end of this function) is created before the fwUUID check; the early return below does not free it. */
-    char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
-    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-    if (fwUUID == NULL) {
-        printf("PSA: Cannot retrieve fwUUID.\n");
-        return CELIX_INVALID_BUNDLE_CONTEXT;
-    }
-
-    if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) {
-        /* Publication of this framework that carries no endpoint string yet. */
-        celixThreadMutex_lock(&admin->localPublicationsLock);
-
-        service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
-
-        if (factory == NULL) {
-            topic_publication_pt pub = NULL;
-            status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
-            if (status == CELIX_SUCCESS) {
-                status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
-                if (status == CELIX_SUCCESS && factory != NULL) {
-                    hashMap_put(admin->localPublications, strdup(scope_topic), factory);
-                }
-            } else {
-                printf("PSA: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID);
-            }
-        } else {
-            //just add the new EP to the list
-            topic_publication_pt pub = (topic_publication_pt) factory->handle;
-            pubsub_topicPublicationAddPublisherEP(pub, pubEP);
-        }
-
-        celixThreadMutex_unlock(&admin->localPublicationsLock);
-    } else {
-        /* Every other endpoint - including one of this framework that already has an endpoint string - is tracked in externalPublications. */
-        celixThreadMutex_lock(&admin->externalPublicationsLock);
-        array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
-        if (ext_pub_list == NULL) {
-            arrayList_create(&ext_pub_list);
-            hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
-        }
-
-        arrayList_add(ext_pub_list, pubEP);
-
-        celixThreadMutex_unlock(&admin->externalPublicationsLock);
-    }
-
-    /* Re-evaluate the pending subscriptions */
-    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-    /* NOTE(review): pubsubAdmin_addSubscription is called below while pendingSubscriptionsLock is held, and it takes that same lock whenever it has to park a subscription. */
-    hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
-    if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them.
-        char* topic = (char*) hashMapEntry_getKey(pendingSub);
-        array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
-        int i;
-        for (i = 0; i < arrayList_size(pendingSubList); i++) {
-            pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
-            pubsubAdmin_addSubscription(admin, subEP);
-        }
-        hashMap_remove(admin->pendingSubscriptions, scope_topic);
-        arrayList_clear(pendingSubList);
-        arrayList_destroy(pendingSubList);
-        free(topic);
-    }
-
-    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-    /* Connect the new publisher to the subscription for his topic, if there is any */
-    celixThreadMutex_lock(&admin->subscriptionsLock);
-
-    topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
-    if (sub != NULL && pubEP->endpoint != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint);
-    }
-
-    /* And check also for ANY subscription */
-    topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-    if (any_sub != NULL && pubEP->endpoint != NULL) {
-        pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint);
-    }
-    free(scope_topic);
-
-    celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-    return status;
-
-}
-/* Unregisters a publisher endpoint: one of this framework (matching UUID) is removed from its topic publication, any other from the external-publication list of its scope/topic key (the list is dropped once empty). Unless count found other external publishers sharing pubEP->endpoint, the matching subscription and the ANY subscription are asked to disconnect from it. Returns CELIX_INVALID_BUNDLE_CONTEXT without a framework UUID and CELIX_ILLEGAL_STATE when the local publication is unknown. */
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
-	celix_status_t status = CELIX_SUCCESS;
-        int count = 0;
-	/* count: remaining external publishers that share pubEP->endpoint; it is only computed in the external branch and stays 0 otherwise. */
-	printf("PSA: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
-
-	const char* fwUUID = NULL;
-
-	bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-	if(fwUUID==NULL){
-		printf("PSA: Cannot retrieve fwUUID.\n");
-		return CELIX_INVALID_BUNDLE_CONTEXT;
-	}
-	char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
-	if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
-
-		celixThreadMutex_lock(&admin->localPublicationsLock);
-
-		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-		if(factory!=NULL){
-			topic_publication_pt pub = (topic_publication_pt)factory->handle;
-			pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
-		}
-		else{
-			status = CELIX_ILLEGAL_STATE;
-		}
-
-		celixThreadMutex_unlock(&admin->localPublicationsLock);
-	}
-	else{
-
-		celixThreadMutex_lock(&admin->externalPublicationsLock);
-		array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-		if(ext_pub_list!=NULL){
-			int i;
-			bool found = false;
-			for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
-				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-				found = pubsubEndpoint_equals(pubEP,p);
-				if (found){
-				        found = true;
-					arrayList_remove(ext_pub_list,i);
-				}
-			}
-			// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
-			int ext_pub_list_size = arrayList_size(ext_pub_list);
-			for(i=0; i<ext_pub_list_size; i++) {
-				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-				if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
-					count++;
-				}
-			}
-			/* NOTE(review): the strcmp above dereferences pubEP->endpoint and p->endpoint without NULL checks, although pubEP->endpoint is tested for NULL further down; the "found = true" inside the first loop is redundant. */
-			if(ext_pub_list_size == 0){
-				hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
-				char* topic = (char*)hashMapEntry_getKey(entry);
-				array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
-				hashMap_remove(admin->externalPublications,topic);
-				arrayList_destroy(list);
-				free(topic);
-			}
-
-		}
-
-		celixThreadMutex_unlock(&admin->externalPublicationsLock);
-	}
-
-	/* Check if this publisher was connected to one of our subscribers*/
-	celixThreadMutex_lock(&admin->subscriptionsLock);
-
-	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-	if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
-	}
-
-	/* And check also for ANY subscription */
-	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-	if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
-	}
-	free(scope_topic);
-	celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-	return status;
-
-}
-/* Stops and destroys the local topic publication stored under the scope/topic key, removes its entry from admin->localPublications and frees the stored key and the service factory. Returns the summed status of the stop and destroy calls, or CELIX_SUCCESS when no entry exists. */
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
-	celix_status_t status = CELIX_SUCCESS;
-
-	printf("PSA: Closing all publications\n");
-
-	celixThreadMutex_lock(&admin->localPublicationsLock);
-	char *scope_topic = createScopeTopicKey(scope, topic);
-	hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
-	if(pubsvc_entry!=NULL){
-		char* topic = (char*)hashMapEntry_getKey(pubsvc_entry);
-		service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
-		topic_publication_pt pub = (topic_publication_pt)factory->handle;
-		status += pubsub_topicPublicationStop(pub);
-		status += pubsub_topicPublicationDestroy(pub);
-		/* topic here is the stored map key (the local declared above shadows the topic parameter); it and the factory are freed after the entry is removed. */
-		hashMap_remove(admin->localPublications,scope_topic);
-		free(topic);
-		free(factory);
-	}
-	free(scope_topic);
-	celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-	return status;
-
-}
-/* Stops and destroys the topic subscription stored under the scope/topic key, removes its entry from admin->subscriptions and frees the stored key. Returns the summed status of the stop and destroy calls, or CELIX_SUCCESS when no entry exists. */
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
-	celix_status_t status = CELIX_SUCCESS;
-
-	printf("PSA: Closing all subscriptions\n");
-
-	celixThreadMutex_lock(&admin->subscriptionsLock);
-	char *scope_topic = createScopeTopicKey(scope, topic);
-	hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
-	if(sub_entry!=NULL){
-		char* topic = (char*)hashMapEntry_getKey(sub_entry);
-		/* topic above is the stored map key; the local shadows the topic parameter. */
-		topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry);
-		status += pubsub_topicSubscriptionStop(ts);
-		status += pubsub_topicSubscriptionDestroy(ts);
-
-		hashMap_remove(admin->subscriptions,scope_topic);
-		free(topic);
-
-	}
-	free(scope_topic);
-	celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-	return status;
-
-}
-/* Scores this admin for publisher endpoint pubEP by forwarding to pubsubAdmin_match; the score is written to *score and that call's status is returned. */
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score){
-	celix_status_t status = CELIX_SUCCESS;
-	status = pubsubAdmin_match(admin, pubEP, score);
-	return status;
-}
-/* Scores this admin for subscriber endpoint subEP by forwarding to pubsubAdmin_match; the score is written to *score and that call's status is returned. */
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score){
-	celix_status_t status = CELIX_SUCCESS;
-	status = pubsubAdmin_match(admin, subEP, score);
-	return status;
-}
-/* Writes a score for psEP to *score: 100 when the bundle-context property "<topic>.psa" (looked up with PSA_DEFAULT as default) equals "zmq", otherwise 1. Always returns CELIX_SUCCESS. */
-static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){
-	celix_status_t status = CELIX_SUCCESS;
-	/* Property name is built as "<topic>.psa" in a fixed 1024-byte buffer. */
-	char topic_psa_prop[1024];
-	snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic);
-
-	const char* psa_to_use = NULL;
-	bundleContext_getPropertyWithDefault(admin->bundle_context, topic_psa_prop, PSA_DEFAULT, &psa_to_use);
-
-	*score = 0;
-	if (strcmp(psa_to_use, "zmq") == 0){
-		*score += 100;
-	}else{
-		*score += 1;
-	}
-
-	return status;
-}
-/* Walks the getifaddrs() list and stores a strdup'ed numeric host string of the first AF_INET address in *ip - on the named interface, or on any interface when interface is NULL. Returns CELIX_SUCCESS on a match and CELIX_BUNDLE_EXCEPTION otherwise; pubsubAdmin_create frees the returned string. Compiled only when ANDROID is not defined. */
-#ifndef ANDROID
-static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) {
-	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
-
-	struct ifaddrs *ifaddr, *ifa;
-	char host[NI_MAXHOST];
-
-	if (getifaddrs(&ifaddr) != -1)
-	{
-		for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
-		{
-			if (ifa->ifa_addr == NULL)
-				continue;
-			/* NOTE(review): getnameinfo is invoked with sizeof(struct sockaddr_in) before the address family is checked to be AF_INET. */
-			if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
-				if (interface == NULL) {
-					*ip = strdup(host);
-					status = CELIX_SUCCESS;
-				}
-				else if (strcmp(ifa->ifa_name, interface) == 0) {
-					*ip = strdup(host);
-					status = CELIX_SUCCESS;
-				}
-			}
-		}
-
-		freeifaddrs(ifaddr);
-	}
-
-	return status;
-}
-#endif
-/* Appends subEP to the pending-subscription list stored under its scope/topic key, creating the list (with a strdup'ed key) on first use. Always returns CELIX_SUCCESS. Takes no lock itself; pubsubAdmin_addSubscription holds pendingSubscriptionsLock around the call. */
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
-	celix_status_t status = CELIX_SUCCESS;
-	char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
-	array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
-	if(pendingListPerTopic==NULL){
-		arrayList_create(&pendingListPerTopic);
-		hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
-	}
-	arrayList_add(pendingListPerTopic,subEP);
-	free(scope_topic);
-	return status;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
deleted file mode 100644
index 1a036db..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ /dev/null
@@ -1,636 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_publication.c
- *
- *  \date       Sep 24, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include "pubsub_publish_service_private.h"
-#include <czmq.h>
-/* The following undefs prevent the collision between:
- * - sys/syslog.h (which is included within czmq)
- * - celix/dfi/dfi_log_util.h
- */
-#undef LOG_DEBUG
-#undef LOG_WARNING
-#undef LOG_INFO
-#undef LOG_WARNING
-
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "array_list.h"
-#include "celixbool.h"
-#include "service_registration.h"
-#include "utils.h"
-#include "service_factory.h"
-#include "version.h"
-
-#include "pubsub_common.h"
-#include "dyn_msg_utils.h"
-#include "pubsub_utils.h"
-#include "publisher.h"
-
-#include "pubsub_serializer.h"
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	#include "zmq_crypto.h"
-
-	#define MAX_CERT_PATH_LENGTH 512
-#endif
-
-/* Size in bytes of the buffers holding the "tcp://<ip>:<port>" endpoint and bind strings. */
-#define EP_ADDRESS_LEN		32
-/* Maximum number of bind attempts (each on a newly drawn port) in pubsub_topicPublicationCreate. */
-#define ZMQ_BIND_MAX_RETRY	5
-
-/* Argument passed to sleep() before the very first send, see delay_first_send_for_late_joiners. */
-#define FIRST_SEND_DELAY	2
-
-/* State of one published topic: the socket it is bound to plus the services handed out for it. */
-struct topic_publication {
-	/* ZMQ_PUB socket created and bound in pubsub_topicPublicationCreate. */
-	zsock_t* zmq_socket;
-	/* Certificate applied to the socket; only set when the topic is secured. */
-	zcert_t * zmq_cert;
-	/* Heap-allocated "tcp://<bindIP>:<port>" string, copied into every added publisher endpoint. */
-	char* endpoint;
-	/* Registration obtained when the publisher service factory is registered. */
-	service_registration_pt svcFactoryReg;
-	array_list_pt pub_ep_list; //List<pubsub_endpoint>
-	hash_map_pt boundServices; //<bundle_pt,bound_service>
-	/* Taken around changes to pub_ep_list and boundServices and around sends on zmq_socket. */
-	celix_thread_mutex_t tp_lock;
-};
-
-/* Per-bundle publisher service instance handed out by the service factory. */
-typedef struct publish_bundle_bound_service {
-	/* Publication this instance sends through. */
-	topic_publication_pt parent;
-	/* Service struct returned to the bundle; its handle points back at this struct. */
-	pubsub_publisher_pt service;
-	/* Bundle the service was created for (key in parent->boundServices). */
-	bundle_pt bundle;
-	/* Copy of the topic of the parent's first publisher endpoint. */
-	char *topic;
-	/* Message types keyed by message type id, filled by pubsubSerializer_fillMsgTypesMap. */
-	hash_map_pt msgTypes;
-	/* Number of outstanding getService calls; the instance is destroyed when it drops to 0. */
-	unsigned short getCount;
-	/* Held for the whole duration of a send / sendMultipart call. */
-	celix_thread_mutex_t mp_lock;
-	/* True between a FIRST_MSG and the matching LAST_MSG of a multipart send. */
-	bool mp_send_in_progress;
-	/* Parts (pubsub_msg_pt) queued for the multipart message in progress. */
-	array_list_pt mp_parts;
-}* publish_bundle_bound_service_pt;
-
-/* One serialized message ready to be sent; header, payload and the struct itself are released by send_pubsub_msg. */
-typedef struct pubsub_msg{
-	/* Heap-allocated header (topic, type id, version). */
-	pubsub_msg_header_pt header;
-	/* Serialized message bytes produced by pubsubSerializer_serialize. */
-	char* payload;
-	/* Length of payload in bytes. */
-	int payloadSize;
-}* pubsub_msg_pt;
-
-/* Draws the port number used for a bind attempt. */
-static unsigned int rand_range(unsigned int min, unsigned int max);
-
-/* Service factory callbacks: hand out / take back the per-bundle publisher service. */
-static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
-
-/* Lifecycle of the per-bundle service instance. */
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
-
-/* Implementations of the publisher service functions; handle is the bound service. */
-static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, void *msg);
-static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, void *msg, int flags);
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
-
-/* Sleeps once, before the first message sent through this file. */
-static void delay_first_send_for_late_joiners(void);
-
-/*
- * Creates a topic publication for pubEP.
- *
- * A ZMQ_PUB socket is created and bound to "tcp://0.0.0.0:<port>", where the
- * port is drawn with rand_range(basePort, maxPort); up to ZMQ_BIND_MAX_RETRY
- * ports are tried. The advertised endpoint is "tcp://<bindIP>:<port>".
- * When built with BUILD_WITH_ZMQ_SECURITY and the topic is listed in the
- * SECURE_TOPICS property, the publisher key is loaded and applied to the
- * socket; if the key cannot be loaded the topic is published unsecured.
- *
- * On success *out receives the new publication and CELIX_SUCCESS is returned.
- * On any failure CELIX_SERVICE_EXCEPTION is returned, *out is left untouched
- * and the socket, certificate and endpoint buffer acquired so far are released.
- */
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP,char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
-	celix_status_t status = CELIX_SUCCESS;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	char* secure_topics = NULL;
-	bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
-
-	if (secure_topics){
-		array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
-
-		int i;
-		int secure_topics_size = arrayList_size(secure_topics_list);
-		for (i = 0; i < secure_topics_size; i++){
-			char* top = arrayList_get(secure_topics_list, i);
-			if (strcmp(pubEP->topic, top) == 0){
-				printf("TP: Secure topic: '%s'\n", top);
-				pubEP->is_secure = true;
-			}
-			free(top);
-			top = NULL;
-		}
-
-		arrayList_destroy(secure_topics_list);
-	}
-
-	zcert_t* pub_cert = NULL;
-	if (pubEP->is_secure){
-		char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
-		if (keys_bundle_dir == NULL){
-			return CELIX_SERVICE_EXCEPTION;
-		}
-
-		const char* keys_file_path = NULL;
-		const char* keys_file_name = NULL;
-		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
-		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
-
-		char cert_path[MAX_CERT_PATH_LENGTH];
-
-		//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
-		snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic);
-		free(keys_bundle_dir);
-		printf("TP: Loading key '%s'\n", cert_path);
-
-		pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
-		if (pub_cert == NULL){
-			printf("TP: Cannot load key '%s'\n", cert_path);
-			printf("TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
-			pubEP->is_secure = false;
-		}
-	}
-#endif
-
-	zsock_t* socket = zsock_new (ZMQ_PUB);
-	if(socket==NULL){
-		#ifdef BUILD_WITH_ZMQ_SECURITY
-			if (pubEP->is_secure){
-				zcert_destroy(&pub_cert);
-			}
-		#endif
-
-		perror("Error for zmq_socket");
-		return CELIX_SERVICE_EXCEPTION;
-	}
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (pubEP->is_secure){
-		zcert_apply (pub_cert, socket); // apply certificate to socket
-		zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
-	}
-#endif
-
-	int rv = -1, retry=0;
-	char* ep = malloc(EP_ADDRESS_LEN);
-	char bindAddress[EP_ADDRESS_LEN];
-
-	/* Without the endpoint buffer there is nothing to bind for; fall through to the cleanup below. */
-	while(ep!=NULL && rv==-1 && retry<ZMQ_BIND_MAX_RETRY){
-		/* Randomized part due to same bundle publishing on different topics */
-		unsigned int port = rand_range(basePort,maxPort);
-		memset(ep,0,EP_ADDRESS_LEN);
-		memset(bindAddress, 0, EP_ADDRESS_LEN);
-
-		snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
-		snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind address than endpoint address
-		rv = zsock_bind (socket, "%s", bindAddress);
-		if (rv == -1) {
-			perror("Error for zmq_bind");
-		}
-		retry++;
-	}
-
-	/* ZMQ stuffs are all fine at this point. Let's create and initialize the structure */
-	topic_publication_pt pub = NULL;
-	if(rv != -1){
-		pub = calloc(1,sizeof(*pub));
-	}
-
-	if(pub == NULL){
-		/* Bind or allocation failed: release the socket and certificate too,
-		 * not only the endpoint buffer, so a failed create leaks nothing. */
-		free(ep);
-		zsock_destroy(&socket);
-#ifdef BUILD_WITH_ZMQ_SECURITY
-		zcert_destroy(&pub_cert);
-#endif
-		return CELIX_SERVICE_EXCEPTION;
-	}
-
-	arrayList_create(&(pub->pub_ep_list));
-	pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
-	celixThreadMutex_create(&(pub->tp_lock),NULL);
-
-	pub->endpoint = ep;
-	pub->zmq_socket = socket;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (pubEP->is_secure){
-		pub->zmq_cert = pub_cert;
-	}
-#endif
-
-	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
-
-	*out = pub;
-
-	return status;
-}
-
-/*
- * Releases a topic publication: the endpoint string, the endpoint list (not
- * the endpoints in it), every bound service, the socket, the certificate
- * (security builds only), the lock and finally the struct itself.
- * Always returns CELIX_SUCCESS.
- */
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
-	hash_map_iterator_pt boundIter;
-
-	celixThreadMutex_lock(&(pub->tp_lock));
-
-	free(pub->endpoint);
-	arrayList_destroy(pub->pub_ep_list);
-
-	/* Tear down every service instance still handed out to a bundle. */
-	for(boundIter = hashMapIterator_create(pub->boundServices); hashMapIterator_hasNext(boundIter); ){
-		pubsub_destroyPublishBundleBoundService((publish_bundle_bound_service_pt)hashMapIterator_nextValue(boundIter));
-	}
-	hashMapIterator_destroy(boundIter);
-	hashMap_destroy(pub->boundServices,false,false);
-
-	pub->svcFactoryReg = NULL;
-	zsock_destroy(&(pub->zmq_socket));
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	zcert_destroy(&(pub->zmq_cert));
-#endif
-
-	celixThreadMutex_unlock(&(pub->tp_lock));
-	celixThreadMutex_destroy(&(pub->tp_lock));
-
-	free(pub);
-
-	return CELIX_SUCCESS;
-}
-
-/*
- * Registers the publisher service factory for this publication.
- *
- * The service properties (topic, scope, service.version) are taken from the
- * first publisher endpoint in pub->pub_ep_list.
- *
- * On success *svcFactory receives the newly allocated factory and the
- * registration is stored in pub->svcFactoryReg. On failure nothing is handed
- * out: the properties and the factory are released and *svcFactory is left
- * untouched. Returns CELIX_SERVICE_EXCEPTION when no endpoint is present or
- * the factory cannot be allocated, otherwise the registration status.
- */
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
-	celix_status_t status = CELIX_SUCCESS;
-
-	/* Let's register the new service */
-
-	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
-
-	if(pubEP!=NULL){
-		service_factory_pt factory = calloc(1, sizeof(*factory));
-		if(factory == NULL){
-			return CELIX_SERVICE_EXCEPTION;
-		}
-		factory->handle = pub;
-		factory->getService = pubsub_topicPublicationGetService;
-		factory->ungetService = pubsub_topicPublicationUngetService;
-
-		properties_pt props = properties_create();
-		properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
-		properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
-		properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
-
-		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
-
-		if(status != CELIX_SUCCESS){
-			properties_destroy(props);
-			/* The factory was never handed to the caller, so it must be freed here. */
-			free(factory);
-			printf("PSA: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
-		}
-		else{
-			*svcFactory = factory;
-		}
-	}
-	else{
-		printf("PSA: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
-		status = CELIX_SERVICE_EXCEPTION;
-	}
-
-	return status;
-}
-
-/*
- * Unregisters the publisher service factory of this publication.
- * Returns the status reported by serviceRegistration_unregister.
- */
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
-	return serviceRegistration_unregister(pub->svcFactoryReg);
-}
-
-/*
- * Adds a publisher endpoint to the publication. The endpoint's `endpoint`
- * field is overwritten with a copy of the publication's endpoint string.
- * Always returns CELIX_SUCCESS.
- */
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
-	celix_thread_mutex_t* lock = &(pub->tp_lock);
-
-	celixThreadMutex_lock(lock);
-	{
-		char* endpointCopy = strdup(pub->endpoint);
-		ep->endpoint = endpointCopy;
-		arrayList_add(pub->pub_ep_list,ep);
-	}
-	celixThreadMutex_unlock(lock);
-
-	return CELIX_SUCCESS;
-}
-
-/*
- * Removes the first endpoint of the publication that pubsubEndpoint_equals()
- * considers equal to ep. The endpoint itself is not freed.
- * Always returns CELIX_SUCCESS, also when nothing matched.
- */
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
-
-	celixThreadMutex_lock(&(pub->tp_lock));
-	for (unsigned int i = 0; i < arrayList_size(pub->pub_ep_list); i++) {
-		pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i);
-		if(pubsubEndpoint_equals(ep, e)) {
-			/* Remove the stored element that matched, not the lookup argument:
-			 * ep may be an equal endpoint held in a different object. */
-			arrayList_removeElement(pub->pub_ep_list,e);
-			break;
-		}
-	}
-	celixThreadMutex_unlock(&(pub->tp_lock));
-
-	return CELIX_SUCCESS;
-}
-
-/* Returns the publication's own endpoint list (not a copy); tp_lock is not taken. */
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
-	array_list_pt publishers = pub->pub_ep_list;
-
-	return publishers;
-}
-
-
-/*
- * Service factory callback: returns the publisher service bound to `bundle`.
- *
- * The first request of a bundle creates the bound service; later requests
- * increase its getCount. handle is the topic_publication the factory was
- * registered for.
- *
- * On success *service is set and CELIX_SUCCESS returned. If the bound service
- * cannot be created *service is set to NULL and CELIX_SERVICE_EXCEPTION is
- * returned.
- */
-static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
-	celix_status_t  status = CELIX_SUCCESS;
-
-	topic_publication_pt publish = (topic_publication_pt)handle;
-
-	celixThreadMutex_lock(&(publish->tp_lock));
-
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-	if(bound==NULL){
-		bound = pubsub_createPublishBundleBoundService(publish,bundle);
-		if(bound!=NULL){
-			hashMap_put(publish->boundServices,bundle,bound);
-		}
-	}
-	else{
-		bound->getCount++;
-	}
-
-	if(bound!=NULL){
-		*service = bound->service;
-	}
-	else{
-		/* Creation failed: do not dereference the missing bound service. */
-		*service = NULL;
-		status = CELIX_SERVICE_EXCEPTION;
-	}
-
-	celixThreadMutex_unlock(&(publish->tp_lock));
-
-	return status;
-}
-
-/*
- * Service factory callback: gives back the publisher service of `bundle`.
- * The bound service is destroyed and removed from the map once its getCount
- * reaches zero. *service is always reset to NULL. Always returns CELIX_SUCCESS.
- */
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
-
-	topic_publication_pt tp = (topic_publication_pt)handle;
-
-	celixThreadMutex_lock(&(tp->tp_lock));
-
-	publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt)hashMap_get(tp->boundServices,bundle);
-	if(boundSvc==NULL){
-		long id = -1;
-		bundle_getBundleId(bundle,&id);
-		printf("TP: Unexpected ungetService call for bundle %ld.\n", id);
-	}
-	else{
-		boundSvc->getCount--;
-		if(boundSvc->getCount==0){
-			/* Last user of this bundle's instance is gone. */
-			pubsub_destroyPublishBundleBoundService(boundSvc);
-			hashMap_remove(tp->boundServices,bundle);
-		}
-	}
-
-	/* The service pointer must not be used after unget. */
-	*service = NULL;
-
-	celixThreadMutex_unlock(&(tp->tp_lock));
-
-	return CELIX_SUCCESS;
-}
-
-/*
- * Sends one message as two frames (header, then payload) on zmq_socket.
- *
- * last: when false the payload frame is sent with ZFRAME_MORE so that further
- *       parts of a multipart message can follow.
- *
- * The payload is only sent if both frames could be created and the header was
- * sent, so a payload never goes out without its header.
- * msg, msg->header and msg->payload are always freed before returning.
- * Returns true when both frames were sent.
- */
-static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
-
-	zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
-	zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize);
-
-	bool ret = (headerMsg != NULL && payloadMsg != NULL);
-
-	if (ret){
-		delay_first_send_for_late_joiners();
-		ret = (zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) != -1);
-	}
-
-	if (ret){
-		ret = (zframe_send(&payloadMsg,zmq_socket, last ? 0 : ZFRAME_MORE) != -1);
-	}
-
-	/* A frame that was sent has been nullified by zframe_send; whatever is
-	 * still set here was not sent and has to be released. */
-	zframe_destroy(&headerMsg);
-	zframe_destroy(&payloadMsg);
-
-	free(msg->header);
-	free(msg->payload);
-	free(msg);
-
-	return ret;
-
-}
-
-/*
- * Sends all queued parts of a multipart message in order, then empties the
- * list. Sending stops at the first part that fails; the remaining parts are
- * not sent but are still freed, so the list never drops live allocations.
- * Returns true only if every part was sent.
- */
-static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
-
-	bool ret = true;
-
-	unsigned int i = 0;
-	unsigned int mp_num = arrayList_size(mp_msg_parts);
-	for(;i<mp_num;i++){
-		pubsub_msg_pt part = (pubsub_msg_pt)arrayList_get(mp_msg_parts,i);
-		if(ret){
-			/* send_pubsub_msg releases the part whether or not the send works. */
-			ret = send_pubsub_msg(zmq_socket, part, (i==mp_num-1));
-		}
-		else if(part != NULL){
-			free(part->header);
-			free(part->payload);
-			free(part);
-		}
-	}
-	arrayList_clear(mp_msg_parts);
-
-	return ret;
-
-}
-
-/* Sends a single (non-multipart) message: a multipart send flagged as both first and last part. */
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, void *msg) {
-	const int singleMsgFlags = PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG;
-
-	return pubsub_topicPublicationSendMultipart(handle, msgTypeId, msg, singleMsgFlags);
-}
-
-/*
- * Serializes msg and either sends it directly or queues it as part of a
- * multipart message, depending on flags:
- *   FIRST_MSG            starts a multipart message and queues the part
- *   PART_MSG             queues a further part
- *   LAST_MSG             queues the final part and sends all queued parts
- *   FIRST_MSG | LAST_MSG sends the message on its own
- *
- * handle is the publish_bundle_bound_service the service was created for.
- *
- * Returns 0 on acceptance, -1 if msgTypeId is unknown to this bundle, -3 if a
- * new multipart message is started while one is in progress, -4 for a part
- * without a first part or an invalid flags value. A failed send is only
- * logged; the return value stays 0.
- *
- * NOTE(review): mp_lock is taken before parent->tp_lock here, while
- * UngetService destroys a bound service (taking mp_lock) with tp_lock held --
- * confirm the two cannot run concurrently for the same bound service.
- */
-static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, void *msg, int flags){
-
-	int status = 0;
-
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
-
-	celixThreadMutex_lock(&(bound->mp_lock));
-	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
-		printf("TP: Multipart send already in progress. Cannot process a new one.\n");
-		celixThreadMutex_unlock(&(bound->mp_lock));
-		return -3;
-	}
-
-	pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
-	int major=0, minor=0;
-
-	if (msgType != NULL) {
-
-		version_pt msgVersion = pubsubSerializer_getVersion(msgType);
-
-		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
-
-		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
-		msg_hdr->type = msgTypeId;
-		if (msgVersion != NULL){
-			version_getMajor(msgVersion, &major);
-			version_getMinor(msgVersion, &minor);
-			msg_hdr->major = major;
-			msg_hdr->minor = minor;
-		}
-
-		void* serializedOutput = NULL;
-		int serializedOutputLen = 0;
-		pubsubSerializer_serialize(msgType, msg, &serializedOutput, &serializedOutputLen);
-
-		/* Named `part` so it no longer shadows the `msg` parameter. */
-		pubsub_msg_pt part = calloc(1,sizeof(struct pubsub_msg));
-		part->header = msg_hdr;
-		part->payload = (char *) serializedOutput;
-		part->payloadSize = serializedOutputLen;
-
-		bool snd = true;
-		/* Set once the part has been queued or passed to a send function,
-		 * i.e. once somebody else is responsible for freeing it. */
-		bool consumed = false;
-
-		switch(flags){
-		case PUBSUB_PUBLISHER_FIRST_MSG:
-			bound->mp_send_in_progress = true;
-			arrayList_add(bound->mp_parts,part);
-			consumed = true;
-			break;
-		case PUBSUB_PUBLISHER_PART_MSG:
-			if(!bound->mp_send_in_progress){
-				printf("TP: ERROR: received msg part without the first part.\n");
-				status = -4;
-			}
-			else{
-				arrayList_add(bound->mp_parts,part);
-				consumed = true;
-			}
-			break;
-		case PUBSUB_PUBLISHER_LAST_MSG:
-			if(!bound->mp_send_in_progress){
-				printf("TP: ERROR: received end msg without the first part.\n");
-				status = -4;
-			}
-			else{
-				arrayList_add(bound->mp_parts,part);
-				consumed = true;
-				celixThreadMutex_lock(&(bound->parent->tp_lock));
-				snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
-				bound->mp_send_in_progress = false;
-				celixThreadMutex_unlock(&(bound->parent->tp_lock));
-			}
-			break;
-		case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:	//Normal send case
-			celixThreadMutex_lock(&(bound->parent->tp_lock));
-			snd = send_pubsub_msg(bound->parent->zmq_socket,part,true);
-			consumed = true;
-			celixThreadMutex_unlock(&(bound->parent->tp_lock));
-			break;
-		default:
-			printf("TP: ERROR: Invalid MP flags combination\n");
-			status = -4;
-			break;
-		}
-
-		if(!consumed){
-			/* Rejected part: nobody took ownership, so release it here. */
-			free(part->header);
-			free(part->payload);
-			free(part);
-		}
-
-		if(!snd){
-			printf("TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
-		}
-
-	} else {
-		printf("TP: Message %u not supported.",msgTypeId);
-		status=-1;
-	}
-
-	celixThreadMutex_unlock(&(bound->mp_lock));
-
-	return status;
-
-}
-
-/* Stores pubsubSerializer_hashCode(msgType) in *msgTypeId; handle is unused. Always returns 0. */
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
-	unsigned int typeId = pubsubSerializer_hashCode(msgType);
-
-	*msgTypeId = typeId;
-
-	return 0;
-}
-
-/*
- * Returns a pseudo-random value in [min, max] based on rand().
- * When max <= min the result is min.
- */
-static unsigned int rand_range(unsigned int min, unsigned int max){
-
-	if(max <= min){
-		return min;
-	}
-
-	/* Divide by RAND_MAX + 1 so that scaled stays strictly below 1.0;
-	 * dividing by RAND_MAX could yield max + 1 when rand() == RAND_MAX. */
-	double scaled = ((double)rand())/((double)RAND_MAX + 1.0);
-	return (unsigned int)((max-min+1)*scaled) + min;
-
-}
-
-/*
- * Creates the publisher service instance for one bundle.
- *
- * The instance starts with getCount 1, its own lock, an empty multipart queue,
- * a copy of the topic of tp's first publisher endpoint and the message types
- * found for the bundle by pubsubSerializer_fillMsgTypesMap.
- *
- * Returns NULL if an allocation fails or tp has no publisher endpoint;
- * nothing is leaked in that case.
- */
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
-
-	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
-	if (bound == NULL) {
-		return NULL;
-	}
-
-	bound->service = calloc(1, sizeof(*bound->service));
-
-	/* Look the endpoint up before any further resources are created, so the
-	 * failure path only has the two allocations above to undo. */
-	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(tp->pub_ep_list,0);
-
-	if (bound->service == NULL || pubEP == NULL) {
-		free(bound->service);
-		free(bound);
-		return NULL;
-	}
-
-	bound->parent = tp;
-	bound->bundle = bundle;
-	bound->getCount = 1;
-	bound->mp_send_in_progress = false;
-	celixThreadMutex_create(&bound->mp_lock,NULL);
-	bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type>
-	arrayList_create(&bound->mp_parts);
-
-	bound->topic=strdup(pubEP->topic);
-
-	bound->service->handle = bound;
-	bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
-	bound->service->send = pubsub_topicPublicationSend;
-	bound->service->sendMultipart = pubsub_topicPublicationSendMultipart;
-
-	pubsubSerializer_fillMsgTypesMap(bound->msgTypes,bound->bundle);
-
-	return bound;
-}
-
-/*
- * Releases a bound service: the service struct, the message type map, any
- * multipart parts still queued (a multipart message that was started but never
- * finished), the topic copy, the lock and the struct itself.
- */
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
-
-	celixThreadMutex_lock(&boundSvc->mp_lock);
-
-	free(boundSvc->service);
-
-	if(boundSvc->msgTypes != NULL){
-		pubsubSerializer_emptyMsgTypesMap(boundSvc->msgTypes);
-		hashMap_destroy(boundSvc->msgTypes,false,false);
-	}
-
-	if(boundSvc->mp_parts!=NULL){
-		/* Queued parts are only freed when they get sent; free the leftovers. */
-		unsigned int i;
-		unsigned int pending = arrayList_size(boundSvc->mp_parts);
-		for(i=0;i<pending;i++){
-			pubsub_msg_pt part = (pubsub_msg_pt)arrayList_get(boundSvc->mp_parts,i);
-			if(part!=NULL){
-				free(part->header);
-				free(part->payload);
-				free(part);
-			}
-		}
-		arrayList_destroy(boundSvc->mp_parts);
-	}
-
-	free(boundSvc->topic);
-
-	celixThreadMutex_unlock(&boundSvc->mp_lock);
-	celixThreadMutex_destroy(&boundSvc->mp_lock);
-
-	free(boundSvc);
-
-}
-
-/*
- * Sleeps FIRST_SEND_DELAY seconds the first time it is called and does nothing
- * on later calls. The flag is a function-local static.
- */
-static void delay_first_send_for_late_joiners(void){
-
-	static bool firstSend = true;
-
-	if(!firstSend){
-		return;
-	}
-
-	printf("TP: Delaying first send for late joiners...\n");
-	sleep(FIRST_SEND_DELAY);
-	firstSend = false;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
deleted file mode 100644
index cb9aff5..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ /dev/null
@@ -1,777 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_subscription.c
- *
- *  \date       Oct 2, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include "topic_subscription.h"
-#include <czmq.h>
-/* The following undefs prevent the collision between:
- * - sys/syslog.h (which is included within czmq)
- * - celix/dfi/dfi_log_util.h
- */
-#undef LOG_DEBUG
-#undef LOG_WARNING
-#undef LOG_INFO
-#undef LOG_WARNING
-
-#include <string.h>
-#include <stdlib.h>
-#include <signal.h>
-
-#include "utils.h"
-#include "celix_errno.h"
-#include "constants.h"
-#include "version.h"
-
-#include "subscriber.h"
-#include "publisher.h"
-#include "dyn_msg_utils.h"
-#include "pubsub_utils.h"
-
-#include "pubsub_serializer.h"
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	#include "zmq_crypto.h"
-
-	#define MAX_CERT_PATH_LENGTH 512
-#endif
-
-#define POLL_TIMEOUT  	250
-#define ZMQ_POLL_TIMEOUT_MS_ENV 	"ZMQ_POLL_TIMEOUT_MS"
-
-/* State of one subscribed topic: the SUB socket, the tracked subscriber services and pending (dis)connections. */
-struct topic_subscription{
-
-	/* ZMQ_SUB socket created in pubsub_topicSubscriptionCreate. */
-	zsock_t* zmq_socket;
-	/* Subscriber certificate; only set when the topic is secured. */
-	zcert_t * zmq_cert;
-	/* Publisher public certificate; only set when the topic is secured. */
-	zcert_t * zmq_pub_cert;
-	/* Taken around connect/disconnect on zmq_socket and its destruction.
-	 * NOTE(review): declared as pthread_mutex_t but used with celixThreadMutex_* -- confirm the types are the same. */
-	pthread_mutex_t socket_lock;
-	/* Tracks subscriber services matching the topic (and scope) filter. */
-	service_tracker_pt tracker;
-	/* Subscriber endpoints added through pubsub_topicSubscriptionAddSubscriber. */
-	array_list_pt sub_ep_list;
-	/* Thread started in Start and joined in Stop. */
-	celix_thread_t recv_thread;
-	/* Set in Start, cleared in Stop and Destroy. */
-	bool running;
-	/* Taken around sub_ep_list, servicesMap and nrSubscribers updates. */
-	celix_thread_mutex_t ts_lock;
-	bundle_context_pt context;
-
-	hash_map_pt servicesMap; // key = service, value = msg types map
-	/* strdup()'ed publisher URLs queued for connect / disconnect. */
-	array_list_pt pendingConnections;
-	array_list_pt pendingDisconnections;
-
-	celix_thread_mutex_t pendingConnections_lock;
-	celix_thread_mutex_t pendingDisconnections_lock;
-	/* Counter maintained by pubsub_topicIncreaseNrSubscribers / pubsub_topicDecreaseNrSubscribers. */
-	unsigned int nrSubscribers;
-};
-
-/* Pair of frames making up one received message: header frame and payload frame. */
-typedef struct complete_zmq_msg{
-	zframe_t* header;
-	zframe_t* payload;
-}* complete_zmq_msg_pt;
-
-/* Handle built by create_mp_handle; presumably what pubsub_getMultipart receives as handle -- TODO confirm. */
-typedef struct mp_handle{
-	hash_map_pt svc_msg_db;
-	hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
-
-/* A message instance together with a retain flag; presumably the value type of mp_handle.rcv_msg_map -- TODO confirm. */
-typedef struct msg_map_entry{
-	bool retain;
-	void* msgInst;
-}* msg_map_entry_pt;
-
-/* Service tracker callbacks, installed through the tracker customizer in pubsub_topicSubscriptionCreate. */
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service);
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service);
-/* Entry point of the thread created in pubsub_topicSubscriptionStart; arg is the topic subscription. */
-static void* zmq_recv_thread_func(void* arg);
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
-/* Handler installed for SIGUSR1, the signal Stop sends to the receive thread. */
-static void sigusr1_sighandler(int signo);
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
-static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);
-static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list);
-static void destroy_mp_handle(mp_handle_pt mp_handle);
-/* Presumably drain the pendingConnections / pendingDisconnections lists -- TODO confirm against the definitions. */
-static void connectPendingPublishers(topic_subscription_pt sub);
-static void disconnectPendingPublishers(topic_subscription_pt sub);
-
-/*
- * Creates a topic subscription for `topic` within `scope`.
- *
- * A ZMQ_SUB socket is created and subscribed to the topic (or to everything
- * for PUBSUB_ANY_SUB_TOPIC), and a service tracker is set up for subscriber
- * services with a matching topic (and scope, unless it is the default scope).
- * A SIGUSR1 handler is installed; Stop uses that signal on the receive thread.
- *
- * When built with BUILD_WITH_ZMQ_SECURITY and the topic is listed in the
- * SECURE_TOPICS property, the subscriber key and the publisher public key are
- * loaded. The topic is only secured if BOTH load; otherwise whichever
- * certificate was loaded is released and the topic is used unsecured.
- *
- * Returns CELIX_SERVICE_EXCEPTION if the key directory, the socket or the
- * subscription struct cannot be obtained (*out untouched). Otherwise *out is
- * set and the summed status of the tracker setup calls is returned.
- */
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, char* scope, char* topic,topic_subscription_pt* out){
-	celix_status_t status = CELIX_SUCCESS;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-	if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC) != 0){
-		char* secure_topics = NULL;
-		bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
-
-		if (secure_topics){
-			array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
-
-			int i;
-			int secure_topics_size = arrayList_size(secure_topics_list);
-			for (i = 0; i < secure_topics_size; i++){
-				char* top = arrayList_get(secure_topics_list, i);
-				if (strcmp(topic, top) == 0){
-					printf("TS: Secure topic: '%s'\n", top);
-					subEP->is_secure = true;
-				}
-				free(top);
-				top = NULL;
-			}
-
-			arrayList_destroy(secure_topics_list);
-		}
-	}
-
-	zcert_t* sub_cert = NULL;
-	zcert_t* pub_cert = NULL;
-	const char* pub_key = NULL;
-	if (subEP->is_secure){
-		char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
-		if (keys_bundle_dir == NULL){
-			return CELIX_SERVICE_EXCEPTION;
-		}
-
-		const char* keys_file_path = NULL;
-		const char* keys_file_name = NULL;
-		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
-		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
-
-		char sub_cert_path[MAX_CERT_PATH_LENGTH];
-		char pub_cert_path[MAX_CERT_PATH_LENGTH];
-
-		//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc"
-		snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic);
-		snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic);
-		free(keys_bundle_dir);
-
-		printf("TS: Loading subscriber key '%s'\n", sub_cert_path);
-		printf("TS: Loading publisher key '%s'\n", pub_cert_path);
-
-		sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path);
-		if (sub_cert == NULL){
-			printf("TS: Cannot load key '%s'\n", sub_cert_path);
-			printf("TS: Topic '%s' NOT SECURED !\n", topic);
-			subEP->is_secure = false;
-		}
-
-		pub_cert = zcert_load(pub_cert_path);
-		if (sub_cert != NULL && pub_cert == NULL){
-			zcert_destroy(&sub_cert);
-			printf("TS: Cannot load key '%s'\n", pub_cert_path);
-			printf("TS: Topic '%s' NOT SECURED !\n", topic);
-			subEP->is_secure = false;
-		}
-
-		if (subEP->is_secure){
-			/* Both certificates are present here, so pub_cert is safe to query. */
-			pub_key = zcert_public_txt(pub_cert);
-		}
-		else{
-			/* Unsecured fallback: nothing will own the certificates later on,
-			 * so release whichever one was loaded. */
-			zcert_destroy(&sub_cert);
-			zcert_destroy(&pub_cert);
-		}
-	}
-#endif
-
-	zsock_t* zmq_s = zsock_new (ZMQ_SUB);
-	if(zmq_s==NULL){
-		#ifdef BUILD_WITH_ZMQ_SECURITY
-		if (subEP->is_secure){
-			zcert_destroy(&sub_cert);
-			zcert_destroy(&pub_cert);
-		}
-		#endif
-
-		return CELIX_SERVICE_EXCEPTION;
-	}
-
-	#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (subEP->is_secure){
-		zcert_apply (sub_cert, zmq_s);
-		zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
-	}
-	#endif
-
-	if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){
-		zsock_set_subscribe (zmq_s, "");
-	}
-	else{
-		zsock_set_subscribe (zmq_s, topic);
-	}
-
-	topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
-	if(ts==NULL){
-		zsock_destroy(&zmq_s);
-		#ifdef BUILD_WITH_ZMQ_SECURITY
-		zcert_destroy(&sub_cert);
-		zcert_destroy(&pub_cert);
-		#endif
-
-		return CELIX_SERVICE_EXCEPTION;
-	}
-	ts->context = bundle_context;
-	ts->zmq_socket = zmq_s;
-	ts->running = false;
-	ts->nrSubscribers = 0;
-
-	#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (subEP->is_secure){
-		ts->zmq_cert = sub_cert;
-		ts->zmq_pub_cert = pub_cert;
-	}
-	#endif
-
-	celixThreadMutex_create(&ts->socket_lock, NULL);
-	celixThreadMutex_create(&ts->ts_lock,NULL);
-	arrayList_create(&ts->sub_ep_list);
-	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
-	arrayList_create(&ts->pendingConnections);
-	arrayList_create(&ts->pendingDisconnections);
-	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
-	celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
-
-	char filter[128];
-	memset(filter,0,128);
-	if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
-		// default scope, means that subscriber has not defined a scope property
-		snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
-				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
-				PUBSUB_SUBSCRIBER_TOPIC,topic);
-
-	} else {
-		snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
-				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
-				PUBSUB_SUBSCRIBER_TOPIC,topic,
-				PUBSUB_SUBSCRIBER_SCOPE,scope);
-	}
-	service_tracker_customizer_pt customizer = NULL;
-	status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
-	status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker);
-
-	struct sigaction actions;
-	memset(&actions, 0, sizeof(actions));
-	sigemptyset(&actions.sa_mask);
-	actions.sa_flags = 0;
-	actions.sa_handler = sigusr1_sighandler;
-
-	sigaction(SIGUSR1,&actions,NULL);
-
-	*out=ts;
-
-	return status;
-}
-
-/*
- * Releases a topic subscription: the tracker, the endpoint list (not the
- * endpoints), the services map, both pending lists, the socket, the
- * certificates (security builds only), all locks and the struct itself.
- * Always returns CELIX_SUCCESS.
- */
-celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&ts->ts_lock);
-
-	ts->running = false;
-	serviceTracker_destroy(ts->tracker);
-	arrayList_clear(ts->sub_ep_list);
-	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->servicesMap,false,false);
-
-	celixThreadMutex_lock(&ts->pendingConnections_lock);
-	arrayList_destroy(ts->pendingConnections);
-	celixThreadMutex_unlock(&ts->pendingConnections_lock);
-	celixThreadMutex_destroy(&ts->pendingConnections_lock);
-
-	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
-	arrayList_destroy(ts->pendingDisconnections);
-	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
-	celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
-
-	celixThreadMutex_lock(&ts->socket_lock);
-	zsock_destroy(&(ts->zmq_socket));
-	#ifdef BUILD_WITH_ZMQ_SECURITY
-	zcert_destroy(&(ts->zmq_cert));
-	zcert_destroy(&(ts->zmq_pub_cert));
-	#endif
-	celixThreadMutex_unlock(&ts->socket_lock);
-	celixThreadMutex_destroy(&ts->socket_lock);
-
-	celixThreadMutex_unlock(&ts->ts_lock);
-	/* ts_lock was created in Create like the other locks and has to be destroyed as well. */
-	celixThreadMutex_destroy(&ts->ts_lock);
-
-	free(ts);
-
-	return status;
-}
-
-/*
- * Opens the subscriber tracker, marks the subscription as running and, if the
- * tracker opened, starts the receive thread. Returns the tracker status on
- * failure, otherwise the status of the thread creation.
- */
-celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
-	celix_status_t trackerStatus = serviceTracker_open(ts->tracker);
-
-	/* The flag is raised regardless of the tracker result. */
-	ts->running = true;
-
-	if(trackerStatus!=CELIX_SUCCESS){
-		return trackerStatus;
-	}
-
-	return celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts);
-}
-
-/*
- * Stops the subscription: clears the running flag, signals the receive thread
- * with SIGUSR1, joins it and closes the tracker.
- * Returns the status of serviceTracker_close.
- */
-celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
-	ts->running = false;
-
-	/* Signal the receive thread, then wait for it to finish. */
-	pthread_kill(ts->recv_thread.thread,SIGUSR1);
-	celixThread_join(ts->recv_thread,NULL);
-
-	return serviceTracker_close(ts->tracker);
-}
-
-/*
- * Connects the subscription socket to the publisher at pubURL.
- * Returns CELIX_SERVICE_EXCEPTION if the socket is not valid or the connect
- * fails, CELIX_SUCCESS otherwise.
- */
-celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){
-	bool connected;
-
-	celixThreadMutex_lock(&ts->socket_lock);
-	connected = zsock_is(ts->zmq_socket) && zsock_connect(ts->zmq_socket, "%s", pubURL) == 0;
-	celixThreadMutex_unlock(&ts->socket_lock);
-
-	return connected ? CELIX_SUCCESS : CELIX_SERVICE_EXCEPTION;
-}
-
-/* Queues a copy of pubURL on the pending-connections list. Always returns CELIX_SUCCESS. */
-celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
-	char *urlCopy = strdup(pubURL);
-
-	celixThreadMutex_lock(&ts->pendingConnections_lock);
-	arrayList_add(ts->pendingConnections, urlCopy);
-	celixThreadMutex_unlock(&ts->pendingConnections_lock);
-
-	return CELIX_SUCCESS;
-}
-
-/* Queues a copy of pubURL on the pending-disconnections list. Always returns CELIX_SUCCESS. */
-celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
-	char *urlCopy = strdup(pubURL);
-
-	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
-	arrayList_add(ts->pendingDisconnections, urlCopy);
-	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
-
-	return CELIX_SUCCESS;
-}
-
-/*
- * Disconnects the subscription socket from the publisher at pubURL.
- * Returns CELIX_SERVICE_EXCEPTION if the socket is not valid or the disconnect
- * fails, CELIX_SUCCESS otherwise.
- */
-celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
-	bool disconnected;
-
-	celixThreadMutex_lock(&ts->socket_lock);
-	disconnected = zsock_is(ts->zmq_socket) && zsock_disconnect(ts->zmq_socket, "%s", pubURL) == 0;
-	celixThreadMutex_unlock(&ts->socket_lock);
-
-	return disconnected ? CELIX_SUCCESS : CELIX_SERVICE_EXCEPTION;
-}
-
-/* Appends subEP to the subscription's endpoint list under ts_lock. Always returns CELIX_SUCCESS. */
-celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
-	celix_thread_mutex_t* lock = &ts->ts_lock;
-
-	celixThreadMutex_lock(lock);
-	arrayList_add(ts->sub_ep_list,subEP);
-	celixThreadMutex_unlock(lock);
-
-	return CELIX_SUCCESS;
-}
-
-/**
- * Increments the subscriber counter of the subscription under ts_lock.
- * Always returns CELIX_SUCCESS.
- */
-celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
-	celixThreadMutex_lock(&ts->ts_lock);
-	ts->nrSubscribers++;
-	celixThreadMutex_unlock(&ts->ts_lock);
-
-	return CELIX_SUCCESS;
-}
-
-/**
- * Removes subEP from the subscription's list of subscriber endpoints.
- * The list is modified under ts_lock. Always returns CELIX_SUCCESS.
- */
-celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
-	celixThreadMutex_lock(&ts->ts_lock);
-	arrayList_removeElement(ts->sub_ep_list,subEP);
-	celixThreadMutex_unlock(&ts->ts_lock);
-
-	return CELIX_SUCCESS;
-}
-
-/**
- * Decrements the subscriber counter of the subscription under ts_lock.
- *
- * The counter is exposed as an unsigned value, so it is never decremented below
- * zero: an unbalanced call would otherwise wrap it around to a huge count.
- * Always returns CELIX_SUCCESS.
- */
-celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&ts->ts_lock);
-
-	if (ts->nrSubscribers > 0) {
-		ts->nrSubscribers--;
-	}
-
-	celixThreadMutex_unlock(&ts->ts_lock);
-
-	return status;
-}
-
-/**
- * Returns the current subscriber counter of the subscription.
- * NOTE(review): the counter is read without taking ts_lock.
- */
-unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
-	unsigned int count = ts->nrSubscribers;
-	return count;
-}
-
-/**
- * Service tracker callback invoked when a subscriber service is tracked.
- *
- * For a service that is not yet known, a msgId -> pubsub_message_type map is
- * filled from the bundle owning the service reference and stored in
- * servicesMap under the service pointer. A service whose map stays empty is
- * treated as unsupported and is not stored. Always returns CELIX_SUCCESS.
- */
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
-	topic_subscription_pt sub = handle;
-
-	celixThreadMutex_lock(&sub->ts_lock);
-	if (!hashMap_containsKey(sub->servicesMap, service)) {
-		//key = msgId, value = pubsub_message_type
-		hash_map_pt supported = hashMap_create(uintHash, NULL, uintEquals, NULL);
-		bundle_pt owner = NULL;
-
-		serviceReference_getBundle(reference, &owner);
-		pubsubSerializer_fillMsgTypesMap(supported,owner);
-
-		if(hashMap_size(supported)!=0){
-			hashMap_put(sub->servicesMap, service, supported);
-		}
-		else{
-			//Nothing could be loaded for this bundle: the subscriber is unsupported
-			hashMap_destroy(supported,false,false);
-			printf("TS: Unsupported subscriber!\n");
-		}
-	}
-	celixThreadMutex_unlock(&sub->ts_lock);
-
-	printf("TS: New subscriber registered.\n");
-	return CELIX_SUCCESS;
-}
-
-/**
- * Service tracker callback invoked when a subscriber service is untracked.
- *
- * Removes the service from servicesMap (under ts_lock) and, when a message
- * type map was stored for it, empties and destroys that map.
- * Always returns CELIX_SUCCESS.
- */
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
-	topic_subscription_pt sub = handle;
-	hash_map_pt removedTypes = NULL;
-
-	celixThreadMutex_lock(&sub->ts_lock);
-	if (hashMap_containsKey(sub->servicesMap, service)) {
-		removedTypes = hashMap_remove(sub->servicesMap, service);
-	}
-	if (removedTypes != NULL) {
-		pubsubSerializer_emptyMsgTypesMap(removedTypes);
-		hashMap_destroy(removedTypes,false,false);
-	}
-	celixThreadMutex_unlock(&sub->ts_lock);
-
-	printf("TS: Subscriber unregistered.\n");
-	return CELIX_SUCCESS;
-}
-
-
-/*
- * Delivers one received (possibly multipart) message to every tracked subscriber
- * service and then releases the message.
- *
- * msg_list holds complete_zmq_msg entries (header frame + payload frame); entry 0
- * is the primary message. For each subscriber in sub->servicesMap the primary
- * message is looked up by type id, version-checked and deserialized before the
- * subscriber's receive callback is invoked. Remaining parts are made available
- * to the callback through the multipart callbacks.
- *
- * This function takes ownership of msg_list: all frames, all list entries and the
- * list itself are destroyed before returning.
- */
-static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
-
-	// Header of the primary (first) message; it points into the header frame data.
-	pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
-
-	hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
-	while (hashMapIterator_hasNext(iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		// key = subscriber service, value = map of message types known for that service
-		pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-		hash_map_pt msgTypes = hashMapEntry_getValue(entry);
-
-		pubsub_message_type *msgType = hashMap_get(msgTypes,&(first_msg_hdr->type));
-		if (msgType == NULL) {
-			printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type);
-		}
-		else{
-			void *msgInst = NULL;
-			char *name = pubsubSerializer_getName(msgType);
-			version_pt msgVersion = pubsubSerializer_getVersion(msgType);
-
-			bool validVersion = checkVersion(msgVersion,first_msg_hdr);
-
-			if(validVersion){
-
-				celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
-
-				if (status == CELIX_SUCCESS) {
-					// The callback may clear 'release' to keep msgInst; otherwise it is freed below.
-					bool release = true;
-
-					// mp_handle is NULL when the message has a single part.
-					mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list);
-					pubsub_multipart_callbacks_t mp_callbacks;
-					mp_callbacks.handle = mp_handle;
-					mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
-					mp_callbacks.getMultipart = pubsub_getMultipart;
-					subsvc->receive(subsvc->handle, name, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
-
-					if(release){
-						pubsubSerializer_freeMsg(msgType, msgInst);
-					}
-					if(mp_handle!=NULL){
-						destroy_mp_handle(mp_handle);
-					}
-				}
-				else{
-					printf("TS: Cannot deserialize msgType %s.\n",name);
-				}
-
-			}
-			else{
-				int major=0,minor=0;
-				version_getMajor(msgVersion,&major);
-				version_getMinor(msgVersion,&minor);
-				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
-			}
-
-		}
-	}
-	hashMapIterator_destroy(iter);
-
-	// Release every received part: both frames plus the wrapper entry.
-	int i = 0;
-	for(;i<arrayList_size(msg_list);i++){
-		complete_zmq_msg_pt c_msg = arrayList_get(msg_list,i);
-		zframe_destroy(&(c_msg->header));
-		zframe_destroy(&(c_msg->payload));
-		free(c_msg);
-	}
-
-	arrayList_destroy(msg_list);
-
-}
-
-/*
- * Receive loop of a topic subscription; runs until sub->running is cleared.
- *
- * Each iteration reads one message from the ZMQ socket while holding
- * socket_lock. A message consists of header/payload frame pairs; all pairs of a
- * multipart message are collected into a list and handed to process_msg (under
- * ts_lock), which takes ownership of the list and its frames. After releasing
- * socket_lock the pending publisher connects/disconnects are applied.
- *
- * A NULL frame with errno == EINTR means the blocking receive was interrupted by
- * a signal; the loop condition then decides whether the thread exits.
- *
- * Always returns NULL.
- */
-static void* zmq_recv_thread_func(void * arg) {
-    topic_subscription_pt sub = (topic_subscription_pt) arg;
-
-    while (sub->running) {
-
-        celixThreadMutex_lock(&sub->socket_lock);
-
-        zframe_t* headerMsg = zframe_recv(sub->zmq_socket);
-        if (headerMsg == NULL) {
-            if (errno == EINTR) {
-                //It means we got a signal and we have to exit...
-                printf("TS: header_recv thread for topic got a signal and will exit.\n");
-            } else {
-                perror("TS: header_recv thread");
-            }
-        } else if (!zframe_more(headerMsg)) {
-            // Header without payload: report it while the frame is still alive,
-            // then release it with zframe_destroy (a zframe_t must not be free()d).
-            pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg);
-            printf("TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic);
-            zframe_destroy(&headerMsg);
-        } else {
-
-            zframe_t* payloadMsg = zframe_recv(sub->zmq_socket);
-            if (payloadMsg == NULL) {
-                if (errno == EINTR) {
-                    //It means we got a signal and we have to exit...
-                    printf("TS: payload_recv thread for topic got a signal and will exit.\n");
-                } else {
-                    perror("TS: payload_recv");
-                }
-                zframe_destroy(&headerMsg);
-            } else {
-
-                celixThreadMutex_lock(&sub->ts_lock);
-
-                //Let's fetch all the messages from the socket
-                array_list_pt msg_list = NULL;
-                arrayList_create(&msg_list);
-                complete_zmq_msg_pt firstMsg = calloc(1, sizeof(struct complete_zmq_msg));
-                firstMsg->header = headerMsg;
-                firstMsg->payload = payloadMsg;
-                arrayList_add(msg_list, firstMsg);
-
-                bool more = zframe_more(payloadMsg);
-                while (more) {
-
-                    zframe_t* h_msg = zframe_recv(sub->zmq_socket);
-                    if (h_msg == NULL) {
-                        if (errno == EINTR) {
-                            //It means we got a signal and we have to exit...
-                            printf("TS: h_recv thread for topic got a signal and will exit.\n");
-                        } else {
-                            perror("TS: h_recv");
-                        }
-                        break;
-                    }
-
-                    zframe_t* p_msg = zframe_recv(sub->zmq_socket);
-                    if (p_msg == NULL) {
-                        if (errno == EINTR) {
-                            //It means we got a signal and we have to exit...
-                            printf("TS: p_recv thread for topic got a signal and will exit.\n");
-                        } else {
-                            perror("TS: p_recv");
-                        }
-                        zframe_destroy(&h_msg);
-                        break;
-                    }
-
-                    complete_zmq_msg_pt c_msg = calloc(1, sizeof(struct complete_zmq_msg));
-                    c_msg->header = h_msg;
-                    c_msg->payload = p_msg;
-                    arrayList_add(msg_list, c_msg);
-
-                    if (!zframe_more(p_msg)) {
-                        more = false;
-                    }
-                }
-
-                // process_msg destroys msg_list and every frame stored in it.
-                process_msg(sub, msg_list);
-
-                celixThreadMutex_unlock(&sub->ts_lock);
-
-            }
-
-        } // headerMsg != NULL
-        celixThreadMutex_unlock(&sub->socket_lock);
-        connectPendingPublishers(sub);
-        disconnectPendingPublishers(sub);
-    } // while
-
-    return NULL;
-}
-
-/*
- * Drains the pendingConnections list: every queued URL is connected through
- * pubsub_topicSubscriptionConnectPublisher and then freed. The list is drained
- * while pendingConnections_lock is held; connect results are not inspected.
- */
-static void connectPendingPublishers(topic_subscription_pt sub) {
-	celixThreadMutex_lock(&sub->pendingConnections_lock);
-	for(;;) {
-		if(arrayList_isEmpty(sub->pendingConnections)) {
-			break;
-		}
-		char * queuedUrl = arrayList_remove(sub->pendingConnections, 0);
-		pubsub_topicSubscriptionConnectPublisher(sub, queuedUrl);
-		free(queuedUrl);
-	}
-	celixThreadMutex_unlock(&sub->pendingConnections_lock);
-}
-
-/*
- * Drains the pendingDisconnections list: every queued URL is disconnected through
- * pubsub_topicSubscriptionDisconnectPublisher and then freed. The list is drained
- * while pendingDisconnections_lock is held; disconnect results are not inspected.
- */
-static void disconnectPendingPublishers(topic_subscription_pt sub) {
-	celixThreadMutex_lock(&sub->pendingDisconnections_lock);
-	for(;;) {
-		if(arrayList_isEmpty(sub->pendingDisconnections)) {
-			break;
-		}
-		char * queuedUrl = arrayList_remove(sub->pendingDisconnections, 0);
-		pubsub_topicSubscriptionDisconnectPublisher(sub, queuedUrl);
-		free(queuedUrl);
-	}
-	celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
-}
-
-/*
- * Signal handler that only logs that the topic subscription is shutting down;
- * the signal number is not used.
- * NOTE(review): printf is not async-signal-safe per POSIX - consider write(2).
- */
-static void sigusr1_sighandler(int signo){
-	(void)signo;
-	printf("TS: Topic subscription being shut down...\n");
-}
-
-/*
- * Checks whether a received message header is compatible with the locally known
- * message version.
- *
- * Returns true only when msgVersion is not NULL, the header's major equals the
- * local major and the header's minor is greater than or equal to the local
- * minor. Local values are narrowed to unsigned char before comparing.
- */
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
-	int localMajor=0;
-	int localMinor=0;
-
-	if(msgVersion==NULL){
-		return false;
-	}
-
-	version_getMajor(msgVersion,&localMajor);
-	version_getMinor(msgVersion,&localMinor);
-
-	/* Different major means incompatible */
-	if(hdr->major!=((unsigned char)localMajor)){
-		return false;
-	}
-
-	/* Compatible only if the provider has a minor equal or greater (means compatible update) */
-	return (hdr->minor>=((unsigned char)localMinor));
-}
-
-/*
- * Multipart callback: stores in *msgTypeId the id computed by
- * pubsubSerializer_hashCode for the message type name. The handle is unused.
- * Always returns 0.
- */
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
-	(void)handle;
-	unsigned int typeId = pubsubSerializer_hashCode(msgType);
-	*msgTypeId = typeId;
-	return 0;
-}
-
-/*
- * Multipart callback: looks up the deserialized part with id msgTypeId in the
- * multipart handle.
- *
- * On success *part is set to the message instance, the entry's retain flag is
- * set to 'retain' and 0 is returned. Returns -1 (with *part = NULL) when handle
- * is NULL and -2 (with *part = NULL) when no part with that id was received.
- */
-static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part){
-	mp_handle_pt multipart = (mp_handle_pt)handle;
-	msg_map_entry_pt found = NULL;
-
-	if(multipart==NULL){
-		*part = NULL;
-		return -1;
-	}
-
-	found = hashMap_get(multipart->rcv_msg_map,&msgTypeId);
-	if(found==NULL){
-		printf("TP: getMultipart cannot find msg '%u'\n",msgTypeId);
-		*part=NULL;
-		return -2;
-	}
-
-	found->retain = retain;
-	*part = found->msgInst;
-	return 0;
-}
-
-/*
- * Builds the multipart handle for a received message.
- *
- * Returns NULL when rcv_msg_list holds exactly one entry (not a multipart
- * message). Otherwise every entry after the first is looked up in svc_msg_db by
- * its header type id, version-checked and deserialized; successfully
- * deserialized parts are stored in rcv_msg_map under a heap-allocated copy of
- * their type id. Parts that are unknown, incompatible or fail to deserialize
- * are silently skipped.
- */
-static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
-	int idx;
-	mp_handle_pt result;
-
-	if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
-		return NULL;
-	}
-
-	result = calloc(1,sizeof(struct mp_handle));
-	result->svc_msg_db = svc_msg_db;
-	result->rcv_msg_map = hashMap_create(uintHash, NULL, uintEquals, NULL);
-
-	//We skip the first message, it is handled differently
-	for(idx=1;idx<arrayList_size(rcv_msg_list);idx++){
-		complete_zmq_msg_pt part = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,idx);
-		pubsub_msg_header_pt partHdr = (pubsub_msg_header_pt)zframe_data(part->header);
-
-		pubsub_message_type *partType = hashMap_get(svc_msg_db,&(partHdr->type));
-		if (partType == NULL) {
-			continue;
-		}
-
-		if(!checkVersion(pubsubSerializer_getVersion(partType),partHdr)){
-			continue;
-		}
-
-		void *decoded = NULL;
-		if(pubsubSerializer_deserialize(partType, (const void *) zframe_data(part->payload), &decoded) != CELIX_SUCCESS){
-			continue;
-		}
-
-		unsigned int* mapKey = calloc(1,sizeof(unsigned int));
-		*mapKey = partHdr->type;
-		msg_map_entry_pt mapValue = calloc(1,sizeof(struct msg_map_entry));
-		mapValue->msgInst = decoded;
-		hashMap_put(result->rcv_msg_map,mapKey,mapValue);
-	}
-
-	return result;
-}
-
-/*
- * Releases a multipart handle created by create_mp_handle.
- *
- * Every deserialized part that was not retained by the subscriber is released
- * through pubsubSerializer_freeMsg, the same routine used for the primary
- * message, so that everything allocated by the deserializer is freed and not
- * just the top-level pointer. Retained parts are left to the subscriber.
- * The map keys and entries are freed by hashMap_destroy.
- */
-static void destroy_mp_handle(mp_handle_pt mp_handle){
-
-	hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map);
-	while(hashMapIterator_hasNext(iter)){
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		unsigned int* msgId = (unsigned int*)hashMapEntry_getKey(entry);
-		msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
-		pubsub_message_type* msgType = hashMap_get(mp_handle->svc_msg_db,msgId);
-		if(msgType!=NULL){
-			if(!msgEntry->retain){
-				pubsubSerializer_freeMsg(msgType, msgEntry->msgInst);
-				msgEntry->msgInst = NULL;
-			}
-		}
-		else{
-			// Without the type the message cannot be released properly; report it.
-			printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n",*msgId);
-		}
-	}
-	hashMapIterator_destroy(iter);
-
-	hashMap_destroy(mp_handle->rcv_msg_map,true,true);
-	free(mp_handle);
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
deleted file mode 100644
index fe444bd..0000000
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * zmq_crypto.c
- *
- *  \date       Dec 2, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include "zmq_crypto.h"
-
-#include <zmq.h>
-#include <openssl/conf.h>
-#include <openssl/evp.h>
-#include <openssl/err.h>
-
-#include <string.h>
-
-#define MAX_FILE_PATH_LENGTH 512
-#define ZMQ_KEY_LENGTH 40
-#define AES_KEY_LENGTH 32
-#define AES_IV_LENGTH 16
-
-#define KEY_TO_GET "aes_key"
-#define IV_TO_GET "aes_iv"
-
-static char* read_file_content(char* filePath, char* fileName);
-static void parse_key_lines(char *keysBuffer, char **key, char **iv);
-static void parse_key_line(char *line, char **key, char **iv);
-static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey);
-
-/**
- * Return a valid zcert_t from an encoded file.
- *
- * The AES key and iv are read from the keys file (keysFilePath/keysFileName,
- * falling back to DEFAULT_KEYS_FILE_PATH / DEFAULT_KEYS_FILE_NAME when NULL),
- * hashed with SHA-256 and used to decrypt the content of file_path. The
- * decrypted text is parsed for the curve public and secret key, which are
- * Z85-decoded into the certificate.
- *
- * Returns NULL when the keys file cannot be read, the AES key/iv are missing,
- * the encoded file cannot be loaded or is empty, decryption yields no data, or
- * the public/secret keys cannot be extracted or decoded.
- *
- * Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
- */
-zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
-{
-
-	if (keysFilePath == NULL){
-		keysFilePath = DEFAULT_KEYS_FILE_PATH;
-	}
-
-	if (keysFileName == NULL){
-		keysFileName = DEFAULT_KEYS_FILE_NAME;
-	}
-
-	char* keys_data = read_file_content(keysFilePath, keysFileName);
-	if (keys_data == NULL){
-		return NULL;
-	}
-
-	char *key = NULL;
-	char *iv = NULL;
-	parse_key_lines(keys_data, &key, &iv);
-	free(keys_data);
-
-	if (key == NULL || iv == NULL){
-		free(key);
-		free(iv);
-
-		printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
-		return NULL;
-	}
-
-	//At this point, we know an aes key and iv are stored and loaded
-
-	// generate sha256 hashes; the raw key material is not needed afterwards
-	unsigned char key_digest[EVP_MAX_MD_SIZE] = { 0 };
-	unsigned char iv_digest[EVP_MAX_MD_SIZE] = { 0 };
-	generate_sha256_hash((char*) key, key_digest);
-	generate_sha256_hash((char*) iv, iv_digest);
-	free(key);
-	free(iv);
-
-	zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
-	if (encoded_secret == NULL){
-		return NULL;
-	}
-
-	int encoded_secret_size = (int) zchunk_size (encoded_secret);
-	char* encoded_secret_data = zchunk_strdup(encoded_secret);
-	zchunk_destroy (&encoded_secret);
-
-	if (encoded_secret_data == NULL || encoded_secret_size <= 0){
-		free(encoded_secret_data);
-		printf("CRYPTO: Encoded file is empty or could not be loaded!\n");
-		return NULL;
-	}
-
-	// Decryption of data. The buffer lives on the heap (its size comes from a
-	// file) and leaves room for one extra cipher block plus the terminator.
-	unsigned char* decryptedtext = calloc((size_t) encoded_secret_size + EVP_MAX_BLOCK_LENGTH + 1, 1);
-	if (decryptedtext == NULL){
-		free(encoded_secret_data);
-		return NULL;
-	}
-
-	int decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
-	free(encoded_secret_data);
-
-	EVP_cleanup();
-
-	if (decryptedtext_len <= 0 || decryptedtext_len > encoded_secret_size + EVP_MAX_BLOCK_LENGTH){
-		free(decryptedtext);
-		printf("CRYPTO: Decrypting the encoded file failed!\n");
-		return NULL;
-	}
-	decryptedtext[decryptedtext_len] = '\0';
-
-	// The public and private keys are retrieved
-	char* public_text = NULL;
-	char* secret_text = NULL;
-
-	extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
-	free(decryptedtext);
-
-	zcert_t* cert_loaded = NULL;
-
-	// Both texts stay NULL when extraction failed; never hand NULL to the decoder.
-	if (public_text != NULL && secret_text != NULL){
-		byte public_key [32] = { 0 };
-		byte secret_key [32] = { 0 };
-
-		if (zmq_z85_decode (public_key, public_text) != NULL && zmq_z85_decode (secret_key, secret_text) != NULL){
-			cert_loaded = zcert_new_from(public_key, secret_key);
-		} else {
-			printf("CRYPTO: Decoding public / secret key failed!\n");
-		}
-	}
-
-	free(public_text);
-	free(secret_text);
-
-	return cert_loaded;
-}
-
-/**
- * Computes the SHA-256 digest of the NUL-terminated string 'text'.
- *
- * 'digest' must provide room for at least EVP_MAX_MD_SIZE bytes.
- * Returns the number of digest bytes written, or 0 when the digest context
- * could not be created or any digest step failed.
- */
-int generate_sha256_hash(char* text, unsigned char* digest)
-{
-	unsigned int digest_len = 0;
-
-	EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
-	if (mdctx == NULL){
-		return 0;
-	}
-
-	// Each EVP call returns 1 on success; stop at the first failure.
-	if (EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL) != 1
-			|| EVP_DigestUpdate(mdctx, text, strlen(text)) != 1
-			|| EVP_DigestFinal_ex(mdctx, digest, &digest_len) != 1){
-		digest_len = 0;
-	}
-
-	EVP_MD_CTX_free(mdctx);
-
-	return (int) digest_len;
-}
-
-/**
- * Decrypts 'ciphertext' (AES-256-CBC) with the given key and iv into 'plaintext'.
- *
- * 'plaintext' must be large enough for the decrypted data; OpenSSL may write up
- * to ciphertext_len plus one cipher block.
- * Returns the number of plaintext bytes written, or 0 when the cipher context
- * could not be created or any decryption step failed (e.g. wrong key, bad
- * padding), so callers never see a length derived from uninitialized data.
- */
-int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
-{
-	int len = 0;
-	int plaintext_len = 0;
-
-	EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
-	if (ctx == NULL){
-		return 0;
-	}
-
-	// Each EVP call returns 1 on success.
-	if (EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv) == 1
-			&& EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len) == 1){
-		plaintext_len = len;
-		if (EVP_DecryptFinal_ex(ctx, plaintext + len, &len) == 1){
-			plaintext_len += len;
-		} else {
-			plaintext_len = 0;
-		}
-	}
-
-	EVP_CIPHER_CTX_free(ctx);
-
-	return plaintext_len;
-}
-
-/**
- * Reads the whole file filePath/fileName and returns its content as a
- * NUL-terminated string.
- *
- * Returns NULL when the combined path does not fit in MAX_FILE_PATH_LENGTH, the
- * file does not exist, cannot be opened, its size cannot be determined or it
- * cannot be read.
- *
- * Caller is responsible for freeing the returned value
- */
-static char* read_file_content(char* filePath, char* fileName){
-
-	char fileNameWithPath[MAX_FILE_PATH_LENGTH];
-	int written = snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
-	if (written < 0 || written >= MAX_FILE_PATH_LENGTH){
-		// A truncated path would make the checks below look at a different file.
-		printf("CRYPTO: Keys file path is too long!\n");
-		return NULL;
-	}
-
-	if (!zsys_file_exists(fileNameWithPath)){
-		printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
-		return NULL;
-	}
-
-	zfile_t* keys_file = zfile_new (filePath, fileName);
-	int rc = zfile_input (keys_file);
-	if (rc != 0){
-		zfile_destroy(&keys_file);
-		printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
-		return NULL;
-	}
-
-	ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
-	zchunk_t* keys_chunk = NULL;
-	if (keys_file_size >= 0){
-		// Only read when the size is known; a negative size must not become a huge read length.
-		keys_chunk = zfile_read (keys_file, (size_t) keys_file_size, 0);
-	}
-	if (keys_chunk == NULL){
-		zfile_close(keys_file);
-		zfile_destroy(&keys_file);
-		printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
-		return NULL;
-	}
-
-	char* keys_data = zchunk_strdup(keys_chunk);
-	zchunk_destroy(&keys_chunk);
-	zfile_close(keys_file);
-	zfile_destroy (&keys_file);
-
-	return keys_data;
-}
-
-/**
- * Splits keysBuffer into lines (the buffer is modified in place by strtok_r)
- * and feeds each line to parse_key_line until both *key and *iv are set or no
- * lines are left.
- */
-static void parse_key_lines(char *keysBuffer, char **key, char **iv){
-	char *cursor = NULL;
-	char *current = strtok_r(keysBuffer, "\n", &cursor);
-
-	while (current != NULL){
-		parse_key_line(current, key, iv);
-
-		if (*key != NULL && *iv != NULL){
-			// Both values found: the remaining lines are not inspected.
-			break;
-		}
-
-		current = strtok_r(NULL, "\n", &cursor);
-	}
-}
-
-/**
- * Parses one "name:value" line (the line is split in place at the first ':').
- *
- * When the name equals KEY_TO_GET and *key is still NULL, *key receives a copy
- * of at most AES_KEY_LENGTH characters of the value; otherwise, when the name
- * equals IV_TO_GET and *iv is still NULL, *iv receives a copy of at most
- * AES_IV_LENGTH characters. Lines without ':' or with an empty name or value
- * are ignored.
- */
-static void parse_key_line(char *line, char **key, char **iv){
-	char* separator = strchr(line, ':');
-	if (separator == NULL){
-		return;
-	}
-
-	*separator = '\0'; // overwrite first separator, creating two strings.
-	char *name = line;
-	char *value = separator + 1;
-
-	if (*name == '\0' || *value == '\0'){
-		return;
-	}
-
-	if (*key == NULL && strcmp(name, KEY_TO_GET) == 0){
-		*key = strndup(value, AES_KEY_LENGTH);
-		return;
-	}
-
-	if (*iv == NULL && strcmp(name, IV_TO_GET) == 0){
-		*iv = strndup(value, AES_IV_LENGTH);
-	}
-}
-
-/**
- * Loads the decrypted text buffer as a zconfig and copies the values found at
- * "/curve/public-key" and "/curve/secret-key" into *publicKey and *secretKey
- * (each copy is limited to ZMQ_KEY_LENGTH + 1 characters).
- *
- * On any failure a message is printed and the output pointers are left
- * untouched. The copies are allocated with strndup.
- */
-static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
-	// Load decrypted text buffer
-	zchunk_t* plain_chunk = zchunk_new(input, inputlen);
-	if (plain_chunk == NULL){
-		printf("CRYPTO: Failed to create zchunk\n");
-		return;
-	}
-
-	zconfig_t* config = zconfig_chunk_load (plain_chunk);
-	zchunk_destroy (&plain_chunk);
-	if (config == NULL){
-		printf("CRYPTO: Failed to create zconfig\n");
-		return;
-	}
-
-	// Extract public and secret key from text buffer
-	char* pub_value = zconfig_get (config, "/curve/public-key", NULL);
-	char* sec_value = zconfig_get (config, "/curve/secret-key", NULL);
-	bool complete = (pub_value != NULL && sec_value != NULL);
-
-	if (complete){
-		// Copy before the config (which owns the values) is destroyed.
-		*publicKey = strndup(pub_value, ZMQ_KEY_LENGTH + 1);
-		*secretKey = strndup(sec_value, ZMQ_KEY_LENGTH + 1);
-	}
-
-	zconfig_destroy(&config);
-
-	if (!complete){
-		printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
-	}
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h b/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h
deleted file mode 100644
index 71085ab..0000000
--- a/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.h
- *
- *  \date       Nov 11, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef DYN_MSG_UTILS_H_
-#define DYN_MSG_UTILS_H_
-
-#include "bundle.h"
-#include "hash_map.h"
-
-/* Hash callback for hash maps whose keys are pointers to unsigned int
- * (presumably hashes the pointed-to value - confirm in the implementation). */
-unsigned int uintHash(const void * uintNum);
-/* Equality callback matching uintHash; compares two keys given as pointers
- * (presumably by pointed-to value - confirm in the implementation). */
-int uintEquals(const void * uintNum, const void * toCompare);
-
-/* Fills msgTypesMap with the message types found for 'bundle'.
- * NOTE(review): key/value layout is not visible here - confirm in the implementation. */
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle);
-/* Empties a map previously filled by fillMsgTypesMap.
- * NOTE(review): whether the map itself is destroyed is not visible here - confirm. */
-void emptyMsgTypesMap(hash_map_pt msgTypesMap);
-
-#endif /* DYN_MSG_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h b/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
deleted file mode 100644
index bd39fc0..0000000
--- a/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_
-#define PUBLISHER_ENDPOINT_ANNOUNCE_H_
-
-#include "pubsub_endpoint.h"
-
-/* Service interface for announcing publisher endpoints and topic interest.
- * Every callback receives 'handle' as its first argument and returns a
- * celix_status_t. */
-struct publisher_endpoint_announce {
-	/* Opaque implementation handle passed back to every callback. */
-	void *handle;
-	/* Announces the publisher endpoint pubEP. */
-	celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP);
-	/* Withdraws the publisher endpoint pubEP. */
-	celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt pubEP);
-	/* Signals interest in the given scope/topic. */
-	celix_status_t (*interestedInTopic)(void* handle, const char *scope, const char *topic);
-	/* Signals that the given scope/topic is no longer of interest. */
-	celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, const char *topic);
-};
-
-typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt;
-
-
-#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h
deleted file mode 100644
index fc1cfbb..0000000
--- a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin.h
- *
- *  \date       Sep 30, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_ADMIN_H_
-#define PUBSUB_ADMIN_H_
-
-#include "service_reference.h"
-
-#include "pubsub_common.h"
-#include "pubsub_endpoint.h"
-
-#define PSA_IP 	"PSA_IP"
-#define PSA_ITF	"PSA_INTERFACE"
-#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
-
-#define PSA_DEFAULT "zmq"
-
-typedef struct pubsub_admin *pubsub_admin_pt;
-
-/* Service interface of a pubsub admin. Every callback receives the admin
- * instance as its first argument and returns a celix_status_t. */
-struct pubsub_admin_service {
-	/* Admin instance passed back to every callback. */
-	pubsub_admin_pt admin;
-
-	/* Adds / removes the subscription described by the subscriber endpoint. */
-	celix_status_t (*addSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-	celix_status_t (*removeSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-
-	/* Adds / removes the publication described by the publisher endpoint. */
-	celix_status_t (*addPublication)(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
-	celix_status_t (*removePublication)(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
-
-	/* Closes every publication / subscription for the given scope and topic. */
-	celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic);
-	celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic);
-
-	/* Reports through *score how well this admin matches the given endpoint. */
-	celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
-	celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
-};
-
-typedef struct pubsub_admin_service *pubsub_admin_service_pt;
-
-#endif /* PUBSUB_ADMIN_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h
deleted file mode 100644
index d9c6f1d..0000000
--- a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_common.h
- *
- *  \date       Sep 17, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_COMMON_H_
-#define PUBSUB_COMMON_H_
-
-#define PUBSUB_ADMIN_SERVICE 			"pubsub_admin"
-#define PUBSUB_DISCOVERY_SERVICE		"pubsub_discovery"
-#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"
-
-#define PUBSUB_ANY_SUB_TOPIC			"any"
-
-#define PUBSUB_BUNDLE_ID				"bundle.id"
-
-#define MAX_SCOPE_LEN					1024
-#define MAX_TOPIC_LEN					1024
-
-/* Header describing a pubsub message: topic, message type id and version. */
-struct pubsub_msg_header{
-	/* Topic name, stored in a fixed buffer of MAX_TOPIC_LEN bytes. */
-	char topic[MAX_TOPIC_LEN];
-	/* Message type id. */
-	unsigned int type;
-	/* Major version of the message. */
-	unsigned char major;
-	/* Minor version of the message. */
-	unsigned char minor;
-};
-
-typedef struct pubsub_msg_header* pubsub_msg_header_pt;
-
-
-#endif /* PUBSUB_COMMON_H_ */


Mime
View raw message