celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [31/54] [abbrv] celix git commit: Merge commit 'ee29b00d7a80af43d351e61916d5a5aa90f97e46' into feature/CELIX-417-cmake-refactor
Date Tue, 30 Jan 2018 19:30:15 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/framework/src/celix_launcher.c
----------------------------------------------------------------------
diff --cc framework/src/celix_launcher.c
index ba83f25,0000000..fe5d0c0
mode 100644,000000..100644
--- a/framework/src/celix_launcher.c
+++ b/framework/src/celix_launcher.c
@@@ -1,242 -1,0 +1,315 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * celix_launcher.c
 + *
 + *  \date       Mar 23, 2010
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +
 +#include "celix_launcher.h"
 +
 +#include <stdio.h>
 +#include <string.h>
 +#include <stdlib.h>
 +#include <libgen.h>
 +#include <signal.h>
 +
 +#ifndef CELIX_NO_CURLINIT
 +#include <curl/curl.h>
 +#endif
 +
 +#include <string.h>
 +#include <curl/curl.h>
 +#include <signal.h>
 +#include <libgen.h>
 +#include "celix_launcher.h"
 +#include "framework.h"
 +#include "linked_list_iterator.h"
 +
 +static void show_usage(char* prog_name);
 +static void shutdown_framework(int signal);
 +static void ignore(int signal);
 +
++static int celixLauncher_launchWithConfigAndProps(const char *configFile, framework_pt *framework,
properties_pt packedConfig);
++static int celixLauncher_launchWithStreamAndProps(FILE *stream, framework_pt *framework,
properties_pt packedConfig);
++
 +#define DEFAULT_CONFIG_FILE "config.properties"
 +
 +static framework_pt framework = NULL;
 +
++/**
++ * Method kept because of usage in examples & unit tests
++ */
 +int celixLauncher_launchWithArgs(int argc, char *argv[]) {
++	return celixLauncher_launchWithArgsAndProps(argc, argv, NULL);
++}
++
++int celixLauncher_launchWithArgsAndProps(int argc, char *argv[], properties_pt packedConfig)
{
 +	// Perform some minimal command-line option parsing...
 +	char *opt = NULL;
 +	if (argc > 1) {
 +		opt = argv[1];
 +	}
 +
 +	char *config_file = NULL;
 +
 +	if (opt) {
 +		// Check whether the user wants some help...
 +		if (strcmp("-h", opt) == 0 || strcmp("-help", opt) == 0) {
 +			show_usage(argv[0]);
 +			return 0;
 +		} else {
 +			config_file = opt;
 +		}
 +	} else {
 +		config_file = DEFAULT_CONFIG_FILE;
 +	}
 +
 +	struct sigaction sigact;
 +	memset(&sigact, 0, sizeof(sigact));
 +	sigact.sa_handler = shutdown_framework;
 +	sigaction(SIGINT,  &sigact, NULL);
 +	sigaction(SIGTERM, &sigact, NULL);
 +
 +	memset(&sigact, 0, sizeof(sigact));
 +	sigact.sa_handler = ignore;
 +	sigaction(SIGUSR1,  &sigact, NULL);
 +	sigaction(SIGUSR2,  &sigact, NULL);
 +
- 	int rc = celixLauncher_launch(config_file, &framework);
++	int rc = celixLauncher_launchWithConfigAndProps(config_file, &framework, packedConfig);
 +	if (rc == 0) {
 +		celixLauncher_waitForShutdown(framework);
 +		celixLauncher_destroy(framework);
 +	}
 +	return rc;
 +}
 +
 +static void show_usage(char* prog_name) {
 +	printf("Usage:\n  %s [path/to/config.properties]\n\n", basename(prog_name));
 +}
 +
 +static void shutdown_framework(int signal) {
 +	if (framework != NULL) {
 +		celixLauncher_stop(framework); //NOTE main thread will destroy
 +	}
 +}
 +
 +static void ignore(int signal) {
 +	//ignoring for signal SIGUSR1, SIGUSR2. Can be used on threads
 +}
 +
 +int celixLauncher_launch(const char *configFile, framework_pt *framework) {
++	return celixLauncher_launchWithConfigAndProps(configFile, framework, NULL);
++}
++
++static int celixLauncher_launchWithConfigAndProps(const char *configFile, framework_pt *framework,
properties_pt packedConfig){
 +	int status = 0;
 +	FILE *config = fopen(configFile, "r");
- 	if (config != NULL) {
++
++	if (config != NULL && packedConfig != NULL) {
++		status = celixLauncher_launchWithStreamAndProps(config, framework, packedConfig);
++	} else if (config != NULL) {
 +		status = celixLauncher_launchWithStream(config, framework);
++	} else if (packedConfig != NULL) {
++		status = celixLauncher_launchWithProperties(packedConfig, framework);
 +	} else {
 +		fprintf(stderr, "Error: invalid or non-existing configuration file: '%s'.", configFile);
 +		perror("");
 +		status = 1;
 +	}
++
 +	return status;
 +}
 +
 +int celixLauncher_launchWithStream(FILE *stream, framework_pt *framework) {
 +	int status = 0;
 +
 +	properties_pt config = properties_loadWithStream(stream);
 +	fclose(stream);
 +	// Make sure we've read it and that nothing went wrong with the file access...
 +	if (config == NULL) {
 +		fprintf(stderr, "Error: invalid configuration file");
 +		perror(NULL);
 +		status = 1;
 +	}
 +	else {
 +		status = celixLauncher_launchWithProperties(config, framework);
 +	}
 +
 +	return status;
 +}
 +
++static int celixLauncher_launchWithStreamAndProps(FILE *stream, framework_pt *framework,
properties_pt packedConfig){
++	int status = 0;
++
++	properties_pt runtimeConfig = properties_loadWithStream(stream);
++	fclose(stream);
++
++	// Make sure we've read it and that nothing went wrong with the file access...
++	// If there is no runtimeConfig, the packedConfig can be stored as global config
++	if (runtimeConfig == NULL){
++		runtimeConfig = packedConfig;
++	}
++
++	if (runtimeConfig == NULL) {
++		fprintf(stderr, "Error: invalid configuration file");
++		perror(NULL);
++		status = 1;
++	} else {
++		// Check if there's a pre-compiled config available
++		if (packedConfig != NULL){
++			// runtimeConfig and packedConfig must be merged
++			// when a duplicate of a key is available, the runtimeConfig must be prioritized
++
++			hash_map_iterator_t iter = hashMapIterator_construct(packedConfig);
++
++			hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
++
++			while (entry != NULL) {
++				const char * key = (const char *) hashMapEntry_getKey(entry);
++				const char * value = (const char *) hashMapEntry_getValue(entry);
++
++				// Check existence of key in runtimeConfig
++				if (!hashMap_containsKey(runtimeConfig, key)) {
++					properties_set(runtimeConfig, key, value);
++				}
++
++				entry = hashMapIterator_nextEntry(&iter);
++				if (entry != NULL) {
++					key = (const char *) hashMapEntry_getKey(entry);
++					value = (const char *) hashMapEntry_getValue(entry);
++				}
++			}
++
++			// normally, the framework_destroy will clean up the properties_pt
++			// since there are 2 properties_pt available (runtimeConfig and packedConfig)
++			// the packedConfig must be destroyed
++			properties_destroy(packedConfig);
++		}
++
++		status = celixLauncher_launchWithProperties(runtimeConfig, framework);
++	}
++
++	return status;
++}
 +
 +int celixLauncher_launchWithProperties(properties_pt config, framework_pt *framework) {
 +	celix_status_t status;
 +#ifndef CELIX_NO_CURLINIT
 +	// Before doing anything else, let's setup Curl
 +	curl_global_init(CURL_GLOBAL_NOTHING);
 +#endif
 +
 +	const char* autoStartProp = properties_get(config, "cosgi.auto.start.1");
 +	char* autoStart = NULL;
 +	if (autoStartProp != NULL) {
 +		autoStart = strndup(autoStartProp, 1024*10);
 +	}
 +
 +	status = framework_create(framework, config);
 +	bundle_pt fwBundle = NULL;
 +	if (status == CELIX_SUCCESS) {
 +		status = fw_init(*framework);
 +		if (status == CELIX_SUCCESS) {
 +			// Start the system bundle
 +			status = framework_getFrameworkBundle(*framework, &fwBundle);
 +
 +			if(status == CELIX_SUCCESS){
 +				bundle_start(fwBundle);
 +
 +				char delims[] = " ";
 +				char *result = NULL;
 +				char *save_ptr = NULL;
 +				linked_list_pt bundles;
 +				array_list_pt installed = NULL;
 +				bundle_context_pt context = NULL;
 +				linked_list_iterator_pt iter = NULL;
 +				unsigned int i;
 +
 +				linkedList_create(&bundles);
 +				result = strtok_r(autoStart, delims, &save_ptr);
 +				while (result != NULL) {
 +					char *location = strdup(result);
 +					linkedList_addElement(bundles, location);
 +					result = strtok_r(NULL, delims, &save_ptr);
 +				}
 +				// First install all bundles
 +				// Afterwards start them
 +				arrayList_create(&installed);
 +				bundle_getContext(fwBundle, &context);
 +				iter = linkedListIterator_create(bundles, 0);
 +				while (linkedListIterator_hasNext(iter)) {
 +					bundle_pt current = NULL;
 +					char *location = (char *) linkedListIterator_next(iter);
 +					if (bundleContext_installBundle(context, location, &current) == CELIX_SUCCESS)
{
 +						// Only add bundle if it is installed correctly
 +						arrayList_add(installed, current);
 +					} else {
 +						printf("Could not install bundle from %s\n", location);
 +					}
 +					linkedListIterator_remove(iter);
 +					free(location);
 +				}
 +				linkedListIterator_destroy(iter);
 +				linkedList_destroy(bundles);
 +
 +				for (i = 0; i < arrayList_size(installed); i++) {
 +					bundle_pt installedBundle = (bundle_pt) arrayList_get(installed, i);
 +					bundle_startWithOptions(installedBundle, 0);
 +				}
 +
 +				arrayList_destroy(installed);
 +			}
 +		}
 +	}
 +
 +	if (status != CELIX_SUCCESS) {
 +		printf("Problem creating framework\n");
 +	}
 +
 +	printf("Launcher: Framework Started\n");
 +
 +	free(autoStart);
 +	
 +	return status;
 +}
 +
 +void celixLauncher_waitForShutdown(framework_pt framework) {
 +	framework_waitForStop(framework);
 +}
 +
 +void celixLauncher_destroy(framework_pt framework) {
 +	framework_destroy(framework);
 +
 +#ifndef CELIX_NO_CURLINIT
 +	// Cleanup Curl
 +	curl_global_cleanup();
 +#endif
 +
 +	printf("Launcher: Exit\n");
 +}
 +
 +void celixLauncher_stop(framework_pt framework) {
 +	bundle_pt fwBundle = NULL;
 +	if( framework_getFrameworkBundle(framework, &fwBundle) == CELIX_SUCCESS){
 +		bundle_stop(fwBundle);
 +	}
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index e43ec29,0000000..44106df
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@@ -1,444 -1,0 +1,437 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
-  *  htPSA_UDP_MC_TP://www.apache.org/licenses/LICENSE-2.0
++ *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
- /*
-  * topic_publication.c
-  *
-  *  \date       Sep 24, 2015
-  *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
-  *  \copyright	Apache License, Version 2.0
-  */
 +
 +#include <stdlib.h>
 +#include <string.h>
 +#include <unistd.h>
 +#include <errno.h>
 +
 +#include <sys/types.h>
 +#include <sys/socket.h>
 +#include <netinet/in.h>
 +#include <arpa/inet.h>
 +
 +#include "array_list.h"
 +#include "celixbool.h"
 +#include "service_registration.h"
 +#include "utils.h"
 +#include "service_factory.h"
 +#include "version.h"
 +
 +#include "topic_publication.h"
 +#include "pubsub_common.h"
 +#include "publisher.h"
 +#include "large_udp.h"
 +
 +#include "pubsub_serializer.h"
 +
 +#define EP_ADDRESS_LEN		32
 +
 +#define FIRST_SEND_DELAY	2
 +
 +struct topic_publication {
 +	int sendSocket;
 +	char* endpoint;
 +	service_registration_pt svcFactoryReg;
 +	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 +	hash_map_pt boundServices; //<bundle_pt,bound_service>
 +	celix_thread_mutex_t tp_lock;
 +	pubsub_serializer_service_t *serializer;
 +	struct sockaddr_in destAddr;
 +};
 +
 +typedef struct publish_bundle_bound_service {
 +	topic_publication_pt parent;
 +	pubsub_publisher_t service;
 +	bundle_pt bundle;
 +	char *scope;
 +	char *topic;
 +	hash_map_pt msgTypes;
 +	unsigned short getCount;
 +	celix_thread_mutex_t mp_lock;
 +	largeUdp_pt largeUdpHandle;
 +}* publish_bundle_bound_service_pt;
 +
 +
 +typedef struct pubsub_msg{
 +	pubsub_msg_header_pt header;
 +	char* payload;
 +	unsigned int payloadSize;
 +} pubsub_msg_t;
 +
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max);
 +
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle,
service_registration_pt registration, void **service);
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle,
service_registration_pt registration, void **service);
 +
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt
tp,bundle_pt bundle);
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
 +
 +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
 +
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int*
msgTypeId);
 +
 +
 +static void delay_first_send_for_late_joiners(void);
 +
 +
 +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t
*best_serializer, char* bindIP, topic_publication_pt *out){
 +
 +	char* ep = malloc(EP_ADDRESS_LEN);
 +	memset(ep,0,EP_ADDRESS_LEN);
 +	unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3,
UDP_MAX_PORT);
 +	snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
 +
 +
 +	topic_publication_pt pub = calloc(1,sizeof(*pub));
 +
 +	arrayList_create(&(pub->pub_ep_list));
 +	pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
 +	celixThreadMutex_create(&(pub->tp_lock),NULL);
 +
 +	pub->endpoint = ep;
 +	pub->sendSocket = sendSocket;
 +	pub->destAddr.sin_family = AF_INET;
 +	pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
 +	pub->destAddr.sin_port = htons(port);
 +
 +	pub->serializer = best_serializer;
 +
 +	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 +
 +	*out = pub;
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +
 +	free(pub->endpoint);
 +	arrayList_destroy(pub->pub_ep_list);
 +
 +	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
 +	while(hashMapIterator_hasNext(iter)){
 +		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
 +		pubsub_destroyPublishBundleBoundService(bound);
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(pub->boundServices,false,false);
 +
 +	pub->svcFactoryReg = NULL;
 +	pub->serializer = NULL;
 +
 +	if(close(pub->sendSocket) != 0){
 +		status = CELIX_FILE_IO_EXCEPTION;
 +	}
 +
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	celixThreadMutex_destroy(&(pub->tp_lock));
 +
 +	free(pub);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt
pub,service_factory_pt* svcFactory){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	/* Let's register the new service */
 +
 +	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 +
 +	if(pubEP!=NULL){
 +		service_factory_pt factory = calloc(1, sizeof(*factory));
 +		factory->handle = pub;
 +		factory->getService = pubsub_topicPublicationGetService;
 +		factory->ungetService = pubsub_topicPublicationUngetService;
 +
 +		properties_pt props = properties_create();
 +		properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
 +		properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
 +
 +		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 +
 +		if(status != CELIX_SUCCESS){
 +			properties_destroy(props);
 +			printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic
%s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID);
 +		}
 +		else{
 +			*svcFactory = factory;
 +		}
 +	}
 +	else{
 +		printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should
never happen!\n");
 +		status = CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 +	return serviceRegistration_unregister(pub->svcFactoryReg);
 +}
 +
 +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt
ep){
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	ep->endpoint = strdup(pub->endpoint);
 +	arrayList_add(pub->pub_ep_list,ep);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt
ep){
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	arrayList_removeElement(pub->pub_ep_list,ep);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
 +	array_list_pt list = NULL;
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	list = arrayList_clone(pub->pub_ep_list);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +	return list;
 +}
 +
 +
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle,
service_registration_pt registration, void **service) {
 +	celix_status_t  status = CELIX_SUCCESS;
 +
 +	topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +	celixThreadMutex_lock(&(publish->tp_lock));
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +	if(bound==NULL){
 +		bound = pubsub_createPublishBundleBoundService(publish,bundle);
 +		if(bound!=NULL){
 +			hashMap_put(publish->boundServices,bundle,bound);
 +		}
 +	}
 +	else{
 +		bound->getCount++;
 +	}
 +
 +	if (bound != NULL) {
 +		*service = &bound->service;
 +	}
 +
 +	celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +	return status;
 +}
 +
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle,
service_registration_pt registration, void **service)  {
 +
 +	topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +	celixThreadMutex_lock(&(publish->tp_lock));
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +	if(bound!=NULL){
 +
 +		bound->getCount--;
 +		if(bound->getCount==0){
 +			pubsub_destroyPublishBundleBoundService(bound);
 +			hashMap_remove(publish->boundServices,bundle);
 +		}
 +
 +	}
 +	else{
 +		long bundleId = -1;
 +		bundle_getBundleId(bundle,&bundleId);
 +		printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
 +	}
 +
 +	/* service should be never used for unget, so let's set the pointer to NULL */
 +	*service = NULL;
 +
 +	celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool
last, pubsub_release_callback_t *releaseCallback){
 +	const int iovec_len = 3; // header + size + payload
 +	bool ret = true;
 +
 +	struct iovec msg_iovec[iovec_len];
 +	msg_iovec[0].iov_base = msg->header;
 +	msg_iovec[0].iov_len = sizeof(*msg->header);
 +	msg_iovec[1].iov_base = &msg->payloadSize;
 +	msg_iovec[1].iov_len = sizeof(msg->payloadSize);
 +	msg_iovec[2].iov_base = msg->payload;
 +	msg_iovec[2].iov_len = msg->payloadSize;
 +
 +	delay_first_send_for_late_joiners();
 +
 +	if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, msg_iovec,
iovec_len, 0, &bound->parent->destAddr, sizeof(bound->parent->destAddr)) ==
-1) {
 +		perror("send_pubsub_msg:sendSocket");
 +		ret = false;
 +	}
 +
 +	if(releaseCallback) {
 +		releaseCallback->release(msg->payload, bound);
 +	}
 +	return ret;
 +
 +}
 +
 +
 +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void
*inMsg) {
 +	int status = 0;
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
 +
 +	celixThreadMutex_lock(&(bound->parent->tp_lock));
 +	celixThreadMutex_lock(&(bound->mp_lock));
 +
- 	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes,
(void*)(uintptr_t)msgTypeId);
++	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes,
(void*)(intptr_t)msgTypeId);
 +
 +	if (msgSer != NULL) {
 +		int major=0, minor=0;
 +
 +		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 +		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
 +		msg_hdr->type = msgTypeId;
 +
 +
 +		if (msgSer->msgVersion != NULL){
 +			version_getMajor(msgSer->msgVersion, &major);
 +			version_getMinor(msgSer->msgVersion, &minor);
 +			msg_hdr->major = major;
 +			msg_hdr->minor = minor;
 +		}
 +
 +		void* serializedOutput = NULL;
 +		size_t serializedOutputLen = 0;
 +		msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
 +
 +		pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t));
 +		msg->header = msg_hdr;
 +		msg->payload = (char*)serializedOutput;
 +		msg->payloadSize = serializedOutputLen;
 +
 +
 +		if(send_pubsub_msg(bound, msg,true, NULL) == false) {
 +			status = -1;
 +		}
 +		free(msg_hdr);
 +		free(msg);
 +		free(serializedOutput);
 +
 +
 +	} else {
 +		printf("PSA_UDP_MC_TP: No msg serializer available for msg type id %d\n", msgTypeId);
 +		status=-1;
 +	}
 +
 +	celixThreadMutex_unlock(&(bound->mp_lock));
 +	celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +
 +	return status;
 +}
 +
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int*
msgTypeId){
 +	*msgTypeId = utils_stringHash(msgType);
 +	return 0;
 +}
 +
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max){
 +
 +	double scaled = (double)(((double)random())/((double)RAND_MAX));
 +	return (max-min+1)*scaled + min;
 +
 +}
 +
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt
tp,bundle_pt bundle){
 +
 +	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
 +
 +	if (bound != NULL) {
 +
 +		bound->parent = tp;
 +		bound->bundle = bundle;
 +		bound->getCount = 1;
 +		celixThreadMutex_create(&bound->mp_lock,NULL);
 +
 +		if(tp->serializer != NULL){
 +			tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
 +		}
 +
 +		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
 +		bound->scope=strdup(pubEP->scope);
 +		bound->topic=strdup(pubEP->topic);
 +		bound->largeUdpHandle = largeUdp_create(1);
 +
 +		bound->service.handle = bound;
 +		bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
 +		bound->service.send = pubsub_topicPublicationSend;
 +		bound->service.sendMultipart = NULL;  //Multipart not supported for UDP
 +
 +	}
 +
 +	return bound;
 +}
 +
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
 +
 +	celixThreadMutex_lock(&boundSvc->mp_lock);
 +
 +	if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
 +		boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle,
boundSvc->msgTypes);
 +	}
 +
 +	if(boundSvc->scope!=NULL){
 +		free(boundSvc->scope);
 +	}
 +
 +	if(boundSvc->topic!=NULL){
 +		free(boundSvc->topic);
 +	}
 +
 +	largeUdp_destroy(boundSvc->largeUdpHandle);
 +
 +	celixThreadMutex_unlock(&boundSvc->mp_lock);
 +	celixThreadMutex_destroy(&boundSvc->mp_lock);
 +
 +	free(boundSvc);
 +
 +}
 +
 +static void delay_first_send_for_late_joiners(){
 +
 +	static bool firstSend = true;
 +
 +	if(firstSend){
 +		printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
 +		sleep(FIRST_SEND_DELAY);
 +		firstSend = false;
 +	}
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/shell/CMakeLists.txt
----------------------------------------------------------------------
diff --cc shell/CMakeLists.txt
index ae8cf3f,11a16c1..b8aaac3
--- a/shell/CMakeLists.txt
+++ b/shell/CMakeLists.txt
@@@ -18,35 -18,38 +18,35 @@@ celix_subproject(SHELL "Option to enabl
  if (SHELL)
  	find_package(CURL REQUIRED)
  
 +	add_library(shell_api INTERFACE)
 +	target_include_directories(shell_api INTERFACE include)
 +
      add_bundle(shell
          SYMBOLIC_NAME "apache_celix_shell"
-         VERSION "2.0.0"
+         VERSION "2.1.0"
          NAME "Apache Celix Shell"
 -
          SOURCES
 +          src/activator
 +          src/shell
 +          src/lb_command
 +          src/start_command
 +          src/stop_command
 +          src/install_command
 +          src/update_command
 +          src/uninstall_command
 +          src/log_command
 +          src/inspect_command
 +          src/help_command
 +	)
 +	target_include_directories(shell PRIVATE src ${CURL_INCLUDE_DIRS})
 +	target_link_libraries(shell PRIVATE Celix::shell_api ${CURL_LIBRARIES} Celix::log_service_api
Celix::log_helper)
  
 -          private/src/activator
 -          private/src/shell
 -          private/src/lb_command
 -          private/src/start_command
 -          private/src/stop_command
 -          private/src/install_command
 -          private/src/update_command
 -          private/src/uninstall_command
 -          private/src/log_command
 -          private/src/inspect_command
 -          private/src/help_command
 -
 -          ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
 -
 -    )
 -    
 -    install_bundle(shell
 +	install_bundle(shell
      	HEADERS
 -    		public/include/shell.h public/include/command.h public/include/shell_constants.h
 -	)
 +    		include/shell.h include/command.h include/shell_constants.h
 +    )
  
 -	include_directories("public/include")
 -	include_directories("private/include")
 -    include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
 -    include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
 -	include_directories(${CURL_INCLUDE_DIRS})
 -    target_link_libraries(shell celix_framework ${CURL_LIBRARIES})
 +	#Setup target aliases to match external usage
 +	add_library(Celix::shell_api ALIAS shell_api)
 +	add_library(Celix::shell ALIAS shell)
  endif (SHELL)

http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/utils/include/properties.h
----------------------------------------------------------------------
diff --cc utils/include/properties.h
index cf93ca0,0000000..5c6dc4d
mode 100644,000000..100644
--- a/utils/include/properties.h
+++ b/utils/include/properties.h
@@@ -1,66 -1,0 +1,68 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * properties.h
 + *
 + *  \date       Apr 27, 2010
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +
 +#ifndef PROPERTIES_H_
 +#define PROPERTIES_H_
 +
 +#include <stdio.h>
 +
 +#include "hash_map.h"
 +#include "exports.h"
 +#include "celix_errno.h"
 +#ifdef __cplusplus
 +extern "C" {
 +#endif
 +typedef hash_map_pt properties_pt;
 +typedef hash_map_t properties_t;
 +
 +UTILS_EXPORT properties_pt properties_create(void);
 +
 +UTILS_EXPORT void properties_destroy(properties_pt properties);
 +
 +UTILS_EXPORT properties_pt properties_load(const char *filename);
 +
 +UTILS_EXPORT properties_pt properties_loadWithStream(FILE *stream);
 +
++UTILS_EXPORT properties_pt properties_loadFromString(const char *input);
++
 +UTILS_EXPORT void properties_store(properties_pt properties, const char *file, const char
*header);
 +
 +UTILS_EXPORT const char *properties_get(properties_pt properties, const char *key);
 +
 +UTILS_EXPORT const char *properties_getWithDefault(properties_pt properties, const char
*key, const char *defaultValue);
 +
 +UTILS_EXPORT void properties_set(properties_pt properties, const char *key, const char *value);
 +
 +UTILS_EXPORT celix_status_t properties_copy(properties_pt properties, properties_pt *copy);
 +
 +#define PROPERTIES_FOR_EACH(props, key) \
 +    for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
 +        hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);)
 +#ifdef __cplusplus
 +}
 +#endif
 +
 +#endif /* PROPERTIES_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/utils/src/properties.c
----------------------------------------------------------------------
diff --cc utils/src/properties.c
index 0bd6dc3,0000000..1e097a0
mode 100644,000000..100644
--- a/utils/src/properties.c
+++ b/utils/src/properties.c
@@@ -1,302 -1,0 +1,330 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * properties.c
 + *
 + *  \date       Apr 27, 2010
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +#include <stdio.h>
 +#include <string.h>
 +#include <stdlib.h>
 +#include <ctype.h>
 +#include "celixbool.h"
 +#include "properties.h"
 +#include "utils.h"
 +
 +#define MALLOC_BLOCK_SIZE		5
 +
 +static void parseLine(const char* line, properties_pt props);
 +
 +properties_pt properties_create(void) {
 +	return hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
 +}
 +
 +void properties_destroy(properties_pt properties) {
 +	hash_map_iterator_pt iter = hashMapIterator_create(properties);
 +	while (hashMapIterator_hasNext(iter)) {
 +		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +		free(hashMapEntry_getKey(entry));
 +		free(hashMapEntry_getValue(entry));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(properties, false, false);
 +}
 +
 +properties_pt properties_load(const char* filename) {
 +	FILE *file = fopen(filename, "r");
 +	if(file==NULL){
 +		return NULL;
 +	}
 +	properties_pt props = properties_loadWithStream(file);
 +	fclose(file);
 +	return props;
 +}
 +
 +properties_pt properties_loadWithStream(FILE *file) {
 +	properties_pt props = NULL;
 +
 +
 +	if (file != NULL ) {
 +		char *saveptr;
 +		char *filebuffer = NULL;
 +		char *line = NULL;
 +		size_t file_size = 0;
 +
 +		props = properties_create();
 +		fseek(file, 0, SEEK_END);
 +		file_size = ftell(file);
 +		fseek(file, 0, SEEK_SET);
 +
 +		if(file_size > 0){
 +			filebuffer = calloc(file_size + 1, sizeof(char));
 +			if(filebuffer) {
 +				size_t rs = fread(filebuffer, sizeof(char), file_size, file);
 +				if(rs != file_size){
 +					fprintf(stderr,"fread read only %lu bytes out of %lu\n",rs,file_size);
 +				}
 +				filebuffer[file_size]='\0';
 +				line = strtok_r(filebuffer, "\n", &saveptr);
 +				while ( line != NULL ) {
 +					parseLine(line, props);
 +					line = strtok_r(NULL, "\n", &saveptr);
 +				}
 +				free(filebuffer);
 +			}
 +		}
 +	}
 +
 +	return props;
 +}
 +
++properties_pt properties_loadFromString(const char *input){
++	properties_pt props = properties_create();
++
++	char *in = strdup(input);
++	char *line = NULL;
++	char *saveLinePointer = NULL;
++
++	bool firstTime = true;
++	do {
++		if (firstTime){
++			line = strtok_r(in, "\n", &saveLinePointer);
++			firstTime = false;
++		}else {
++			line = strtok_r(NULL, "\n", &saveLinePointer);
++		}
++
++		if (line == NULL){
++			break;
++		}
++
++		parseLine(line, props);
++	} while(line != NULL);
++
++	free(in);
++
++	return props;
++}
++
 +
 +/**
 + * Header is ignored for now, cannot handle comments yet
 + */
 +void properties_store(properties_pt properties, const char* filename, const char* header)
{
 +	FILE *file = fopen ( filename, "w+" );
 +	char *str;
 +
 +	if (file != NULL) {
 +		if (hashMap_size(properties) > 0) {
 +			hash_map_iterator_pt iterator = hashMapIterator_create(properties);
 +			while (hashMapIterator_hasNext(iterator)) {
 +				hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
 +				str = hashMapEntry_getKey(entry);
 +				for (int i = 0; i < strlen(str); i += 1) {
 +					if (str[i] == '#' || str[i] == '!' || str[i] == '=' || str[i] == ':') {
 +						fputc('\\', file);
 +					}
 +					fputc(str[i], file);
 +				}
 +
 +				fputc('=', file);
 +
 +				str = hashMapEntry_getValue(entry);
 +				for (int i = 0; i < strlen(str); i += 1) {
 +					if (str[i] == '#' || str[i] == '!' || str[i] == '=' || str[i] == ':') {
 +						fputc('\\', file);
 +					}
 +					fputc(str[i], file);
 +				}
 +
 +				fputc('\n', file);
 +
 +			}
 +			hashMapIterator_destroy(iterator);
 +		}
 +		fclose(file);
 +	} else {
 +		perror("File is null");
 +	}
 +}
 +
 +celix_status_t properties_copy(properties_pt properties, properties_pt *out) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	properties_pt copy = properties_create();
 +
 +	if (copy != NULL) {
 +		hash_map_iterator_pt iter = hashMapIterator_create(properties);
 +		while (hashMapIterator_hasNext(iter)) {
 +			hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +			char *key = hashMapEntry_getKey(entry);
 +			char *value = hashMapEntry_getValue(entry);
 +			properties_set(copy, key, value);
 +		}
 +		hashMapIterator_destroy(iter);
 +	} else {
 +		status = CELIX_ENOMEM;
 +	}
 +
 +	if (status == CELIX_SUCCESS) {
 +		*out = copy;
 +	}
 +
 +	return status;
 +}
 +
 +const char* properties_get(properties_pt properties, const char* key) {
 +	return hashMap_get(properties, (void*)key);
 +}
 +
 +const char* properties_getWithDefault(properties_pt properties, const char* key, const char*
defaultValue) {
 +	const char* value = properties_get(properties, key);
 +	return value == NULL ? defaultValue : value;
 +}
 +
 +void properties_set(properties_pt properties, const char* key, const char* value) {
 +	hash_map_entry_pt entry = hashMap_getEntry(properties, key);
 +	char* oldValue = NULL;
 +	if (entry != NULL) {
 +		char* oldKey = hashMapEntry_getKey(entry);
 +		oldValue = hashMapEntry_getValue(entry);
 +		hashMap_put(properties, oldKey, strndup(value, 1024*10));
 +	} else {
 +		hashMap_put(properties, strndup(key, 1024*10), strndup(value, 1024*10));
 +	}
 +	free(oldValue);
 +}
 +
 +static void updateBuffers(char **key, char ** value, char **output, int outputPos, int *key_len,
int *value_len) {
 +	if (*output == *key) {
 +		if (outputPos == (*key_len) - 1) {
 +			(*key_len) += MALLOC_BLOCK_SIZE;
 +			*key = realloc(*key, *key_len);
 +			*output = *key;
 +		}
 +	}
 +	else {
 +		if (outputPos == (*value_len) - 1) {
 +			(*value_len) += MALLOC_BLOCK_SIZE;
 +			*value = realloc(*value, *value_len);
 +			*output = *value;
 +		}
 +	}
 +}
 +
 +static void parseLine(const char* line, properties_pt props) {
 +	int linePos = 0;
 +	bool precedingCharIsBackslash = false;
 +	bool isComment = false;
 +	int outputPos = 0;
 +	char *output = NULL;
 +	int key_len = MALLOC_BLOCK_SIZE;
 +	int value_len = MALLOC_BLOCK_SIZE;
 +	linePos = 0;
 +	precedingCharIsBackslash = false;
 +	isComment = false;
 +	output = NULL;
 +	outputPos = 0;
 +
 +	//Ignore empty lines
 +	if (line[0] == '\n' && line[1] == '\0') {
 +		return;
 +	}
 +
 +	char *key = calloc(1, key_len);
 +	char *value = calloc(1, value_len);
 +	key[0] = '\0';
 +	value[0] = '\0';
 +
 +	while (line[linePos] != '\0') {
 +		if (line[linePos] == ' ' || line[linePos] == '\t') {
 +			if (output == NULL) {
 +				//ignore
 +				linePos += 1;
 +				continue;
 +			}
 +		}
 +		else {
 +			if (output == NULL) {
 +				output = key;
 +			}
 +		}
 +		if (line[linePos] == '=' || line[linePos] == ':' || line[linePos] == '#' || line[linePos]
== '!') {
 +			if (precedingCharIsBackslash) {
 +				//escaped special character
 +				output[outputPos++] = line[linePos];
 +				updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +				precedingCharIsBackslash = false;
 +			}
 +			else {
 +				if (line[linePos] == '#' || line[linePos] == '!') {
 +					if (outputPos == 0) {
 +						isComment = true;
 +						break;
 +					}
 +					else {
 +						output[outputPos++] = line[linePos];
 +						updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +					}
 +				}
 +				else { // = or :
 +					if (output == value) { //already have a seperator
 +						output[outputPos++] = line[linePos];
 +						updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +					}
 +					else {
 +						output[outputPos++] = '\0';
 +						updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +						output = value;
 +						outputPos = 0;
 +					}
 +				}
 +			}
 +		}
 +		else if (line[linePos] == '\\') {
 +			if (precedingCharIsBackslash) { //double backslash -> backslash
 +				output[outputPos++] = '\\';
 +				updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +			}
 +			precedingCharIsBackslash = true;
 +		}
 +		else { //normal character
 +			precedingCharIsBackslash = false;
 +			output[outputPos++] = line[linePos];
 +			updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +		}
 +		linePos += 1;
 +	}
 +	if (output != NULL) {
 +		output[outputPos] = '\0';
 +	}
 +
 +	if (!isComment) {
 +		//printf("putting 'key'/'value' '%s'/'%s' in properties\n", utils_stringTrim(key), utils_stringTrim(value));
 +		properties_set(props, utils_stringTrim(key), utils_stringTrim(value));
 +	}
 +	if(key) {
 +		free(key);
 +	}
 +	if(value) {
 +		free(value);
 +	}
 +
 +}


Mime
View raw message