From commits-return-6778-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Mon Aug 6 14:14:15 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id AA2DD180627 for ; Mon, 6 Aug 2018 14:14:12 +0200 (CEST) Received: (qmail 53129 invoked by uid 500); 6 Aug 2018 12:14:11 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 52356 invoked by uid 99); 6 Aug 2018 12:14:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Aug 2018 12:14:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8AE94DFF92; Mon, 6 Aug 2018 12:14:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andor@apache.org To: commits@zookeeper.apache.org Date: Mon, 06 Aug 2018 12:14:16 -0000 Message-Id: <9d34ceced414424ab2d10da0db52234b@git.apache.org> In-Reply-To: <23d59816fc864cdcaa333309761a6f23@git.apache.org> References: <23d59816fc864cdcaa333309761a6f23@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/45] zookeeper git commit: ZOOKEEPER-3030: MAVEN MIGRATION - Step 1.3 - move contrib directories http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/c/zookeeper.c ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/c/zookeeper.c b/zookeeper-contrib/zookeeper-contrib-zkpython/src/c/zookeeper.c new file mode 100644 index 0000000..4474661 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/c/zookeeper.c @@ -0,0 +1,1664 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +////////////////////////////////////////////// +// EXCEPTIONS +PyObject *ZooKeeperException = NULL; +PyObject *SystemErrorException; +PyObject *RuntimeInconsistencyException; +PyObject *DataInconsistencyException; +PyObject *ConnectionLossException; +PyObject *MarshallingErrorException; +PyObject *UnimplementedException; +PyObject *OperationTimeoutException; +PyObject *BadArgumentsException; +PyObject *InvalidStateException; + +PyObject *ApiErrorException; +PyObject *NoNodeException; +PyObject *NoAuthException; +PyObject *NodeExistsException; +PyObject *BadVersionException; +PyObject *NoChildrenForEphemeralsException; +PyObject *NotEmptyException; +PyObject *SessionExpiredException; +PyObject *SessionMovedException; +PyObject *InvalidCallbackException; +PyObject *InvalidACLException; +PyObject *AuthFailedException; +PyObject *ClosingException; +PyObject *NothingException; + +PyObject *err_to_exception(int errcode) { + switch (errcode) { + case ZSYSTEMERROR: + return SystemErrorException; + case ZINVALIDSTATE: + return InvalidStateException; + case ZRUNTIMEINCONSISTENCY: + return RuntimeInconsistencyException; + case ZDATAINCONSISTENCY: + return DataInconsistencyException; + case ZCONNECTIONLOSS: + return ConnectionLossException; + case ZMARSHALLINGERROR: + return MarshallingErrorException; + case ZUNIMPLEMENTED: + return UnimplementedException; + case ZOPERATIONTIMEOUT: + return OperationTimeoutException; + case ZBADARGUMENTS: + return BadArgumentsException; + case ZAPIERROR: + return ApiErrorException; + case ZNONODE: + return NoNodeException; + case ZNOAUTH: + return NoAuthException; + case ZBADVERSION: + return BadVersionException; + case ZNOCHILDRENFOREPHEMERALS: + return NoChildrenForEphemeralsException; + case ZNODEEXISTS: + return NodeExistsException; + case ZINVALIDACL: + return InvalidACLException; + case ZAUTHFAILED: + return AuthFailedException; + case ZNOTEMPTY: + return NotEmptyException; + case ZSESSIONEXPIRED: + return SessionExpiredException; + case ZINVALIDCALLBACK: + return InvalidCallbackException; + case ZSESSIONMOVED: + return SessionMovedException; + case ZCLOSING: + return ClosingException; + case ZNOTHING: + return NothingException; + case ZOK: + default: + return NULL; + } +} + + +#define CHECK_ZHANDLE(z) if ( (z) < 0 || (z) >= num_zhandles) { \ + PyErr_SetString( ZooKeeperException, "zhandle out of range" ); \ +return NULL; \ +} else if ( zhandles[(z)] == NULL ) { \ + PyErr_SetString(ZooKeeperException, "zhandle already freed"); \ + return NULL; \ + } + +/* Contains all the state required for a watcher callback - these are + passed to the *dispatch functions as void*, cast to pywatcher_t and + then their callback member is invoked if not NULL */ +typedef struct { + int zhandle; + PyObject *callback; + int permanent; +}pywatcher_t; + +/* This array exists because we need to ref. count the global watchers + for each connection - but they're inaccessible without pulling in + zk_adaptor.h, which I'm trying to avoid. */ +static pywatcher_t **watchers; + +/* We keep an array of zhandles available for use. When a zhandle is + correctly closed, the C client frees the memory so we set the + zhandles[i] entry to NULL. This entry can then be re-used. */ +static zhandle_t** zhandles = NULL; +static int num_zhandles = 0; +static int max_zhandles = 0; +#define REAL_MAX_ZHANDLES 32768 + +/* -------------------------------------------------------------------------- */ +/* zhandles - unique connection ids - tracking */ +/* -------------------------------------------------------------------------- */ + + +/* Allocates an initial zhandle and watcher array */ +int init_zhandles(int num) { + zhandles = malloc(sizeof(zhandle_t*)*num); + watchers = malloc(sizeof(pywatcher_t*)*num); + if (zhandles == NULL || watchers == NULL) { + return 0; + } + max_zhandles = num; + num_zhandles = 0; + memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles); + return 1; +} + +/* Note that the following zhandle functions are not thread-safe. The + C-Python runtime does not seem to pre-empt a thread that is in a C + module, so there's no need for synchronisation. */ + +/* Doubles the size of the zhandle / watcher array Returns 0 if the + new array would be >= REAL_MAX_ZHANDLES in size. Called when zhandles + is full. Returns 0 if allocation failed or if max num zhandles + exceeded. */ +int resize_zhandles(void) { + zhandle_t **tmp = zhandles; + pywatcher_t ** wtmp = watchers; + if (max_zhandles >= REAL_MAX_ZHANDLES >> 1) { + return 0; + } + max_zhandles *= 2; + zhandles = malloc(sizeof(zhandle_t*)*max_zhandles); + if (zhandles == NULL) { + PyErr_SetString(PyExc_MemoryError, "malloc for new zhandles failed"); + return 0; + } + memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles); + memcpy(zhandles, tmp, sizeof(zhandle_t*)*max_zhandles/2); + + watchers = malloc(sizeof(pywatcher_t*)*max_zhandles); + if (watchers == NULL) { + PyErr_SetString(PyExc_MemoryError, "malloc for new watchers failed"); + return 0; + } + memset(watchers, 0, sizeof(pywatcher_t*)*max_zhandles); + memcpy(watchers, wtmp, sizeof(pywatcher_t*)*max_zhandles/2); + + free(wtmp); + free(tmp); + return 1; +} + +/* Find a free zhandle - this iterates through the list of open + zhandles, but we expect it to be infrequently called. There are + optimisations that can be made if this turns out to be problematic. + Returns -1 if no free handle is found - resize_handles() can be + called in that case. */ +unsigned int next_zhandle(void) { + int i = 0; + for (i=0;izhandle = zh; ret->callback = cb; ret->permanent = permanent; + return ret; +} + +/* Releases the reference taken in create_pywatcher to the callback, + then frees the allocated pywatcher_t* */ +void free_pywatcher(pywatcher_t *pw) +{ + if (pw == NULL) { + return; + } + Py_DECREF(pw->callback); + + free(pw); +} + +/* Constructs a new stat object. Returns Py_None if stat == NULL or a + dictionary containing all the stat information otherwise. In either + case, takes a reference to the returned object. */ +PyObject *build_stat( const struct Stat *stat ) +{ + if (stat == NULL) { + Py_INCREF(Py_None); + return Py_None; + } + return Py_BuildValue( "{s:K, s:K, s:K, s:K," + "s:i, s:i, s:i, s:K," + "s:i, s:i, s:K}", + "czxid", stat->czxid, + "mzxid", stat->mzxid, + "ctime", stat->ctime, + "mtime", stat->mtime, + "version", stat->version, + "cversion", stat->cversion, + "aversion", stat->aversion, + "ephemeralOwner", stat->ephemeralOwner, + "dataLength", stat->dataLength, + "numChildren", stat->numChildren, + "pzxid", stat->pzxid ); +} + +/* Creates a new list of strings from a String_vector. Returns the + empty list if the String_vector is NULL. Takes a reference to the + returned PyObject and gives that reference to the caller. */ +PyObject *build_string_vector(const struct String_vector *sv) +{ + PyObject *ret; + if (!sv) { + return PyList_New(0); + } + + ret = PyList_New(sv->count); + if (ret) { + int i; + for (i=0;icount;++i) { +#if PY_MAJOR_VERSION >= 3 + PyObject *s = PyUnicode_FromString(sv->data[i]); +#else + PyObject *s = PyString_FromString(sv->data[i]); +#endif + if (!s) { + if (ret != Py_None) { + Py_DECREF(ret); + } + ret = NULL; + break; + } + PyList_SetItem(ret, i, s); + } + } + return ret; +} + +/* Returns 1 if the PyObject is a valid representation of an ACL, and + 0 otherwise. */ +int check_is_acl(PyObject *o) { + int i; + PyObject *entry; + if (o == NULL) { + return 0; + } + if (!PyList_Check(o)) { + return 0; + } + for (i=0;icount); + int i; + for (i=0;icount;++i) { + PyObject *acl = Py_BuildValue( "{s:i, s:s, s:s}", + "perms", acls->data[i].perms, + "scheme", acls->data[i].id.scheme, + "id", acls->data[i].id.id ); + PyList_SetItem(ret, i, acl); + } + return ret; +} + +/* Parse the Python representation of an ACL list into an ACL_vector + (which needs subsequent freeing) */ +int parse_acls(struct ACL_vector *acls, PyObject *pyacls) +{ + PyObject *a; + int i; + if (acls == NULL || pyacls == NULL) { + PyErr_SetString(PyExc_ValueError, "acls or pyacls NULL in parse_acls"); + return 0; + } + + acls->count = PyList_Size( pyacls ); + + // Is this a list? If not, we can't do anything + if (PyList_Check(pyacls) == 0) { + PyErr_SetString(InvalidACLException, "List of ACLs required in parse_acls"); + return 0; + } + + acls->data = (struct ACL *)calloc(acls->count, sizeof(struct ACL)); + if (acls->data == NULL) { + PyErr_SetString(PyExc_MemoryError, "calloc failed in parse_acls"); + return 0; + } + + for (i=0;icount;++i) { + a = PyList_GetItem(pyacls, i); + // a is now a dictionary + PyObject *perms = PyDict_GetItemString( a, "perms" ); +#if PY_MAJOR_VERSION >= 3 + acls->data[i].perms = (int32_t)(PyLong_AsLong(perms)); + acls->data[i].id.id = strdup( PyUnicode_AsUnicode( PyDict_GetItemString( a, "id" ) ) ); + acls->data[i].id.scheme = strdup( PyUnicode_AsUnicode( PyDict_GetItemString( a, "scheme" ) ) ); +#else + acls->data[i].perms = (int32_t)(PyInt_AsLong(perms)); + acls->data[i].id.id = strdup( PyString_AsString( PyDict_GetItemString( a, "id" ) ) ); + acls->data[i].id.scheme = strdup( PyString_AsString( PyDict_GetItemString( a, "scheme" ) ) ); +#endif + } + return 1; +} + +/* Deallocates the memory allocated inside an ACL_vector, but not the + ACL_vector itself */ +void free_acls( struct ACL_vector *acls ) +{ + if (acls == NULL) { + return; + } + int i; + for (i=0;icount;++i) { + free(acls->data[i].id.id); + free(acls->data[i].id.scheme); + } + free(acls->data); +} + +/* -------------------------------------------------------------------------- */ +/* Watcher and callback implementation */ +/* -------------------------------------------------------------------------- */ + +/* Every watcher invocation goes through this dispatch point, which + a) acquires the global interpreter lock + + b) unpacks the PyObject to call from the passed context pointer, + which handily includes the index of the relevant zookeeper handle + to pass back to Python. + + c) Makes the call into Python, checking for error conditions which + we are responsible for detecting and doing something about (we just + print the error and plough right on) + + d) releases the lock after freeing up the context object, which is + only used for one watch invocation (watches are one-shot, unless + 'permanent' != 0) +*/ +void watcher_dispatch(zhandle_t *zzh, int type, int state, + const char *path, void *context) +{ + PyGILState_STATE gstate; + pywatcher_t *pyw = (pywatcher_t*)context; + PyObject *callback = pyw->callback; + if (callback == NULL) { + // This is unexpected + char msg[256]; + sprintf(msg, "pywatcher: %d %p %d", pyw->zhandle, pyw->callback, pyw->permanent); + PyErr_SetString(PyExc_ValueError, msg); + return; + } + + gstate = PyGILState_Ensure(); + PyObject *arglist = Py_BuildValue("(i,i,i,s)", pyw->zhandle,type, state, path); + if (PyObject_CallObject((PyObject*)callback, arglist) == NULL) { + PyErr_Print(); + } + Py_DECREF(arglist); + if (pyw->permanent == 0 && (type != ZOO_SESSION_EVENT || state < 0)) { + free_pywatcher(pyw); + } + PyGILState_Release(gstate); +} + +/* The completion callbacks (from asynchronous calls) are implemented similarly */ + +/* Called when an asynchronous call that returns void completes and + dispatches user provided callback */ +void void_completion_dispatch(int rc, const void *data) +{ + PyGILState_STATE gstate; + pywatcher_t *pyw = (pywatcher_t*)data; + if (pyw == NULL) + return; + PyObject *callback = pyw->callback; + gstate = PyGILState_Ensure(); + PyObject *arglist = Py_BuildValue("(i,i)", pyw->zhandle, rc); + if (PyObject_CallObject((PyObject*)callback, arglist) == NULL) + PyErr_Print(); + Py_DECREF(arglist); + free_pywatcher(pyw); + PyGILState_Release(gstate); +} + +/* Called when an asynchronous call that returns a stat structure + completes and dispatches user provided callback */ +void stat_completion_dispatch(int rc, const struct Stat *stat, const void *data) +{ + PyGILState_STATE gstate; + pywatcher_t *pyw = (pywatcher_t*)data; + if (pyw == NULL) + return; + PyObject *callback = pyw->callback; + gstate = PyGILState_Ensure(); + PyObject *pystat = build_stat(stat); + PyObject *arglist = Py_BuildValue("(i,i,O)", pyw->zhandle,rc, pystat); + Py_DECREF(pystat); + if (PyObject_CallObject((PyObject*)callback, arglist) == NULL) + PyErr_Print(); + Py_DECREF(arglist); + free_pywatcher(pyw); + PyGILState_Release(gstate); +} + +/* Called when an asynchronous call that returns a stat structure and + some untyped data completes and dispatches user provided + callback (used by aget) */ +void data_completion_dispatch(int rc, const char *value, int value_len, const struct Stat *stat, const void *data) +{ + PyGILState_STATE gstate; + pywatcher_t *pyw = (pywatcher_t*)data; + if (pyw == NULL) + return; + PyObject *callback = pyw->callback; + gstate = PyGILState_Ensure(); + PyObject *pystat = build_stat(stat); + PyObject *arglist = Py_BuildValue("(i,i,s#,O)", pyw->zhandle,rc, value,value_len, pystat); + Py_DECREF(pystat); + + if (PyObject_CallObject((PyObject*)callback, arglist) == NULL) + PyErr_Print(); + Py_DECREF(arglist); + free_pywatcher(pyw); + PyGILState_Release(gstate); +} + +/* Called when an asynchronous call that returns a list of strings + completes and dispatches user provided callback */ +void strings_completion_dispatch(int rc, const struct String_vector *strings, const void *data) +{ + PyGILState_STATE gstate; + pywatcher_t *pyw = (pywatcher_t*)data; + if (pyw == NULL) + return; + PyObject *callback = pyw->callback; + gstate = PyGILState_Ensure(); + PyObject *pystrings = build_string_vector(strings); + if (pystrings) + { + PyObject *arglist = Py_BuildValue("(i,i,O)", pyw->zhandle, rc, pystrings); + if (arglist == NULL || PyObject_CallObject((PyObject*)callback, arglist) == NULL) + PyErr_Print(); + Py_DECREF(arglist); + } + else + PyErr_Print(); + Py_DECREF(pystrings); + free_pywatcher(pyw); + PyGILState_Release(gstate); +} + +/* Called when an asynchronous call that returns a single string + completes and dispatches user provided callback */ +void string_completion_dispatch(int rc, const char *value, const void *data) +{ + PyGILState_STATE gstate; + pywatcher_t *pyw = (pywatcher_t*)data; + if (pyw == NULL) { + return; + } + PyObject *callback = pyw->callback; + gstate = PyGILState_Ensure(); + PyObject *arglist = Py_BuildValue("(i,i,s)", pyw->zhandle,rc, value); + if (PyObject_CallObject((PyObject*)callback, arglist) == NULL) + PyErr_Print(); + Py_DECREF(arglist); + free_pywatcher(pyw); + PyGILState_Release(gstate); +} + +/* Called when an asynchronous call that returns a list of ACLs + completes and dispatches user provided callback */ +void acl_completion_dispatch(int rc, struct ACL_vector *acl, struct Stat *stat, const void *data) +{ + PyGILState_STATE gstate; + pywatcher_t *pyw = (pywatcher_t*)data; + if (pyw == NULL) { + return; + } + PyObject *callback = pyw->callback; + gstate = PyGILState_Ensure(); + PyObject *pystat = build_stat(stat); + PyObject *pyacls = build_acls(acl); + PyObject *arglist = Py_BuildValue("(i,i,O,O)", pyw->zhandle,rc, pyacls, pystat); + + Py_DECREF(pystat); + Py_DECREF(pyacls); + + if (PyObject_CallObject((PyObject*)callback, arglist) == NULL) { + PyErr_Print(); + } + Py_DECREF(arglist); + free_pywatcher(pyw); + PyGILState_Release(gstate); +} + +/* -------------------------------------------------------------------------- */ +/* ZOOKEEPER API IMPLEMENTATION */ +/* -------------------------------------------------------------------------- */ + +static PyObject *pyzookeeper_init(PyObject *self, PyObject *args) +{ + const char *host; + PyObject *watcherfn = Py_None; + int recv_timeout = 10000; + // int clientid = -1; + clientid_t cid; + cid.client_id = -1; + const char *passwd; + int handle = next_zhandle(); + if (handle == -1) { + if (resize_zhandles() == 0) { + return NULL; + } + handle = next_zhandle(); + } + + if (handle == -1) { + PyErr_SetString(ZooKeeperException,"Couldn't find a free zhandle, something is very wrong"); + return NULL; + } + + if (!PyArg_ParseTuple(args, "s|Oi(Ls)", &host, &watcherfn, &recv_timeout, &cid.client_id, &passwd)) + return NULL; + + if (cid.client_id != -1) { + strncpy(cid.passwd, passwd, 16*sizeof(char)); + } + pywatcher_t *pyw = NULL; + if (watcherfn != Py_None) { + pyw = create_pywatcher(handle, watcherfn,1); + if (pyw == NULL) { + return NULL; + } + } + watchers[handle] = pyw; + zhandle_t *zh = zookeeper_init( host, watcherfn != Py_None ? watcher_dispatch : NULL, + recv_timeout, cid.client_id == -1 ? 0 : &cid, + pyw, + 0 ); + + if (zh == NULL) + { + PyErr_SetString( ZooKeeperException, "Could not internally obtain zookeeper handle" ); + return NULL; + } + + zhandles[handle] = zh; + return Py_BuildValue( "i", handle); +} + + +/* -------------------------------------------------------------------------- */ +/* Asynchronous API implementation */ +/* -------------------------------------------------------------------------- */ + +/* Asynchronous node creation, returns integer error code */ +PyObject *pyzoo_acreate(PyObject *self, PyObject *args) +{ + int zkhid; char *path; char *value; int valuelen; + struct ACL_vector acl; int flags = 0; + PyObject *completion_callback = Py_None; + PyObject *pyacls = Py_None; + if (!PyArg_ParseTuple(args, "iss#O|iO", &zkhid, &path, + &value, &valuelen, &pyacls, &flags, + &completion_callback)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + CHECK_ACLS(pyacls); + if (parse_acls(&acl, pyacls) == 0) { + return NULL; + } + void *pyw = NULL; + if (completion_callback != Py_None) { + pyw = create_pywatcher(zkhid, completion_callback, 0); + if (pyw == NULL) { + return NULL; + } + } + int err = zoo_acreate( zhandles[zkhid], + path, + value, + valuelen, + pyacls == Py_None ? NULL : &acl, + flags, + string_completion_dispatch, + pyw); + free_acls(&acl); + if (err != ZOK) + { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err); +} + +/* Asynchronous node deletion, returns integer error code */ +PyObject *pyzoo_adelete(PyObject *self, PyObject *args) +{ + int zkhid; char *path; int version = -1; + PyObject *completion_callback = Py_None; + if (!PyArg_ParseTuple(args, "is|iO", &zkhid, &path, &version, &completion_callback)) + return NULL; + CHECK_ZHANDLE(zkhid); + + void *pyw = NULL; + if (completion_callback != Py_None) { + pyw = create_pywatcher(zkhid, completion_callback, 0); + if (pyw == NULL) { + return NULL; + } + } + + int err = zoo_adelete( zhandles[zkhid], + path, + version, + void_completion_dispatch, + pyw); + + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err); +} + +/* Asynchronous node existence check, returns integer error code */ +PyObject *pyzoo_aexists(PyObject *self, PyObject *args) +{ + int zkhid; char *path; + PyObject *completion_callback = Py_None; + PyObject *exists_watch = Py_None; + if (!PyArg_ParseTuple(args, "is|OO", &zkhid, &path, + &exists_watch, &completion_callback)) + return NULL; + CHECK_ZHANDLE(zkhid); + void *comp_pyw = NULL; + if (completion_callback != Py_None) { + comp_pyw = create_pywatcher(zkhid, completion_callback, 0); + if (comp_pyw == NULL) { + return NULL; + } + } + void *exist_pyw = NULL; + if (exists_watch != Py_None) { + exist_pyw = create_pywatcher(zkhid, exists_watch, 0); + if (exist_pyw == NULL) { + return NULL; + } + } + + int err = zoo_awexists( zhandles[zkhid], + path, + exists_watch != Py_None ? watcher_dispatch : NULL, + exist_pyw, + stat_completion_dispatch, + comp_pyw); + + if (err != ZOK) + { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err);; +} + +/* Asynchronous node data retrieval, returns integer error code */ +PyObject *pyzoo_aget(PyObject *self, PyObject *args) +{ + int zkhid; char *path; + PyObject *completion_callback = Py_None; + PyObject *get_watch = Py_None; + void *comp_pw = NULL; + void *watch_pw = NULL; + + if (!PyArg_ParseTuple(args, "is|OO", &zkhid, &path, + &get_watch, &completion_callback)) { + return NULL; + } + + CHECK_ZHANDLE(zkhid); + + if (get_watch != Py_None) { + if ((watch_pw = create_pywatcher(zkhid, get_watch, 0)) == NULL) { + return NULL; + } + } + + if (completion_callback != Py_None) { + if ((comp_pw = create_pywatcher(zkhid, completion_callback, 0)) == NULL) { + return NULL; + } + } + + int err = zoo_awget( zhandles[zkhid], + path, + get_watch != Py_None ? watcher_dispatch : NULL, + watch_pw, + data_completion_dispatch, + comp_pw); + + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err); +} + +/* Asynchronous node contents update, returns integer error code */ +PyObject *pyzoo_aset(PyObject *self, PyObject *args) +{ + int zkhid; char *path; char *buffer; int buflen; int version=-1; + PyObject *completion_callback = Py_None; + if (!PyArg_ParseTuple(args, "iss#|iO", &zkhid, &path, &buffer, &buflen, &version, &completion_callback)) + return NULL; + CHECK_ZHANDLE(zkhid); + void *pyw = NULL; + if (completion_callback != Py_None) { + pyw = create_pywatcher(zkhid, completion_callback, 0); + if (pyw == NULL) { + return NULL; + } + } + int err = zoo_aset( zhandles[zkhid], + path, + buffer, + buflen, + version, + stat_completion_dispatch, + pyw); + + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err); +} + +/* Asynchronous node child retrieval, returns integer error code */ +PyObject *pyzoo_aget_children(PyObject *self, PyObject *args) +{ + int zkhid; char *path; + PyObject *completion_callback = Py_None; + PyObject *get_watch; + if (!PyArg_ParseTuple(args, "is|OO", &zkhid, &path, + &get_watch, &completion_callback)) + return NULL; + CHECK_ZHANDLE(zkhid); + + void *get_pyw = NULL; + if (get_watch != Py_None) { + get_pyw = create_pywatcher(zkhid, get_watch, 0); + if (get_pyw == NULL) { + return NULL; + } + } + + void *pyw = NULL; + if (completion_callback != Py_None) { + pyw = create_pywatcher(zkhid, completion_callback, 0); + if (pyw == NULL) { + return NULL; + } + } + + int err = zoo_awget_children( zhandles[zkhid], + path, + get_watch != Py_None ? watcher_dispatch : NULL, + get_pyw, + strings_completion_dispatch, + pyw); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err);; +} + +/* Asynchronous sync, returns integer error code */ +PyObject *pyzoo_async(PyObject *self, PyObject *args) +{ + int zkhid; char *path; + PyObject *completion_callback = Py_None; + if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path, + &completion_callback)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + + void *pyw = NULL; + if (completion_callback != Py_None) { + pyw = create_pywatcher(zkhid, completion_callback, 0); + if (pyw == NULL) { + return NULL; + } + } + + int err = zoo_async( zhandles[zkhid], + path, + string_completion_dispatch, + pyw); + + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err);; +} + +/* Asynchronous node ACL retrieval, returns integer error code */ +PyObject *pyzoo_aget_acl(PyObject *self, PyObject *args) +{ + int zkhid; char *path; + PyObject *completion_callback = Py_None; + if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path, + &completion_callback)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + + void *pyw = NULL; + if (completion_callback != Py_None) { + pyw = create_pywatcher(zkhid, completion_callback, 0); + if (pyw == NULL) { + return NULL; + } + } + + int err = zoo_aget_acl( zhandles[zkhid], + path, + acl_completion_dispatch, + pyw); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err);; +} + +/* Asynchronous node ACL update, returns integer error code */ +PyObject *pyzoo_aset_acl(PyObject *self, PyObject *args) +{ + int zkhid; char *path; int version; + PyObject *completion_callback = Py_None, *pyacl; + struct ACL_vector aclv; + if (!PyArg_ParseTuple(args, "isiO|O", &zkhid, &path, &version, + &pyacl, &completion_callback)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + CHECK_ACLS(pyacl); + if (parse_acls(&aclv, pyacl) == 0) { + return NULL; + } + + void *pyw = NULL; + if (completion_callback != Py_None) { + pyw = create_pywatcher(zkhid, completion_callback, 0); + if (pyw == NULL) { + return NULL; + } + } + + int err = zoo_aset_acl( zhandles[zkhid], + path, + version, + &aclv, + void_completion_dispatch, + pyw); + free_acls(&aclv); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err);; +} + +/* Asynchronous authorization addition, returns integer error code */ +PyObject *pyzoo_add_auth(PyObject *self, PyObject *args) +{ + int zkhid; + char *scheme, *cert; + int certLen; + PyObject *completion_callback; + + if (!PyArg_ParseTuple(args, "iss#O", &zkhid, &scheme, &cert, &certLen, + &completion_callback)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + + void *pyw = NULL; + if (completion_callback != Py_None) { + pyw = create_pywatcher(zkhid, completion_callback, 0); + if (pyw == NULL) { + return NULL; + } + } + + int err = zoo_add_auth( zhandles[zkhid], + scheme, + cert, + certLen, + void_completion_dispatch, + pyw); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err); +} + +/* -------------------------------------------------------------------------- */ +/* Synchronous API implementation */ +/* -------------------------------------------------------------------------- */ + +/* Synchronous node creation, returns node path string */ +static PyObject *pyzoo_create(PyObject *self, PyObject *args) +{ + char *path; + int zkhid; + char* values; + int valuelen; + PyObject *acl = NULL; + int flags = 0; + char realbuf[256]; + const int maxbuf_len = 256; + if (!PyArg_ParseTuple(args, "iss#O|i",&zkhid, &path, &values, &valuelen,&acl,&flags)) + return NULL; + CHECK_ZHANDLE(zkhid); + struct ACL_vector aclv; + CHECK_ACLS(acl); + if (parse_acls(&aclv,acl) == 0) { + return NULL; + } + zhandle_t *zh = zhandles[zkhid]; + int err = zoo_create(zh, path, values, valuelen, &aclv, flags, realbuf, maxbuf_len); + free_acls(&aclv); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + + return Py_BuildValue("s", realbuf); +} + +/* Synchronous node deletion, returns integer error code */ +static PyObject *pyzoo_delete(PyObject *self, PyObject *args) +{ + int zkhid; + char *path; + int version = -1; + if (!PyArg_ParseTuple(args, "is|i",&zkhid,&path,&version)) + return NULL; + CHECK_ZHANDLE(zkhid); + zhandle_t *zh = zhandles[zkhid]; + int err = zoo_delete(zh, path, version); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err); +} + +/* Synchronous node existence check, returns stat if exists, None if + absent */ +static PyObject *pyzoo_exists(PyObject *self, PyObject *args) +{ + int zkhid; char *path; PyObject *watcherfn = Py_None; + struct Stat stat; + if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path, &watcherfn)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + zhandle_t *zh = zhandles[zkhid]; + pywatcher_t *pw = NULL; + void *callback = NULL; + if (watcherfn != Py_None) { + pw = create_pywatcher(zkhid, watcherfn,0); + callback = watcher_dispatch; + if (pw == NULL) { + return NULL; + } + } + int err = zoo_wexists(zh, path, callback, pw, &stat); + if (err != ZOK && err != ZNONODE) { + PyErr_SetString(err_to_exception(err), zerror(err)); + free_pywatcher(pw); + return NULL; + } + if (err == ZNONODE) { + Py_INCREF(Py_None); + return Py_None; // This isn't exceptional + } + return build_stat(&stat); +} + +/* Synchronous node child retrieval, returns list of children's path + as strings */ +static PyObject *pyzoo_get_children(PyObject *self, PyObject *args) +{ + int zkhid; + char *path; + PyObject *watcherfn = Py_None; + struct String_vector strings; + if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path, &watcherfn)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + pywatcher_t *pw = NULL; + void *callback = NULL; + if (watcherfn != Py_None) { + pw = create_pywatcher( zkhid, watcherfn, 0 ); + callback = watcher_dispatch; + if (pw == NULL) { + return NULL; + } + } + int err = zoo_wget_children(zhandles[zkhid], path, + callback, + pw, &strings ); + + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + free_pywatcher(pw); + return NULL; + } + + PyObject *ret = build_string_vector(&strings); + deallocate_String_vector(&strings); + return ret; +} + +/* Synchronous node data update, returns integer error code */ +static PyObject *pyzoo_set(PyObject *self, PyObject *args) +{ + int zkhid; + char *path; + char *buffer; + int buflen; + int version = -1; + if (!PyArg_ParseTuple(args, "iss#|i", &zkhid, &path, &buffer, &buflen, + &version)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + + int err = zoo_set(zhandles[zkhid], path, buffer, buflen, version); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + + return Py_BuildValue("i", err); +} + +/* Synchronous node data update, returns node's stat data structure */ +static PyObject *pyzoo_set2(PyObject *self, PyObject *args) +{ + int zkhid; + char *path; + char *buffer; + int buflen; + int version = -1; + if (!PyArg_ParseTuple(args, "iss#|i", &zkhid, &path, &buffer, &buflen, + &version)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + struct Stat stat; + int err = zoo_set2(zhandles[zkhid], path, buffer, buflen, version, &stat); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + + return build_stat(&stat); +} + +/* As per ZK documentation, datanodes are limited to 1Mb. Why not do a + stat followed by a get, to determine how big the buffer should be? + Because the znode may get updated between calls, so we can't + guarantee a complete get anyhow. */ +#define GET_BUFFER_SIZE 1024*1024 + +/* pyzoo_get has an extra parameter over the java/C equivalents. If + you set the fourth integer parameter buffer_len, we return + min(buffer_len, datalength) bytes. This is set by default to + GET_BUFFER_SIZE */ +static PyObject *pyzoo_get(PyObject *self, PyObject *args) +{ + int zkhid; + char *path; + char *buffer; + int buffer_len=GET_BUFFER_SIZE; + struct Stat stat; + PyObject *watcherfn = Py_None; + pywatcher_t *pw = NULL; + if (!PyArg_ParseTuple(args, "is|Oi", &zkhid, &path, &watcherfn, &buffer_len)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + if (watcherfn != Py_None) { + pw = create_pywatcher( zkhid, watcherfn,0 ); + if (pw == NULL) { + return NULL; + } + } + buffer = malloc(sizeof(char)*buffer_len); + if (buffer == NULL) { + free_pywatcher(pw); + PyErr_SetString(PyExc_MemoryError, "buffer could not be allocated in pyzoo_get"); + return NULL; + } + + int err = zoo_wget(zhandles[zkhid], path, + watcherfn != Py_None ? watcher_dispatch : NULL, + pw, buffer, + &buffer_len, &stat); + + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + free_pywatcher(pw); + free(buffer); + return NULL; + } + + PyObject *stat_dict = build_stat( &stat ); + PyObject *ret = Py_BuildValue( "(s#,N)", buffer,buffer_len < 0 ? 0 : buffer_len, stat_dict ); + free(buffer); + + return ret; +} + +/* Synchronous node ACL retrieval, returns list of ACLs */ +PyObject *pyzoo_get_acl(PyObject *self, PyObject *args) +{ + int zkhid; + char *path; + struct ACL_vector acl; + struct Stat stat; + if (!PyArg_ParseTuple(args, "is", &zkhid, &path)) + return NULL; + CHECK_ZHANDLE(zkhid); + int err = zoo_get_acl( zhandles[zkhid], path, &acl, &stat ); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + PyObject *pystat = build_stat( &stat ); + PyObject *acls = build_acls( &acl ); + PyObject *ret = Py_BuildValue( "(O,O)", pystat, acls ); + Py_DECREF(pystat); + Py_DECREF(acls); + return ret; +} + +/* Synchronous node ACL update, returns integer error code */ +PyObject *pyzoo_set_acl(PyObject *self, PyObject *args) +{ + int zkhid; + char *path; + int version; + PyObject *pyacls; + struct ACL_vector acl; + if (!PyArg_ParseTuple(args, "isiO", &zkhid, &path, &version, &pyacls)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + if (parse_acls(&acl, pyacls) == 0) { + return NULL; + } + int err = zoo_set_acl(zhandles[zkhid], path, version, &acl ); + free_acls(&acl); + if (err != ZOK) { + PyErr_SetString(err_to_exception(err), zerror(err)); + return NULL; + } + return Py_BuildValue("i", err);; +} + +/* -------------------------------------------------------------------------- */ +/* Session and context methods */ +/* -------------------------------------------------------------------------- */ + +/* Closes a connection, returns integer error code */ +PyObject *pyzoo_close(PyObject *self, PyObject *args) +{ + int zkhid, ret; + if (!PyArg_ParseTuple(args, "i", &zkhid)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + zhandle_t *handle = zhandles[zkhid]; + Py_BEGIN_ALLOW_THREADS + ret = zookeeper_close(handle); + Py_END_ALLOW_THREADS + zhandles[zkhid] = NULL; // The zk C client frees the zhandle + return Py_BuildValue("i", ret); +} + +/* Returns the ID of current client as a tuple (client_id, passwd) */ +PyObject *pyzoo_client_id(PyObject *self, PyObject *args) +{ + int zkhid; + if (!PyArg_ParseTuple(args, "i", &zkhid)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + const clientid_t *cid = zoo_client_id(zhandles[zkhid]); + return Py_BuildValue("(L,s)", cid->client_id, cid->passwd); +} + +/* DO NOT USE - context is used internally. This method is not exposed + in the Python module */ +PyObject *pyzoo_get_context(PyObject *self, PyObject *args) +{ + int zkhid; + if (!PyArg_ParseTuple(args, "i", &zkhid)) + return NULL; + CHECK_ZHANDLE(zkhid); + PyObject *context = NULL; + context = (PyObject*)zoo_get_context(zhandles[zkhid]); + if (context) return context; + Py_INCREF(Py_None); + return Py_None; +} + +/* DO NOT USE - context is used internally. This method is not exposed + in the Python module */ +PyObject *pyzoo_set_context(PyObject *self, PyObject *args) +{ + int zkhid; + PyObject *context; + if (!PyArg_ParseTuple(args, "iO", &zkhid, &context)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + PyObject *py_context = (PyObject*)zoo_get_context(zhandles[zkhid]); + if (py_context != NULL && py_context != Py_None) { + Py_DECREF(py_context); + } + Py_INCREF(context); + zoo_set_context(zhandles[zkhid], (void*)context); + Py_INCREF(Py_None); + return Py_None; +} + + +/* -------------------------------------------------------------------------- */ +/* Miscellaneous methods */ +/* -------------------------------------------------------------------------- */ + +/* Sets the global watcher. Returns None */ +PyObject *pyzoo_set_watcher(PyObject *self, PyObject *args) +{ + int zkhid; + PyObject *watcherfn; + if (!PyArg_ParseTuple(args, "iO", &zkhid, &watcherfn)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + pywatcher_t *pyw = watchers[zkhid]; + if (pyw != NULL) { + free_pywatcher( pyw ); + } + + // Create a *permanent* watcher object, not deallocated when called + pyw = create_pywatcher(zkhid, watcherfn,1); + if (pyw == NULL) { + return NULL; + } + watchers[zkhid] = pyw; + zoo_set_watcher(zhandles[zkhid], watcher_dispatch); + zoo_set_context(zhandles[zkhid], pyw); + Py_INCREF(Py_None); + return Py_None; +} + +/* Returns an integer code representing the current connection + state */ +PyObject *pyzoo_state(PyObject *self, PyObject *args) +{ + int zkhid; + if (!PyArg_ParseTuple(args,"i",&zkhid)) { + return NULL; + } + CHECK_ZHANDLE(zkhid); + int state = zoo_state(zhandles[zkhid]); + return Py_BuildValue("i",state); +} + + +/* Convert an integer error code into a string */ +PyObject *pyzerror(PyObject *self, PyObject *args) +{ + int rc; + if (!PyArg_ParseTuple(args,"i", &rc)) + return NULL; + return Py_BuildValue("s", zerror(rc)); +} + +/* Returns the integer receive timeout for a connection */ +PyObject *pyzoo_recv_timeout(PyObject *self, PyObject *args) +{ + int zkhid; + if (!PyArg_ParseTuple(args,"i",&zkhid)) + return NULL; + CHECK_ZHANDLE(zkhid); + int recv_timeout = zoo_recv_timeout(zhandles[zkhid]); + return Py_BuildValue("i",recv_timeout); +} + +/* Returns True if connection is unrecoverable, False otherwise */ +PyObject *pyis_unrecoverable(PyObject *self, PyObject *args) +{ + int zkhid; + if (!PyArg_ParseTuple(args,"i",&zkhid)) + return NULL; + CHECK_ZHANDLE(zkhid); + int ret = is_unrecoverable(zhandles[zkhid]); + if (ret == ZINVALIDSTATE) + Py_RETURN_TRUE; + Py_RETURN_FALSE; +} + +/* Set the debug level for logging, returns None */ +PyObject *pyzoo_set_debug_level(PyObject *self, PyObject *args) +{ + int loglevel; + if (!PyArg_ParseTuple(args, "i", &loglevel)) + return NULL; + zoo_set_debug_level((ZooLogLevel)loglevel); + Py_INCREF(Py_None); + return Py_None; +} + +static PyObject *log_stream = NULL; + +/* Set the output file-like object for logging output. Returns Py_None */ +PyObject *pyzoo_set_log_stream(PyObject *self, PyObject *args) +{ + PyObject *pystream = NULL; + if (!PyArg_ParseTuple(args,"O",&pystream)) { + PyErr_SetString(PyExc_ValueError, "Must supply a Python object to set_log_stream"); + return NULL; + } + +#if PY_MAJOR_VERSION >= 3 + extern PyTypeObject PyIOBase_Type; + if (!PyObject_IsInstance(pystream, (PyObject *)&PyIOBase_Type)) { +#else + if(!PyFile_Check(pystream)) { +#endif + + PyErr_SetString(PyExc_ValueError, "Must supply a file object to set_log_stream"); + return NULL; + } + /* Release the previous reference to log_stream that we took */ + if (log_stream != NULL) { + Py_DECREF(log_stream); + } + + log_stream = pystream; + Py_INCREF(log_stream); + +#if PY_MAJOR_VERSION >= 3 + int fd = PyObject_AsFileDescriptor(log_stream); + FILE *fp = fdopen(fd, "w"); +#else + FILE *fp = PyFile_AsFile(log_stream); +#endif + zoo_set_log_stream(fp); + + Py_INCREF(Py_None); + return Py_None; +} + +/* Set the connection order - randomized or in-order. Returns None. */ +PyObject *pyzoo_deterministic_conn_order(PyObject *self, PyObject *args) +{ + int yesOrNo; + if (!PyArg_ParseTuple(args, "i",&yesOrNo)) + return NULL; + zoo_deterministic_conn_order( yesOrNo ); + Py_INCREF(Py_None); + return Py_None; +} + +/* -------------------------------------------------------------------------- */ +/* Module setup */ +/* -------------------------------------------------------------------------- */ + +#include "pyzk_docstrings.h" + +static PyMethodDef ZooKeeperMethods[] = { + {"init", pyzookeeper_init, METH_VARARGS, pyzk_init_doc }, + {"create",pyzoo_create, METH_VARARGS, pyzk_create_doc }, + {"delete",pyzoo_delete, METH_VARARGS, pyzk_delete_doc }, + {"get_children", pyzoo_get_children, METH_VARARGS, pyzk_get_children_doc }, + {"set", pyzoo_set, METH_VARARGS, pyzk_set_doc }, + {"set2", pyzoo_set2, METH_VARARGS, pyzk_set2_doc }, + {"get",pyzoo_get, METH_VARARGS, pyzk_get_doc }, + {"exists",pyzoo_exists, METH_VARARGS, pyzk_exists_doc }, + {"get_acl", pyzoo_get_acl, METH_VARARGS, pyzk_get_acl_doc }, + {"set_acl", pyzoo_set_acl, METH_VARARGS, pyzk_set_acl_doc }, + {"close", pyzoo_close, METH_VARARGS, pyzk_close_doc }, + {"client_id", pyzoo_client_id, METH_VARARGS, pyzk_client_id_doc }, + {"set_watcher", pyzoo_set_watcher, METH_VARARGS }, + {"state", pyzoo_state, METH_VARARGS, pyzk_state_doc }, + {"recv_timeout",pyzoo_recv_timeout, METH_VARARGS }, + {"is_unrecoverable",pyis_unrecoverable, METH_VARARGS, pyzk_is_unrecoverable_doc }, + {"set_debug_level",pyzoo_set_debug_level, METH_VARARGS, pyzk_set_debug_level_doc }, + {"set_log_stream",pyzoo_set_log_stream, METH_VARARGS, pyzk_set_log_stream_doc }, + {"deterministic_conn_order",pyzoo_deterministic_conn_order, METH_VARARGS, pyzk_deterministic_conn_order_doc }, + {"acreate", pyzoo_acreate, METH_VARARGS, pyzk_acreate_doc }, + {"adelete", pyzoo_adelete, METH_VARARGS,pyzk_adelete_doc }, + {"aexists", pyzoo_aexists, METH_VARARGS,pyzk_aexists_doc }, + {"aget", pyzoo_aget, METH_VARARGS, pyzk_aget_doc }, + {"aset", pyzoo_aset, METH_VARARGS, pyzk_aset_doc }, + {"aget_children", pyzoo_aget_children, METH_VARARGS, pyzk_aget_children_doc }, + {"async", pyzoo_async, METH_VARARGS, pyzk_async_doc }, + {"aget_acl", pyzoo_aget_acl, METH_VARARGS, pyzk_aget_acl_doc }, + {"aset_acl", pyzoo_aset_acl, METH_VARARGS, pyzk_aset_acl_doc }, + {"zerror", pyzerror, METH_VARARGS, pyzk_zerror_doc }, + {"add_auth", pyzoo_add_auth, METH_VARARGS, pyzk_add_auth_doc }, + /* DO NOT USE get / set_context. Context is used internally to pass + the python watcher to a dispatch function. If you want context, set + it through set_watcher. */ + // {"get_context", pyzoo_get_context, METH_VARARGS, "" }, + // {"set_context", pyzoo_set_context, METH_VARARGS, "" }, + {NULL, NULL} +}; + +#if PY_MAJOR_VERSION >= 3 +static struct PyModuleDef zookeeper_moddef = { + PyModuleDef_HEAD_INIT, + "zookeeper", + NULL, + 0, + ZooKeeperMethods, + 0, + 0, + 0, + 0 +}; +#endif + +#define ADD_INTCONSTANT(x) PyModule_AddIntConstant(module, #x, ZOO_##x) +#define ADD_INTCONSTANTZ(x) PyModule_AddIntConstant(module, #x, Z##x) + +#define ADD_EXCEPTION(x) x = PyErr_NewException("zookeeper."#x, ZooKeeperException, NULL); \ + Py_INCREF(x); \ + PyModule_AddObject(module, #x, x); + +#if PY_MAJOR_VERSION >= 3 +PyMODINIT_FUNC PyInit_zookeeper(void) { +#else +PyMODINIT_FUNC initzookeeper(void) { +#endif + PyEval_InitThreads(); + +#if PY_MAJOR_VERSION >= 3 + PyObject *module = PyModule_Create(&zookeeper_moddef); +#else + PyObject *module = Py_InitModule("zookeeper", ZooKeeperMethods); +#endif + if (init_zhandles(32) == 0) { + return; // TODO: Is there any way to raise an exception here? + } + + ZooKeeperException = PyErr_NewException("zookeeper.ZooKeeperException", + PyExc_Exception, + NULL); + + PyModule_AddObject(module, "ZooKeeperException", ZooKeeperException); + Py_INCREF(ZooKeeperException); + + int size = 10; + char version_str[size]; + snprintf(version_str, size, "%i.%i.%i", ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION); + + PyModule_AddStringConstant(module, "__version__", version_str); + + ADD_INTCONSTANT(PERM_READ); + ADD_INTCONSTANT(PERM_WRITE); + ADD_INTCONSTANT(PERM_CREATE); + ADD_INTCONSTANT(PERM_DELETE); + ADD_INTCONSTANT(PERM_ALL); + ADD_INTCONSTANT(PERM_ADMIN); + + ADD_INTCONSTANT(EPHEMERAL); + ADD_INTCONSTANT(SEQUENCE); + + ADD_INTCONSTANT(EXPIRED_SESSION_STATE); + ADD_INTCONSTANT(AUTH_FAILED_STATE); + ADD_INTCONSTANT(CONNECTING_STATE); + ADD_INTCONSTANT(ASSOCIATING_STATE); + ADD_INTCONSTANT(CONNECTED_STATE); + + ADD_INTCONSTANT(CREATED_EVENT); + ADD_INTCONSTANT(DELETED_EVENT); + ADD_INTCONSTANT(CHANGED_EVENT); + ADD_INTCONSTANT(CHILD_EVENT); + ADD_INTCONSTANT(SESSION_EVENT); + ADD_INTCONSTANT(NOTWATCHING_EVENT); + + ADD_INTCONSTANT(LOG_LEVEL_ERROR); + ADD_INTCONSTANT(LOG_LEVEL_WARN); + ADD_INTCONSTANT(LOG_LEVEL_INFO); + ADD_INTCONSTANT(LOG_LEVEL_DEBUG); + + ADD_INTCONSTANTZ(SYSTEMERROR); + ADD_INTCONSTANTZ(RUNTIMEINCONSISTENCY); + ADD_INTCONSTANTZ(DATAINCONSISTENCY); + ADD_INTCONSTANTZ(CONNECTIONLOSS); + ADD_INTCONSTANTZ(MARSHALLINGERROR); + ADD_INTCONSTANTZ(UNIMPLEMENTED); + ADD_INTCONSTANTZ(OPERATIONTIMEOUT); + ADD_INTCONSTANTZ(BADARGUMENTS); + ADD_INTCONSTANTZ(INVALIDSTATE); + + ADD_EXCEPTION(SystemErrorException); + ADD_EXCEPTION(RuntimeInconsistencyException); + ADD_EXCEPTION(DataInconsistencyException); + ADD_EXCEPTION(ConnectionLossException); + ADD_EXCEPTION(MarshallingErrorException); + ADD_EXCEPTION(UnimplementedException); + ADD_EXCEPTION(OperationTimeoutException); + ADD_EXCEPTION(BadArgumentsException); + ADD_EXCEPTION(InvalidStateException); + + ADD_INTCONSTANTZ(OK); + ADD_INTCONSTANTZ(APIERROR); + ADD_INTCONSTANTZ(NONODE); + ADD_INTCONSTANTZ(NOAUTH); + ADD_INTCONSTANTZ(BADVERSION); + ADD_INTCONSTANTZ(NOCHILDRENFOREPHEMERALS); + ADD_INTCONSTANTZ(NODEEXISTS); + ADD_INTCONSTANTZ(NOTEMPTY); + ADD_INTCONSTANTZ(SESSIONEXPIRED); + ADD_INTCONSTANTZ(INVALIDCALLBACK); + ADD_INTCONSTANTZ(INVALIDACL); + ADD_INTCONSTANTZ(AUTHFAILED); + ADD_INTCONSTANTZ(CLOSING); + ADD_INTCONSTANTZ(NOTHING); + ADD_INTCONSTANTZ(SESSIONMOVED); + + ADD_EXCEPTION(ApiErrorException); + ADD_EXCEPTION(NoNodeException); + ADD_EXCEPTION(NoAuthException); + ADD_EXCEPTION(BadVersionException); + ADD_EXCEPTION(NoChildrenForEphemeralsException); + ADD_EXCEPTION(NodeExistsException); + ADD_EXCEPTION(NotEmptyException); + ADD_EXCEPTION(SessionExpiredException); + ADD_EXCEPTION(InvalidCallbackException); + ADD_EXCEPTION(InvalidACLException); + ADD_EXCEPTION(AuthFailedException); + ADD_EXCEPTION(ClosingException); + ADD_EXCEPTION(NothingException); + ADD_EXCEPTION(SessionMovedException); + +#if PY_MAJOR_VERSION >= 3 + return module; +#endif +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/examples/README ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/examples/README b/zookeeper-contrib/zookeeper-contrib-zkpython/src/examples/README new file mode 100644 index 0000000..3c53454 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/examples/README @@ -0,0 +1,8 @@ + +This folder contains sample showing how you can use ZooKeeper from Python. + +You should also check the following projects: + +* http://github.com/phunt/zk-smoketest +* http://github.com/henryr/pyzk-recipes + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/examples/watch_znode_for_changes.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/examples/watch_znode_for_changes.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/examples/watch_znode_for_changes.py new file mode 100644 index 0000000..07100f0 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/examples/watch_znode_for_changes.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python2.6 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" ZNode Change Watcher Skeleton Script + +This script shows you how to write a python program that watches a specific +znode for changes and reacts to them. + +Steps to understand how this script works: + +1. start a standalone ZooKeeper server (by default it listens on localhost:2181) + +Did you know you can deploy "local clusters" by using zkconf[1]? +[1] http://github.com/phunt/zkconf + +2. enter the command line console + +3. create the test node: + [zk: (CONNECTED) 1] create /watch-test dummy-data + Created /watch-test + +4. in another shell start this script in verbose mode + $ python watch_znode_for_changes.py -v + + # you should see a lot of log messages. have a look over them because + # you can easily understand how zookeeper works + +5. update the node data: + + [zk: (CONNECTED) 2] set /watch-test new-data + cZxid = 0xa0000001a + ctime = Fri Jul 09 19:14:45 EEST 2010 + mZxid = 0xa0000001e + mtime = Fri Jul 09 19:18:18 EEST 2010 + pZxid = 0xa0000001a + cversion = 0 + dataVersion = 1 + aclVersion = 0 + ephemeralOwner = 0x0 + dataLength = 8 + numChildren = 0 + + ... and you should see similar log messages: + + 2010-07-09 19:18:18,537:11542(0xb6ea5b70):ZOO_DEBUG@process_completions@1765: Calling a watcher for node [/watch-test], type = -1 event=ZOO_CHANGED_EVENT + 2010-07-09 19:18:18,537 watch_znode_for_changes.py:83 - Running watcher: zh=0 event=3 state=3 path=/watch-test + 2010-07-09 19:18:18,537:11542(0xb6ea5b70):ZOO_DEBUG@zoo_awget@2400: Sending request xid=0x4c374b33 for path [/watch-test] to 127.0.0.1:2181 + 2010-07-09 19:18:18,545:11542(0xb76a6b70):ZOO_DEBUG@zookeeper_process@1980: Queueing asynchronous response + 2010-07-09 19:18:18,545:11542(0xb6ea5b70):ZOO_DEBUG@process_completions@1772: Calling COMPLETION_DATA for xid=0x4c374b33 rc=0 + 2010-07-09 19:18:18,545 watch_znode_for_changes.py:54 - This is where your application does work. + + You can repeat this step multiple times. + +6. that's all. in the end you can delete the node and you should see a ZOO_DELETED_EVENT + +""" + +import logging +import logging.handlers +import signal +import sys +import time +import threading +import zookeeper + +from optparse import OptionParser + +logger = logging.getLogger() + +class MyClass(threading.Thread): + znode = '/watch-test' + + def __init__(self, options, args): + threading.Thread.__init__(self) + + logger.debug('Initializing MyClass thread.') + if options.verbose: + zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG) + + self.zh = zookeeper.init(options.servers) + if zookeeper.OK != zookeeper.aget(self.zh, self.znode, + self.watcher, self.handler): + logger.critical('Unable to get znode! Exiting.') + sys.exit(1) + + def __del__(self): + zookeeper.close(self.zh) + + def aget(self): + return zookeeper.aget(self.zh, self.znode, self.watcher, self.handler) + + def handler(self, zh, rc, data, stat): + """Handle zookeeper.aget() responses. + + This code handles the zookeeper.aget callback. It does not handle watches. + + Numeric arguments map to constants. See ``DATA`` in ``help(zookeeper)`` + for more information. + + Args: + zh Zookeeper handle that made this request. + rc Return code. + data Data stored in the znode. + + Does not provide a return value. + """ + if zookeeper.OK == rc: + logger.debug('This is where your application does work.') + else: + if zookeeper.NONODE == rc: + # avoid sending too many requests if the node does not yet exists + logger.info('Node not found. Trying again to set the watch.') + time.sleep(1) + + if zookeeper.OK != self.aget(): + logger.critical('Unable to get znode! Exiting.') + sys.exit(1) + + def watcher(self, zh, event, state, path): + """Handle zookeeper.aget() watches. + + This code is called when a znode changes and triggers a data watch. + It is not called to handle the zookeeper.aget call itself. + + Numeric arguments map to constants. See ``DATA`` in ``help(zookeeper)`` + for more information. + + Args: + zh Zookeeper handle that set this watch. + event Event that caused the watch (often called ``type`` elsewhere). + state Connection state. + path Znode that triggered this watch. + + Does not provide a return value. + """ + out = ['Running watcher:', + 'zh=%d' % zh, + 'event=%d' % event, + 'state=%d' % state, + 'path=%s' % path] + logger.debug(' '.join(out)) + if event == zookeeper.CHANGED_EVENT and \ + state == zookeeper.CONNECTED_STATE and \ + self.znode == path: + if zookeeper.OK != self.aget(): + logger.critical('Unable to get znode! Exiting.') + sys.exit(1) + + def run(self): + while True: + time.sleep(86400) + + +def main(argv=None): + # Allow Ctrl-C + signal.signal(signal.SIGINT, signal.SIG_DFL) + + parser = OptionParser() + parser.add_option('-v', '--verbose', + dest='verbose', + default=False, + action='store_true', + help='Verbose logging. (default: %default)') + parser.add_option('-s', '--servers', + dest='servers', + default='localhost:2181', + help='Comma-separated list of host:port pairs. (default: %default)') + + (options, args) = parser.parse_args() + + if options.verbose: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s") + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) + + logger.info('Starting Zookeeper python example: %s' % ' '.join(sys.argv)) + + mc = MyClass(options, args) + mc.start() + mc.join() + + +if __name__ == '__main__': + main() http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/python/setup.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/python/setup.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/python/setup.py new file mode 100755 index 0000000..c6a1cee --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/python/setup.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from distutils.core import setup, Extension + +zookeeper_basedir = "../../../" + +zookeepermodule = Extension("zookeeper", + sources=["src/c/zookeeper.c"], + include_dirs=[zookeeper_basedir + "/src/c/include", + zookeeper_basedir + "/build/c", + zookeeper_basedir + "/src/c/generated"], + libraries=["zookeeper_mt"], + library_dirs=[zookeeper_basedir + "/src/c/.libs/", + zookeeper_basedir + "/build/c/.libs/", + zookeeper_basedir + "/build/test/test-cppunit/.libs", + "/usr/local/lib" + ]) + +setup( name="ZooKeeper", + version = "0.4", + description = "ZooKeeper Python bindings", + ext_modules=[zookeepermodule] ) http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/python/zk.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/python/zk.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/python/zk.py new file mode 100755 index 0000000..24986e3 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/python/zk.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zookeeper, time, threading + +f = open("out.log","w") +zookeeper.set_log_stream(f) + +connected = False +conn_cv = threading.Condition( ) + +def my_connection_watcher(handle,type,state,path): + global connected, conn_cv + print("Connected, handle is ", handle) + conn_cv.acquire() + connected = True + conn_cv.notifyAll() + conn_cv.release() + +conn_cv.acquire() +print("Connecting to localhost:2181 -- ") +handle = zookeeper.init("localhost:2181", my_connection_watcher, 10000, 0) +while not connected: + conn_cv.wait() +conn_cv.release() + +def my_getc_watch( handle, type, state, path ): + print("Watch fired -- ") + print(type, state, path) + +ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"}; + +try: + zookeeper.create(handle, "/zk-python", "data", [ZOO_OPEN_ACL_UNSAFE], 0) + zookeeper.get_children(handle, "/zk-python", my_getc_watch) + for i in xrange(5): + print("Creating sequence node ", i, " ", zookeeper.create(handle, "/zk-python/sequencenode", "data", [ZOO_OPEN_ACL_UNSAFE], zookeeper.SEQUENCE )) +except: + pass + +def pp_zk(handle,root, indent = 0): + """Pretty print(a zookeeper tree, starting at root)""" + def make_path(child): + if root == "/": + return "/" + child + return root + "/" + child + children = zookeeper.get_children(handle, root, None) + out = "" + for i in xrange(indent): + out += "\t" + out += "|---"+root + " :: " + zookeeper.get(handle, root, None)[0] + print(out) + for child in children: + pp_zk(handle,make_path(child),indent+1) + +print("ZNode tree -- ") +pp_zk(handle,"/") + +print("Getting ACL / Stat for /zk-python --") +(stat, acl) = zookeeper.get_acl(handle, "/zk-python") +print("Stat:: ", stat) +print("Acl:: ", acl) + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/acl_test.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/acl_test.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/acl_test.py new file mode 100644 index 0000000..1289c8a --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/acl_test.py @@ -0,0 +1,109 @@ +#!/usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zookeeper, zktestbase, unittest, threading + +ZOO_OPEN_ACL_UNSAFE = {"perms":zookeeper.PERM_ALL, "scheme":"world", "id" :"anyone"} +ZOO_ACL_READ = {"perms":zookeeper.PERM_READ, "scheme": "world", + "id":"anyone"} +class ACLTest(zktestbase.TestBase): + """Test whether basic ACL setting and getting work correctly""" + # to do: startup and teardown via scripts? + def setUp(self): + zktestbase.TestBase.setUp(self) + try: + zookeeper.delete(self.handle, "/zk-python-acltest") + zookeeper.delete(self.handle, "/zk-python-aacltest") + except: + pass + + def test_sync_acl(self): + self.assertEqual(self.connected, True) + ret = zookeeper.create(self.handle, "/zk-python-acltest", "nodecontents", [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL) + acls = zookeeper.get_acl(self.handle, "/zk-python-acltest") + self.assertEqual(acls[1], [ZOO_OPEN_ACL_UNSAFE]) + self.assertRaises(zookeeper.InvalidACLException,zookeeper.set_acl,self.handle, "/zk-python-acltest", -1, ZOO_ACL_READ) + zookeeper.set_acl(self.handle, "/zk-python-acltest", -1, [ZOO_ACL_READ]) + acls = zookeeper.get_acl(self.handle, "/zk-python-acltest") + self.assertEqual(acls[1], [ZOO_ACL_READ]) + + + def test_async_acl(self): + self.cv = threading.Condition() + self.cv = threading.Condition() + def aget_callback(handle, rc, acl, stat): + self.cv.acquire() + self.callback_flag = True + self.rc = rc + self.acl = acl + self.stat = stat + self.cv.notify() + self.cv.release() + + def aset_callback(handle, rc): + self.cv.acquire() + self.callback_flag = True + self.rc = rc + self.cv.notify() + self.cv.release() + + self.assertEqual(self.connected, True, "Not connected!") + ret = zookeeper.create(self.handle, "/zk-python-aacltest", "nodecontents", [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL) + + self.cv.acquire() + zookeeper.aget_acl(self.handle, "/zk-python-aacltest", aget_callback) + self.cv.wait(15) + self.cv.release() + + self.assertEqual(self.callback_flag, True, "aget_acl timed out") + self.assertEqual(self.rc, zookeeper.OK, "aget failed") + self.assertEqual(self.acl, [ZOO_OPEN_ACL_UNSAFE], "Wrong ACL returned from aget") + + self.cv.acquire() + self.callback_flag = False + zookeeper.aset_acl(self.handle, "/zk-python-aacltest", -1, [ZOO_ACL_READ], aset_callback) + self.cv.wait(15) + self.cv.release() + + self.assertEqual(self.callback_flag, True, "aset_acl timed out") + self.assertEqual(self.rc, zookeeper.OK, "aset failed") + acls = zookeeper.get_acl(self.handle, "/zk-python-aacltest") + self.assertEqual(acls[1], [ZOO_ACL_READ], "Wrong ACL returned from get when aset") + + def test_invalid_acl(self): + self.assertRaises(zookeeper.InvalidACLException, + zookeeper.create, + self.handle, + "/zk-python-aclverifytest", + "", + None, + zookeeper.EPHEMERAL) + + def test_invalid_acl2(self): + """Verify all required keys are present in the ACL.""" + invalid_acl = [{"schema": "digest", "id": "zebra"}] + self.assertRaises(zookeeper.InvalidACLException, + zookeeper.create, + self.handle, + "/zk-python-aclverifytest", + "", + invalid_acl, + zookeeper.EPHEMERAL) + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/async_test.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/async_test.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/async_test.py new file mode 100644 index 0000000..e813435 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/async_test.py @@ -0,0 +1,33 @@ +#!/usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zookeeper, zktestbase, unittest, threading + +class AsyncTest(zktestbase.TestBase): + """Test whether async works""" + # to do: startup and teardown via scripts? + def setUp( self ): + zktestbase.TestBase.setUp(self) + + def test_async(self): + self.assertEqual(self.connected, True) + ret = zookeeper.async(self.handle, "/") + self.assertEqual(ret, zookeeper.OK, "async failed") + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/callback_test.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/callback_test.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/callback_test.py new file mode 100644 index 0000000..55d7fe1 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/callback_test.py @@ -0,0 +1,155 @@ +#!/usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zookeeper, zktestbase, unittest, threading, gc + +ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"} + +class CallbackTest(zktestbase.TestBase): + """ + Test whether callbacks (watchers/completions) are correctly invoked + """ + # to do: startup and teardown via scripts? + def setUp(self): + zktestbase.TestBase.setUp(self) + self.cv = threading.Condition() + + def create_callback(self, callback): + """ + Returns a callable which signals cv and then calls callback + """ + def wrapper(*args, **kwargs): + self.cv.acquire() + callback(*args, **kwargs) + self.cv.notify() + self.cv.release() + return wrapper + + def test_none_callback(self): + """ + Test that no errors are raised when None is passed as a callback. + """ + self.ensureCreated("/zk-python-none-callback-test","test") + # To do this we need to issue two operations, waiting on the second + # to ensure that the first completes + zookeeper.get(self.handle, "/zk-python-none-callback-test", None) + (d,s) = zookeeper.get(self.handle, "/zk-python-none-callback-test") + self.assertEqual(d, "test") + + def callback_harness(self, trigger, test): + self.callback_flag = False + self.cv.acquire() + trigger() + self.cv.wait(15) + test() + + def test_dispatch_types(self): + """ + Test all the various dispatch mechanisms internal to the module. + """ + def dispatch_callback(*args, **kwargs): + self.callback_flag = True + self.ensureCreated("/zk-python-dispatch-test") + self.callback_harness( lambda: zookeeper.adelete(self.handle, + "/zk-python-dispatch-test", + -1, + self.create_callback(dispatch_callback)), + lambda: self.assertEqual(True, self.callback_flag, "Void dispatch not fired")) + + + self.ensureCreated("/zk-python-dispatch-test") + self.callback_harness( lambda: zookeeper.aexists(self.handle, + "/zk-python-dispatch-test", + None, + self.create_callback(dispatch_callback)), + lambda: self.assertEqual(True, self.callback_flag, "Stat dispatch not fired")) + + self.callback_harness( lambda: zookeeper.aget(self.handle, + "/zk-python-dispatch-test", + None, + self.create_callback(dispatch_callback)), + lambda: self.assertEqual(True, self.callback_flag, "Data dispatch not fired")) + + self.callback_harness( lambda: zookeeper.aget_children(self.handle, + "/", + None, + self.create_callback( dispatch_callback )), + lambda: self.assertEqual(True, self.callback_flag, "Strings dispatch not fired")) + + self.callback_harness( lambda: zookeeper.async(self.handle, + "/", + self.create_callback( dispatch_callback )), + lambda: self.assertEqual(True, self.callback_flag, "String dispatch not fired")) + + self.callback_harness( lambda: zookeeper.aget_acl(self.handle, + "/", + self.create_callback( dispatch_callback )), + lambda: self.assertEqual(True, self.callback_flag, "ACL dispatch not fired")) + + def test_multiple_watchers(self): + """ + Test whether multiple watchers are correctly called + """ + cv1, cv2 = threading.Condition(), threading.Condition() + def watcher1(*args, **kwargs): + cv1.acquire() + self.watcher1 = True + cv1.notify() + cv1.release() + + def watcher2(*args, **kwargs): + cv2.acquire() + self.watcher2 = True + cv2.notify() + cv2.release() + + nodename = "/zk-python-multiple-watcher-test" + self.ensureCreated(nodename, "test") + cv1.acquire() + cv2.acquire() + zookeeper.get(self.handle, nodename, watcher1) + zookeeper.get(self.handle, nodename, watcher2) + zookeeper.set(self.handle, nodename, "test") + cv1.wait(15) + cv2.wait(15) + self.assertTrue(self.watcher1 and self.watcher2, "One or more watchers failed to fire") + + def test_lose_scope(self): + """ + The idea is to test that the reference counting doesn't + fail when we retain no references outside of the module + """ + self.ensureDeleted("/zk-python-lose-scope-test") + self.ensureCreated("/zk-python-lose-scope-test") + def set_watcher(): + def fn(): self.callback_flag = True + self.callback_flag = False + zookeeper.exists(self.handle, "/zk-python-lose-scope-test", + self.create_callback( lambda handle, type, state, path: fn() ) + ) + + set_watcher() + gc.collect() + self.cv.acquire() + zookeeper.set(self.handle, "/zk-python-lose-scope-test", "test") + self.cv.wait(15) + self.assertEqual(self.callback_flag, True) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/clientid_test.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/clientid_test.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/clientid_test.py new file mode 100755 index 0000000..90c8f0a --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/clientid_test.py @@ -0,0 +1,48 @@ +#!/usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest, threading + +import zookeeper, zktestbase + +class ClientidTest(zktestbase.TestBase): + """Test whether clientids work""" + def setUp(self): + pass + + def testclientid(self): + cv = threading.Condition() + self.connected = False + def connection_watcher(handle, type, state, path): + cv.acquire() + self.connected = True + cv.notify() + cv.release() + + cv.acquire() + self.handle = zookeeper.init(self.host, connection_watcher,10000,(123456,"mypassword")) + self.assertEqual(self.handle, zookeeper.OK) + cv.wait(15.0) + cv.release() + self.assertEqual(self.connected, True, "Connection timed out to " + self.host) + (cid,passwd) = zookeeper.client_id(self.handle) + self.assertEqual(cid,123456) + self.assertEqual(passwd,"mypassword") + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/close_deadlock_test.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/close_deadlock_test.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/close_deadlock_test.py new file mode 100644 index 0000000..921d2cc --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/close_deadlock_test.py @@ -0,0 +1,50 @@ +#!/usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import zookeeper, zktestbase, unittest, threading +import time + + +class CloseDeadlockTest(zktestbase.TestBase): + """ + This tests for the issue found in + https://issues.apache.org/jira/browse/ZOOKEEPER-763 + + zookeeper.close blocks on waiting for all completions to + finish. Previously it was doing so while holding teh GIL, stopping + any completions from actually continuing. + + This test is a failure if it does not exit within a few seconds. + """ + def deadlock(): + cv = threading.Condition() + + def callback(*args): + cv.acquire() + cv.notifyAll() + cv.release() + time.sleep(1) + + cv.acquire() + zookeeper.aget(handle, "/", None, callback) + cv.wait() + zookeeper.close(handle) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/connection_test.py ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/connection_test.py b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/connection_test.py new file mode 100755 index 0000000..3913fe3 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/connection_test.py @@ -0,0 +1,131 @@ +#!/usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest, threading, re, sys +if sys.version_info < (3,): + range = xrange + +import zookeeper, zktestbase +ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"} + +class ConnectionTest(zktestbase.TestBase): + """Test whether we can make a connection""" + def setUp(self): + pass + + def testconnection(self): + cv = threading.Condition() + self.connected = False + def connection_watcher(handle, type, state, path): + cv.acquire() + self.connected = True + self.assertEqual(zookeeper.CONNECTED_STATE, state) + self.handle = handle + cv.notify() + cv.release() + + cv.acquire() + ret = zookeeper.init(self.host, connection_watcher) + cv.wait(15.0) + cv.release() + self.assertEqual(self.connected, True, "Connection timed out to " + self.host) + self.assertEqual(zookeeper.CONNECTED_STATE, zookeeper.state(self.handle)) + + self.assertEqual(zookeeper.close(self.handle), zookeeper.OK) + # Trying to close the same handle twice is an error, and the C library will segfault on it + # so make sure this is caught at the Python module layer + self.assertRaises(zookeeper.ZooKeeperException, + zookeeper.close, + self.handle) + + self.assertRaises(zookeeper.ZooKeeperException, + zookeeper.get, + self.handle, + "/") + + def testhandlereuse(self): + """ + Test a) multiple concurrent connections b) reuse of closed handles + """ + cv = threading.Condition() + self.connected = False + def connection_watcher(handle, type, state, path): + cv.acquire() + self.connected = True + self.assertEqual(zookeeper.CONNECTED_STATE, state) + self.handle = handle + cv.notify() + cv.release() + + cv.acquire() + handles = [ zookeeper.init(self.host) for i in range(10) ] + ret = zookeeper.init(self.host, connection_watcher) + cv.wait(15.0) + cv.release() + self.assertEqual(self.connected, True, "Connection timed out to " + self.host) + self.assertEqual(True, self.all( [ zookeeper.state(handle) == zookeeper.CONNECTED_STATE for handle in handles ] ), + "Not all connections succeeded") + oldhandle = handles[3] + zookeeper.close(oldhandle) + newhandle = zookeeper.init(self.host) + + # This assertion tests *internal* behaviour; i.e. that the module + # correctly reuses closed handles. This is therefore implementation + # dependent. + self.assertEqual(newhandle, oldhandle, "Didn't get reused handle") + + def testmanyhandles(self): + """ + Test the ability of the module to support many handles. + """ + # We'd like to do more, but currently the C client doesn't + # work with > 83 handles (fails to create a pipe) on MacOS 10.5.8 + handles = [ zookeeper.init(self.host) for i in range(9) ] + + cv = threading.Condition() + self.connected = False + def connection_watcher(handle, type, state, path): + cv.acquire() + self.connected = True + self.assertEqual(zookeeper.CONNECTED_STATE, state) + self.handle = handle + cv.notify() + cv.release() + + cv.acquire() + ret = zookeeper.init(self.host, connection_watcher) + cv.wait(15.0) + cv.release() + self.assertEqual(self.connected, True, "Connection timed out to " + self.host) + + for i,h in enumerate(handles): + path = "/zkpython-test-handles-%s" % str(i) + self.assertEqual(path, zookeeper.create(h, path, "", [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL)) + + self.assertEqual(True, self.all( zookeeper.close(h) == zookeeper.OK for h in handles )) + + def testversionstringexists(self): + self.assertTrue(hasattr(zookeeper, '__version__')) + self.assertTrue(re.match("\d.\d.\d", zookeeper.__version__)) + + + def tearDown(self): + pass + +if __name__ == '__main__': + unittest.main()