celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [1/2] celix git commit: CELIX-361: Applies fix for bug in etcd watch concerning max 1000 transactions.
Date Mon, 23 May 2016 18:52:24 GMT
Repository: celix
Updated Branches:
  refs/heads/develop f593c02e2 -> feea43ed7


CELIX-361: Applies fix for bug in etcd watch concerning max 1000 transactions.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c4060e66
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c4060e66
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c4060e66

Branch: refs/heads/develop
Commit: c4060e660e5e69609ba255e8815153dfeda17003
Parents: f593c02
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
Authored: Mon May 23 20:51:42 2016 +0200
Committer: Pepijn Noltes <pepijnnoltes@gmail.com>
Committed: Mon May 23 20:51:42 2016 +0200

----------------------------------------------------------------------
 .../discovery_etcd/private/include/etcd.h       |  4 +-
 .../discovery_etcd/private/src/etcd.c           | 12 +++++-
 .../discovery_etcd/private/src/etcd_watcher.c   | 40 +++++++++++++-------
 3 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/c4060e66/remote_services/discovery_etcd/private/include/etcd.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/etcd.h b/remote_services/discovery_etcd/private/include/etcd.h
index f5624d0..2ba09de 100644
--- a/remote_services/discovery_etcd/private/include/etcd.h
+++ b/remote_services/discovery_etcd/private/include/etcd.h
@@ -48,12 +48,14 @@
 #define ETCD_JSON_KEY			"key"
 #define ETCD_JSON_VALUE			"value"
 #define ETCD_JSON_MODIFIEDINDEX	"modifiedIndex"
+#define ETCD_ERROR_INDICATION   "errorCode"
+#define ETCD_INDEX				"index"
 
 celix_status_t etcd_init(char* server, int port);
 bool etcd_get(char* key, char* value, char*action, int* modifiedIndex);
 bool etcd_getNodes(char* directory, char** nodeNames, int* size);
 bool etcd_set(char* key, char* value, int ttl, bool prevExist);
 bool etcd_del(char* key);
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex);
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex, int *error);
 
 #endif /* ETCD_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/c4060e66/remote_services/discovery_etcd/private/src/etcd.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd.c b/remote_services/discovery_etcd/private/src/etcd.c
index ed06fb7..1b74f66 100644
--- a/remote_services/discovery_etcd/private/src/etcd.c
+++ b/remote_services/discovery_etcd/private/src/etcd.c
@@ -310,7 +310,7 @@ bool etcd_del(char* key) {
 
 ///watch
 
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex) {
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex, int* errorCode) {
     json_error_t error;
     json_t* js_root = NULL;
     json_t* js_node = NULL;
@@ -320,7 +320,9 @@ bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value
     json_t* js_rkey = NULL;
     json_t* js_prevValue = NULL;
     json_t* js_modIndex = NULL;
+    json_t* js_error = NULL;	// used to indicate valid json response with ETCD error indication
     bool retVal = false;
+    *errorCode = 0;
     char url[MAX_URL_LENGTH];
     int res;
     struct MemoryStruct reply;
@@ -343,6 +345,7 @@ bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value
             js_action = json_object_get(js_root, ETCD_JSON_ACTION);
             js_node = json_object_get(js_root, ETCD_JSON_NODE);
             js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
+            js_error = json_object_get(js_root, ETCD_ERROR_INDICATION);
         }
         if (js_prevNode != NULL) {
             js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
@@ -373,6 +376,13 @@ bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value
 
             retVal = true;
         }
+        if ((js_error != NULL) && (json_is_integer(js_error))) {
+		*errorCode = json_integer_value(js_error);
+		js_modIndex = json_object_get(js_root, ETCD_INDEX);
+		if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
+		    *modifiedIndex = json_integer_value(js_modIndex);
+		}
+        }
         if (js_root != NULL) {
             json_decref(js_root);
         }

http://git-wip-us.apache.org/repos/asf/celix/blob/c4060e66/remote_services/discovery_etcd/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c
index 915f420..0675bc2 100644
--- a/remote_services/discovery_etcd/private/src/etcd_watcher.c
+++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c
@@ -264,6 +264,7 @@ static void* etcdWatcher_run(void* data) {
 	time_t timeBeforeWatch = time(NULL);
 	static char rootPath[MAX_ROOTNODE_LENGTH];
 	int highestModified = 0;
+	int errorCode=0;
 
 	bundle_context_pt context = watcher->discovery->context;
 
@@ -276,23 +277,34 @@ static void* etcdWatcher_run(void* data) {
 		char value[MAX_VALUE_LENGTH];
 		char preValue[MAX_VALUE_LENGTH];
 		char action[MAX_ACTION_LENGTH];
-        int modIndex;
-
-		if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex) == true) {
-			if (strcmp(action, "set") == 0) {
-				etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
-			} else if (strcmp(action, "delete") == 0) {
-				etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
-			} else if (strcmp(action, "expire") == 0) {
-				etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
-			} else if (strcmp(action, "update") == 0) {
-				etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
-			} else {
-				logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
-			}
+        int modIndex=0;
+
+        if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex, &errorCode) == true) {
+		if (strcmp(action, "set") == 0) {
+			etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
+		} else if (strcmp(action, "delete") == 0) {
+			etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
+		} else if (strcmp(action, "expire") == 0) {
+			etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
+		} else if (strcmp(action, "update") == 0) {
+			etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
+		} else {
+			logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
+		}
 
+		highestModified = modIndex;
+        }
+        /* prevent busy waiting, in case etcd_watch returns false */
+        else {
+		switch (errorCode) {
+		case 401:
+			// Etcd can store at most 1000 events
 			highestModified = modIndex;
+			break;
+		default:
+			break;
 		}
+        }
 
 		// update own framework uuid
 		if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL/2)) {


Mime
View raw message