cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject svn commit: r1082034 - in /cassandra/trunk: build.xml drivers/txpy/ drivers/txpy/README drivers/txpy/example.py drivers/txpy/setup.py drivers/txpy/txcql/ drivers/txpy/txcql/__init__.py drivers/txpy/txcql/connection.py drivers/txpy/txcql/connection_pool.py
Date Wed, 16 Mar 2011 03:05:22 GMT
Author: brandonwilliams
Date: Wed Mar 16 03:05:21 2011
New Revision: 1082034

URL: http://svn.apache.org/viewvc?rev=1082034&view=rev
Log:
Twisted driver for CQL
Patch by brandonwilliams, reviewed by eevans for CASSANDRA-2022

Added:
    cassandra/trunk/drivers/txpy/
    cassandra/trunk/drivers/txpy/README
    cassandra/trunk/drivers/txpy/example.py   (with props)
    cassandra/trunk/drivers/txpy/setup.py   (with props)
    cassandra/trunk/drivers/txpy/txcql/
    cassandra/trunk/drivers/txpy/txcql/__init__.py   (with props)
    cassandra/trunk/drivers/txpy/txcql/connection.py   (with props)
    cassandra/trunk/drivers/txpy/txcql/connection_pool.py   (with props)
Modified:
    cassandra/trunk/build.xml

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1082034&r1=1082033&r2=1082034&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Wed Mar 16 03:05:21 2011
@@ -337,6 +337,11 @@
         <arg line="-o ${interface.thrift.dir}" />
         <arg line="cassandra.thrift" />
       </exec>
+      <exec executable="thrift" dir="${basedir}/interface" failonerror="true">
+        <arg line="--gen py:twisted" />
+        <arg line="-o ${interface.thrift.dir}" />
+        <arg line="cassandra.thrift" />
+      </exec>
     </target>
 
     <!-- create properties file with C version -->

Added: cassandra/trunk/drivers/txpy/README
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/txpy/README?rev=1082034&view=auto
==============================================================================
--- cassandra/trunk/drivers/txpy/README (added)
+++ cassandra/trunk/drivers/txpy/README Wed Mar 16 03:05:21 2011
@@ -0,0 +1,2 @@
+Python (Twisted) language driver for CQL, a SQL-alike query language for Apache
+Cassandra.

Added: cassandra/trunk/drivers/txpy/example.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/txpy/example.py?rev=1082034&view=auto
==============================================================================
--- cassandra/trunk/drivers/txpy/example.py (added)
+++ cassandra/trunk/drivers/txpy/example.py Wed Mar 16 03:05:21 2011
@@ -0,0 +1,29 @@
+#!/usr/bin/python
+from txcql.connection_pool import ConnectionPool
+from twisted.internet import defer, reactor
+
+HOST = 'localhost'
+PORT = 9160
+KEYSPACE = 'txcql_test'
+CF = 'test'
+
+@defer.inlineCallbacks
+def testcql(client):
+    yield client.execute("CREATE KEYSPACE ? with replication_factor=1 and strategy_class='SimpleStrategy'", KEYSPACE)
+    yield client.execute("CREATE COLUMNFAMILY ? with comparator=ascii and default_validation=ascii", CF)
+    yield client.execute("UPDATE ? set foo = bar, bar = baz WHERE key = test", CF)
+    res = yield client.execute("SELECT foo, bar from ? WHERE key = test", CF)
+    for r in res:
+        print r.__dict__
+    yield client.execute("DROP KEYSPACE ?", KEYSPACE)
+    reactor.stop() 
+
+if __name__ == '__main__':
+    from twisted.python import log
+    import sys
+    log.startLogging(sys.stdout)
+
+    pool = ConnectionPool(KEYSPACE)
+    testcql(pool)
+    reactor.connectTCP(HOST, PORT, pool)
+    reactor.run()

Propchange: cassandra/trunk/drivers/txpy/example.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: cassandra/trunk/drivers/txpy/setup.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/txpy/setup.py?rev=1082034&view=auto
==============================================================================
--- cassandra/trunk/drivers/txpy/setup.py (added)
+++ cassandra/trunk/drivers/txpy/setup.py Wed Mar 16 03:05:21 2011
@@ -0,0 +1,37 @@
+#!/usr/bin/python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from distutils.core import setup
+from os.path import abspath, join, dirname
+
+setup(
+    name="txcql",
+    version="1.0",
+    description="Cassandra Query Language driver",
+    long_description=open(abspath(join(dirname(__file__), 'README'))).read(),
+    url="http://cassandra.apache.org",
+    packages=["txcql"],
+    requires=["thrift"],
+    provides=["txcql"],
+    classifiers=[
+        "Development Status :: 4 - Beta",
+        "License :: OSI Approved :: Apache Software License",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python",
+        "Topic :: Database :: Front-Ends",
+    ],
+)

Propchange: cassandra/trunk/drivers/txpy/setup.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: cassandra/trunk/drivers/txpy/txcql/__init__.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/txpy/txcql/__init__.py?rev=1082034&view=auto
==============================================================================
--- cassandra/trunk/drivers/txpy/txcql/__init__.py (added)
+++ cassandra/trunk/drivers/txpy/txcql/__init__.py Wed Mar 16 03:05:21 2011
@@ -0,0 +1,16 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

Propchange: cassandra/trunk/drivers/txpy/txcql/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: cassandra/trunk/drivers/txpy/txcql/connection.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/txpy/txcql/connection.py?rev=1082034&view=auto
==============================================================================
--- cassandra/trunk/drivers/txpy/txcql/connection.py (added)
+++ cassandra/trunk/drivers/txpy/txcql/connection.py Wed Mar 16 03:05:21 2011
@@ -0,0 +1,163 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from os.path import abspath, dirname, join
+from thrift.transport import TTwisted
+from thrift.protocol import TBinaryProtocol
+from thrift.Thrift import TApplicationException
+from twisted.internet import defer, reactor
+try:
+    from cassandra import Cassandra
+    from cassandra.ttypes import Compression, InvalidRequestException, \
+                                 CqlResultType, AuthenticationRequest, \
+                                 SchemaDisagreementException
+    from cql.errors import CQLException, InvalidCompressionScheme
+    from cql.marshal import prepare
+    from cql.decoders import SchemaDecoder
+    from cql.results import RowsProxy
+    from cql.connection import Connection as connection, COMPRESSION_SCHEMES, DEFAULT_COMPRESSION
+except ImportError:
+    # Hack to run from a source tree
+    import sys
+    sys.path.append(join(abspath(dirname(__file__)),
+                         '..',
+                         '..',
+                         '..',
+                         'interface',
+                         'thrift',
+                         'gen-py.twisted'))
+    sys.path.append(join(abspath(dirname(__file__)),
+                         '..',
+                         '..',
+                         'py'))
+    from cassandra import Cassandra
+    from cassandra.ttypes import Compression, InvalidRequestException, \
+                                 CqlResultType, AuthenticationRequest, \
+                                 SchemaDisagreementException
+    from cql.errors import CQLException, InvalidCompressionScheme
+    from cql.marshal import prepare
+    from cql.decoders import SchemaDecoder
+    from cql.results import RowsProxy
+    from cql.connection import Connection as connection, COMPRESSION_SCHEMES, DEFAULT_COMPRESSION
+    
+
+class Connection(TTwisted.ThriftClientProtocol):
+    def __init__(self, iprot_factory, oprot_factory=None, keyspace=None, credentials=None, decoder=None):
+        TTwisted.ThriftClientProtocol.__init__(self, Cassandra.Client, iprot_factory, oprot_factory)
+        self.keyspace = keyspace
+        self.column_family = None
+        self.credentials = credentials
+        self.decoder = decoder
+
+    def connectionMade(self):
+        TTwisted.ThriftClientProtocol.connectionMade(self)
+        self.client.protocol = self
+        d = self.setupConnection()
+        d.addErrback(self.setupFailed)
+
+    def setupConnection(self):
+        d = defer.succeed(True)
+        if self.credentials:
+            d.addCallback(lambda _: self.client.login(AuthenticationRequest(credentials=self.credentials)))
+        if self.keyspace:
+            d.addCallback(lambda _: self.execute('USE %s;' % self.keyspace))
+        if not self.decoder:
+            d.addCallback(self.__getSchema)
+        return d
+    
+    def __getSchema(self, res=None):
+        def columns(metadata):
+            results = {}
+            for col in metadata:
+                results[col.name] = col.validation_class
+            return results
+        
+        def column_families(cf_defs):
+            cfresults = {}
+            for cf in cf_defs:
+                cfresults[cf.name] = {"comparator": cf.comparator_type}
+                cfresults[cf.name]["default_validation_class"] = \
+                         cf.default_validation_class
+                cfresults[cf.name]["columns"] = columns(cf.column_metadata)
+            return cfresults
+        
+        def setSchema(ksdefs):
+            schema = {}
+            for ksdef in ksdefs:
+                schema[ksdef.name] = column_families(ksdef.cf_defs)
+            self.decoder = SchemaDecoder(schema)
+            return True
+        d = self.client.describe_keyspaces()
+        d.addCallback(setSchema)
+        return d
+    
+    def setupFailed(self, err):
+        self.transport.loseConnection()
+
+    def prepare(self, query, *args):
+        prepared_query = prepare(query, *args)
+        match = connection._cfamily_re.match(prepared_query)
+        if match:
+            self.column_family = match.group(1)
+        else:
+            match = connection._keyspace_re.match(prepared_query)
+            if match:
+                self.keyspace = match.group(1)
+        return prepared_query
+    
+    def execute(self, query, *args, **kwargs):
+        """
+        Execute a CQL query on a remote node.
+        
+        Params:
+        * query .........: CQL query string.
+        * args ..........: Query parameters.
+        * compression ...: Query compression type (optional).
+        """
+        def _error(err):
+            if isinstance(err, InvalidRequestException):
+                raise CQLException("Bad Request: %s" % err.why)
+            elif isinstance(err, TApplicationException):
+                raise CQLException("Internal application error")
+            elif isinstance(err, SchemaDisagreementException):
+                raise CQLException("schema versions disagree, (try again later).")
+            else:
+                raise CQLException(err)
+            
+        def _success(response):
+            if response.type == CqlResultType.ROWS:
+                return RowsProxy(response.rows,
+                                 self.keyspace,
+                                 self.column_family,
+                                 self.decoder)
+            if response.type == CqlResultType.INT:
+                return response.num
+            return None
+            
+        if kwargs.has_key("compression"):
+            compress = kwargs.get("compression").upper()
+        else:
+            compress = DEFAULT_COMPRESSION
+        
+        compressed_query = connection.compress_query(self.prepare(query, *args),
+                                                     compress)
+        request_compression = getattr(Compression, compress)
+        
+        d = self.client.execute_cql_query(compressed_query,
+                                                     request_compression)
+        d.addCallbacks(_success, _error)
+        return d

Propchange: cassandra/trunk/drivers/txpy/txcql/connection.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: cassandra/trunk/drivers/txpy/txcql/connection_pool.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/txpy/txcql/connection_pool.py?rev=1082034&view=auto
==============================================================================
--- cassandra/trunk/drivers/txpy/txcql/connection_pool.py (added)
+++ cassandra/trunk/drivers/txpy/txcql/connection_pool.py Wed Mar 16 03:05:21 2011
@@ -0,0 +1,178 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from thrift.transport import TTwisted
+from thrift.protocol import TBinaryProtocol
+from twisted.internet.protocol import ReconnectingClientFactory
+from twisted.internet import defer, reactor
+from twisted.internet.error import UserError
+from connection import Connection
+from cassandra.ttypes import InvalidRequestException
+from sys import exc_info
+
+class ClientBusy(Exception):
+    pass
+        
+class ManagedConnection(Connection):
+    aborted = False
+    deferred = None
+    
+    def connectionMade(self):
+        TTwisted.ThriftClientProtocol.connectionMade(self)
+        self.client.protocol = self
+        d = self.setupConnection()
+        d.addCallbacks(
+            (lambda res: self.factory.clientIdle(self, res)),
+            self.setupFailed
+        )
+        
+    def connectionLost(self, reason=None):
+        if not self.aborted: # don't allow parent class to raise unhandled TTransport
+                             # exceptions, the manager handled our failure
+            TTwisted.ThriftClientProtocol.connectionLost(self, reason)
+        self.factory.clientGone(self)
+        
+    def setupFailed(self, err):
+        Connection.setupFailed(self, err)
+        self.factory.clientSetupFailed(err)
+        
+    def _complete(self, res=None):
+        self.deferred = None
+        self.factory.clientIdle(self)
+        return res
+    
+    def execute(self, query, *args, **kwargs):
+        if not self.deferred:
+            d = Connection.execute(self, query, *args, **kwargs)
+            d.addBoth(self._complete)
+            return d
+        else:
+            raise ClientBusy
+    
+    def abort(self):
+        self.aborted = True
+        self.transport.loseConnection()
+        
+class ConnectionPool(ReconnectingClientFactory):
+    maxDelay = 5
+    thriftFactory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory
+    protocol = ManagedConnection
+
+    def __init__(self, keyspace=None, retries=0, credentials=None, decoder=None):
+        self.deferred  = defer.Deferred()
+        self.queue = defer.DeferredQueue()
+        self.continueTrying = True
+        self._protos = []
+        self._pending = []
+        self.request_retries = retries
+        self.keyspace = keyspace
+        self.credentials = credentials
+        self.decoder = decoder
+
+    def _errback(self, reason=None):
+        if self.deferred:
+            self.deferred.errback(reason)
+            self.deferred = None
+
+    def _callback(self, value=None):
+        if self.deferred:
+            self.deferred.callback(value)
+            self.deferred = None
+
+    def clientConnectionFailed(self, connector, reason):
+        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
+        self._errback(reason)
+
+    def clientSetupFailed(self, reason):
+        self._errback(reason)
+
+    def clientIdle(self, proto, result=True):
+        if proto not in self._protos:
+            self._protos.append(proto)
+        self._submitExecution(proto)
+        self._callback(result)
+
+    def buildProtocol(self, addr):
+        p = self.protocol(self.thriftFactory(),
+                          keyspace=self.keyspace,
+                          credentials=self.credentials,
+                          decoder=self.decoder)
+        p.factory = self
+        self.resetDelay()
+        return p
+
+    def clientGone(self, proto):
+        try:
+            self._protos.remove(proto)
+        except ValueError:
+            pass
+        
+    def _submitExecution(self, proto):
+        def reqError(err, req, d, r):
+            if err.check(InvalidRequestException) or r < 1:
+                if err.tb is None:
+                    try:
+                        raise err.value
+                    except Exception:
+                        # make new Failure object explicitly, so that the same
+                        # (traceback-less) one made by Thrift won't be retained
+                        # and useful tracebacks thrown away
+                        t, v, tb = exc_info()
+                        err = failure.Failure(v, t, tb)
+                d.errback(err)
+                self._pending.remove(d)
+            else:
+                self.queue.put((req, d, r))
+        def reqSuccess(res, d):
+            d.callback(res)
+            self._pending.remove(d)
+        def _process((request, deferred, retries)):
+            if not proto in self._protos:
+                # may have disconnected while we were waiting for a request
+                self.queue.put((request, deferred, retries))
+            else:
+                try:
+                    d = proto.execute(request[0], *request[1], **request[2])
+                except Exception:
+                    proto.abort()
+                    d = defer.fail()
+                retries -= 1
+                d.addCallbacks(reqSuccess,
+                               reqError,
+                               callbackArgs=[deferred],
+                               errbackArgs=[request, deferred, retries])
+        return self.queue.get().addCallback(_process)
+    
+    def execute(self, query, *args, **kwargs):
+        if 'retries' in kwargs:
+            retries = kwargs['retries']
+            del kwargs['retries']
+        else:
+            retries = self.request_retries
+        d = defer.Deferred()
+        self._pending.append(d)
+        self.queue.put(((query, args, kwargs), d, retries))
+        return d
+    
+    def shutdown(self):
+        self.stopTrying()
+        for p in self._protos:
+            if p.transport:
+                p.transport.loseConnection()
+        for d in self._pending:
+            if not d.called: d.errback(UserError(string="Shutdown requested"))
+    

Propchange: cassandra/trunk/drivers/txpy/txcql/connection_pool.py
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message