cassandra-commits mailing list archives

From tylerho...@apache.org
Subject [1/2] cassandra git commit: cqlsh: Improve backoff policy for COPY FROM
Date Fri, 01 Apr 2016 17:37:09 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.5 af9b9cd5a -> 496418ce0


cqlsh: Improve backoff policy for COPY FROM

Patch by Stefania Alborghetti; reviewed by Tyler Hobbs for
CASSANDRA-11320


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98086b65
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98086b65
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98086b65

Branch: refs/heads/cassandra-3.5
Commit: 98086b65d0bc76631a3aeb50cddd8c9a82bc05b9
Parents: bd4cab2
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Fri Apr 1 12:29:01 2016 -0500
Committer: Tyler Hobbs <tylerlhobbs@gmail.com>
Committed: Fri Apr 1 12:29:01 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                |   1 +
 bin/cqlsh.py               |   2 +-
 conf/cqlshrc.sample        |   2 +-
 pylib/cqlshlib/copyutil.py | 267 +++++++++++++++++++++++++---------------
 4 files changed, 168 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
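
The patch applies exponential back-off with random jitter at two levels: the
feeding process sleeps when all worker channels have too many pending chunks
(MAXPENDINGCHUNKS), and the new load-balancing policy sleeps when every replica
connection already has too many in-flight requests (MAXINFLIGHTMESSAGES). The
shared pattern, as a minimal standalone sketch (helper names are illustrative,
not part of the patch):

    import random
    import time

    def backoff_delay(attempt, base=1.0):
        # A random delay between base and base * 2^attempt seconds: repeated
        # failures spread retries out exponentially, while the jitter keeps
        # independent clients from retrying in lockstep.
        return random.randint(1, 2 ** attempt) * base

    def run_with_backoff(operation, max_attempts=12):
        # maxbackoffattempts defaults to 12 in this patch
        for attempt in range(1, max_attempts + 1):
            if operation():
                return True
            time.sleep(backoff_delay(attempt))
        return False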


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98086b65/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b9376bc..482c41a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.5
+ * Improve backoff policy for cqlsh COPY FROM (CASSANDRA-11320)
  * Improve IF NOT EXISTS check in CREATE INDEX (CASSANDRA-11131)
  * Upgrade ohc to 0.4.3
  * Enable SO_REUSEADDR for JMX RMI server sockets (CASSANDRA-11093)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98086b65/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index e4ed830..1ef95ac 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -1886,7 +1886,7 @@ class Shell(cmd.Cmd):
           SKIPROWS=0              - the number of rows to skip
           SKIPCOLS=''             - a comma separated list of column names to skip
          MAXPARSEERRORS=-1       - the maximum global number of parsing errors, -1 means no maximum
-          MAXINSERTERRORS=-1      - the maximum global number of insert errors, -1 means no maximum
+          MAXINSERTERRORS=1000    - the maximum global number of insert errors, -1 means no maximum
          ERRFILE=''              - a file where to store all rows that could not be imported, by default this is
                                    import_<ks>_<table>.err where <ks> is your keyspace and <table> is your table name.
          PREPAREDSTATEMENTS=True - whether to use prepared statements when importing, by default True. Set this to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98086b65/conf/cqlshrc.sample
----------------------------------------------------------------------
diff --git a/conf/cqlshrc.sample b/conf/cqlshrc.sample
index 462dcc6..cb02b04 100644
--- a/conf/cqlshrc.sample
+++ b/conf/cqlshrc.sample
@@ -202,7 +202,7 @@ port = 9042
 ; maxparseerrors = -1
 
 ;; The maximum global number of insert errors to ignore, -1 means unlimited
-; maxinserterrors = -1
+; maxinserterrors = 1000
 
 ;; A file to store all rows that could not be imported, by default this is
 ;; import_<ks>_<table>.err where <ks> is your keyspace and <table> is your table name.
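
With the new default, an import gives up after 1000 insert errors instead of
retrying indefinitely. The old behaviour can be restored in cqlshrc, e.g.
(assuming COPY FROM options are read from a [copy-from] section):

    [copy-from]
    ; -1 restores the previous unlimited behaviour
    maxinserterrors = -1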

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98086b65/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index 7aa5a96..53bbe09 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -28,8 +28,8 @@ import random
 import re
 import struct
 import sys
-import time
 import threading
+import time
 import traceback
 
 from bisect import bisect_right
@@ -37,16 +37,16 @@ from calendar import timegm
 from collections import defaultdict, namedtuple
 from decimal import Decimal
 from Queue import Queue
-from random import randrange
+from random import randint
 from StringIO import StringIO
 from select import select
 from uuid import UUID
 from util import profile_on, profile_off
 
-from cassandra.cluster import Cluster
+from cassandra.cluster import Cluster, DefaultConnection
 from cassandra.cqltypes import ReversedType, UserType
 from cassandra.metadata import protect_name, protect_names, protect_value
-from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy, DCAwareRoundRobinPolicy
+from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy, DCAwareRoundRobinPolicy, FallthroughRetryPolicy
 from cassandra.query import BatchStatement, BatchType, SimpleStatement, tuple_factory
 from cassandra.util import Date, Time
 
@@ -57,6 +57,7 @@ from sslhandling import ssl_settings
 
 PROFILE_ON = False
 STRACE_ON = False
+DEBUG = False  # This may be set to True when initializing the task
 IS_LINUX = platform.system() == 'Linux'
 
 CopyOptions = namedtuple('CopyOptions', 'copy dialect unrecognized')
@@ -70,6 +71,16 @@ def safe_normpath(fname):
     return os.path.normpath(os.path.expanduser(fname)) if fname else fname
 
 
+def printdebugmsg(msg):
+    if DEBUG:
+        printmsg(msg)
+
+
+def printmsg(msg, eol='\n'):
+    sys.stdout.write(msg + eol)
+    sys.stdout.flush()
+
+
 class OneWayChannel(object):
     """
     A one way pipe protected by two process level locks, one for reading and one for writing.
@@ -78,11 +89,49 @@ class OneWayChannel(object):
         self.reader, self.writer = mp.Pipe(duplex=False)
         self.rlock = mp.Lock()
         self.wlock = mp.Lock()
+        self.feeding_thread = None
+        self.pending_messages = None
+
+    def init_feeding_thread(self):
+        """
+        Initialize a thread that fetches messages from a queue and sends them to the channel.
+        We initialize the feeding thread lazily so that it is created after the fork(), since the channels are passed to child processes.
+        """
+        if self.feeding_thread is not None or self.pending_messages is not None:
+            raise RuntimeError("Feeding thread already initialized")
+
+        self.pending_messages = Queue()
+
+        def feed():
+            send = self._send
+            pending_messages = self.pending_messages
+
+            while True:
+                try:
+                    msg = pending_messages.get()
+                    send(msg)
+                except Exception, e:
+                    printmsg('%s: %s' % (e.__class__.__name__, e.message))
+
+        feeding_thread = threading.Thread(target=feed)
+        feeding_thread.setDaemon(True)
+        feeding_thread.start()
+
+        self.feeding_thread = feeding_thread
 
     def send(self, obj):
+        if self.feeding_thread is None:
+            self.init_feeding_thread()
+
+        self.pending_messages.put(obj)
+
+    def _send(self, obj):
         with self.wlock:
             self.writer.send(obj)
 
+    def num_pending(self):
+        return self.pending_messages.qsize() if self.pending_messages else 0
+
     def recv(self):
         with self.rlock:
             return self.reader.recv()
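
The lazily started daemon thread decouples send() from the pipe: callers enqueue
and return immediately, the thread drains the queue into the pipe in arrival
order, and num_pending() exposes the queue depth for back-pressure checks. The
same pattern in isolation (a sketch, not part of the patch):

    import threading
    from Queue import Queue  # 'queue' on Python 3

    class AsyncSender(object):
        """Buffer messages and write them to a sink from a daemon thread,
        so that producers never block on a slow or stalled consumer."""

        def __init__(self, sink):
            self.sink = sink  # any object with a blocking send() method
            self.pending = Queue()
            feeder = threading.Thread(target=self._drain)
            feeder.setDaemon(True)  # do not keep the process alive on exit
            feeder.start()

        def send(self, msg):
            self.pending.put(msg)  # returns immediately

        def num_pending(self):
            return self.pending.qsize()

        def _drain(self):
            while True:
                self.sink.send(self.pending.get())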
@@ -158,8 +207,15 @@ class CopyTask(object):
         self.fname = safe_normpath(fname)
         self.protocol_version = protocol_version
         self.config_file = config_file
-        # do not display messages when exporting to STDOUT
-        self.printmsg = self._printmsg if self.fname is not None or direction == 'from' else lambda _, eol='\n': None
+
+        # if cqlsh is invoked with --debug then set the global debug flag to True
+        if shell.debug:
+            global DEBUG
+            DEBUG = True
+
+        # do not display messages when exporting to STDOUT unless --debug is set
+        self.printmsg = printmsg if self.fname is not None or direction == 'from' or DEBUG \
+            else lambda _, eol='\n': None
         self.options = self.parse_options(opts, direction)
 
         self.num_processes = self.options.copy['numprocesses']
@@ -175,11 +231,6 @@ class CopyTask(object):
         self.columns = CopyTask.get_columns(shell, ks, table, columns)
         self.time_start = time.time()
 
-    @staticmethod
-    def _printmsg(msg, eol='\n'):
-        sys.stdout.write(msg + eol)
-        sys.stdout.flush()
-
     def maybe_read_config_file(self, opts, direction):
         """
         Read optional sections from a configuration file that was specified in the command options or from the default
@@ -273,12 +324,18 @@ class CopyTask(object):
         copy_options['skiprows'] = int(opts.pop('skiprows', '0'))
         copy_options['skipcols'] = opts.pop('skipcols', '')
         copy_options['maxparseerrors'] = int(opts.pop('maxparseerrors', '-1'))
-        copy_options['maxinserterrors'] = int(opts.pop('maxinserterrors', '-1'))
+        copy_options['maxinserterrors'] = int(opts.pop('maxinserterrors', '1000'))
         copy_options['errfile'] = safe_normpath(opts.pop('errfile', 'import_%s_%s.err' % (self.ks, self.table,)))
         copy_options['ratefile'] = safe_normpath(opts.pop('ratefile', ''))
         copy_options['maxoutputsize'] = int(opts.pop('maxoutputsize', '-1'))
         copy_options['preparedstatements'] = bool(opts.pop('preparedstatements', 'true').lower() == 'true')
 
+        # Hidden properties: they do not appear in the documentation but can be set in config files
+        # or on the cmd line but w/o completion
+        copy_options['maxinflightmessages'] = int(opts.pop('maxinflightmessages', '512'))
+        copy_options['maxbackoffattempts'] = int(opts.pop('maxbackoffattempts', '12'))
+        copy_options['maxpendingchunks'] = int(opts.pop('maxpendingchunks', '24'))
+
         self.check_options(copy_options)
         return CopyOptions(copy=copy_options, dialect=dialect_options, unrecognized=opts)
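
The hidden options can be set inline like any other COPY option; for example
(keyspace, table and values here are hypothetical):

    cqlsh> COPY myks.mytable FROM 'data.csv'
       ... WITH MAXINFLIGHTMESSAGES=256 AND MAXBACKOFFATTEMPTS=8 AND MAXPENDINGCHUNKS=12;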
 
@@ -773,7 +830,7 @@ class FilesReader(object):
             try:
                 return open(fname, 'rb')
             except IOError, e:
-                self.printmsg("Can't open %r for reading: %s" % (fname, e))
+                printdebugmsg("Can't open %r for reading: %s" % (fname, e))
                 return None
 
         for path in paths.split(','):
@@ -784,11 +841,6 @@ class FilesReader(object):
                 for f in glob.glob(path):
                     yield (make_source(f))
 
-    @staticmethod
-    def printmsg(msg, eol='\n'):
-        sys.stdout.write(msg + eol)
-        sys.stdout.flush()
-
     def start(self):
         self.sources = self.get_source(self.fname)
         self.next_source()
@@ -936,7 +988,6 @@ class ImportErrorHandler(object):
     def __init__(self, task):
         self.shell = task.shell
         self.options = task.options
-        self.printmsg = task.printmsg
         self.max_attempts = self.options.copy['maxattempts']
         self.max_parse_errors = self.options.copy['maxparseerrors']
         self.max_insert_errors = self.options.copy['maxinserterrors']
@@ -948,7 +999,7 @@ class ImportErrorHandler(object):
         if os.path.isfile(self.err_file):
             now = datetime.datetime.now()
             old_err_file = self.err_file + now.strftime('.%Y%m%d_%H%M%S')
-            self.printmsg("Renaming existing %s to %s\n" % (self.err_file, old_err_file))
+            printdebugmsg("Renaming existing %s to %s\n" % (self.err_file, old_err_file))
             os.rename(self.err_file, old_err_file)
 
     def max_exceeded(self):
@@ -1112,17 +1163,18 @@ class ImportTask(CopyTask):
             self.shell.printerr("{} child process(es) died unexpectedly, aborting"
                                 .format(self.num_processes - self.num_live_processes()))
         else:
-            # it is only safe to write to processes if they are all running because the feeder process
-            # at the moment hangs whilst sending messages to a crashed worker process; in future
-            # we could do something about this by using a BoundedSemaphore to keep track of how many messages are
-            # queued on a pipe
+            if self.error_handler.max_exceeded():
+                self.processes[-1].terminate()  # kill the feeder
+
             for i, _ in enumerate(self.processes):
-                self.outmsg.channels[i].send(None)
+                if self.processes[i].is_alive():
+                    self.outmsg.channels[i].send(None)
 
-            if PROFILE_ON:
-                # allow time for worker processes to write profile results (only works if processes received
-                # the poison pill above)
-                time.sleep(5)
+        # allow time for worker processes to exit cleanly
+        attempts = 50  # 100 milliseconds per attempt, so 5 seconds total
+        while attempts > 0 and self.num_live_processes() > 0:
+            time.sleep(0.1)
+            attempts -= 1
 
         self.printmsg("\n%d rows imported from %d files in %s (%d skipped)." %
                       (self.receive_meter.get_total_records(),
@@ -1166,6 +1218,7 @@ class FeedingProcess(mp.Process):
         self.send_meter = RateMeter(log_fcn=None, update_interval=1)
         self.ingest_rate = options.copy['ingestrate']
         self.num_worker_processes = options.copy['numprocesses']
+        self.max_pending_chunks = options.copy['maxpendingchunks']
         self.chunk_id = 0
 
     def run(self):
@@ -1186,10 +1239,22 @@ class FeedingProcess(mp.Process):
         reader = self.reader
         reader.start()
         channels = self.worker_channels
+        max_pending_chunks = self.max_pending_chunks
         sent = 0
+        failed_attempts = 0
 
         while not reader.exhausted:
-            for ch in channels:
+            channels_eligible = filter(lambda c: c.num_pending() < max_pending_chunks, channels)
+            if not channels_eligible:
+                failed_attempts += 1
+                delay = randint(1, pow(2, failed_attempts))
+                printdebugmsg("All workers busy, sleeping for %d second(s)" % (delay,))
+                time.sleep(delay)
+                continue
+            elif failed_attempts > 0:
+                failed_attempts = 0
+
+            for ch in channels_eligible:
                 try:
                     max_rows = self.ingest_rate - self.send_meter.current_record
                     if max_rows <= 0:
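
One subtlety in the loop above: filter() returns a list on Python 2, so both the
truthiness test and the later iteration over channels_eligible are safe; a
Python 3 port would need an explicit list instead, e.g.:

    channels_eligible = [c for c in channels if c.num_pending() < max_pending_chunks]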
@@ -1263,12 +1328,8 @@ class ChildProcess(mp.Process):
         else:
             self.test_failures = None
 
-    def printdebugmsg(self, text):
-        if self.debug:
-            sys.stdout.write(text + '\n')
-
     def close(self):
-        self.printdebugmsg("Closing queues...")
+        printdebugmsg("Closing queues...")
         self.inmsg.close()
         self.outmsg.close()
 
@@ -1280,7 +1341,6 @@ class ExpBackoffRetryPolicy(RetryPolicy):
     def __init__(self, parent_process):
         RetryPolicy.__init__(self)
         self.max_attempts = parent_process.max_attempts
-        self.printdebugmsg = parent_process.printdebugmsg
 
     def on_read_timeout(self, query, consistency, required_responses,
                         received_responses, data_retrieved, retry_num):
@@ -1293,14 +1353,14 @@ class ExpBackoffRetryPolicy(RetryPolicy):
     def _handle_timeout(self, consistency, retry_num):
         delay = self.backoff(retry_num)
         if delay > 0:
-            self.printdebugmsg("Timeout received, retrying after %d seconds" % (delay,))
+            printdebugmsg("Timeout received, retrying after %d seconds" % (delay,))
             time.sleep(delay)
             return self.RETRY, consistency
         elif delay == 0:
-            self.printdebugmsg("Timeout received, retrying immediately")
+            printdebugmsg("Timeout received, retrying immediately")
             return self.RETRY, consistency
         else:
-            self.printdebugmsg("Timeout received, giving up after %d attempts" % (retry_num
+ 1))
+            printdebugmsg("Timeout received, giving up after %d attempts" % (retry_num +
1))
             return self.RETHROW, None
 
     def backoff(self, retry_num):
@@ -1309,14 +1369,13 @@ class ExpBackoffRetryPolicy(RetryPolicy):
         this maximum is per query.
         To back-off we should wait a random number of seconds
         between 0 and 2^c - 1, where c is the number of total failures.
-        randrange() excludes the last value, so we drop the -1.
 
         :return : the number of seconds to wait for, -1 if we should not retry
         """
         if retry_num >= self.max_attempts:
             return -1
 
-        delay = randrange(0, pow(2, retry_num + 1))
+        delay = randint(0, pow(2, retry_num + 1) - 1)
         return delay
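
The switch from randrange() to randint() does not change the distribution; it
only makes the inclusive upper bound explicit, matching the docstring (a quick
illustration, not part of the patch):

    from random import randint, randrange
    # randrange excludes its upper bound, randint includes it, so for any n >= 0
    # both calls draw uniformly from 0 .. 2**(n + 1) - 1:
    n = 3
    old_delay = randrange(0, pow(2, n + 1))    # 0..15 for n = 3
    new_delay = randint(0, pow(2, n + 1) - 1)  # 0..15 for n = 3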
 
 
@@ -1333,8 +1392,8 @@ class ExportSession(object):
         session.default_fetch_size = export_process.options.copy['pagesize']
         session.default_timeout = export_process.options.copy['pagetimeout']
 
-        export_process.printdebugmsg("Created connection to %s with page size %d and timeout %d seconds per page"
-                                     % (cluster.contact_points, session.default_fetch_size, session.default_timeout))
+        printdebugmsg("Created connection to %s with page size %d and timeout %d seconds per page"
+                      % (cluster.contact_points, session.default_fetch_size, session.default_timeout))
 
         self.cluster = cluster
         self.session = session
@@ -1377,7 +1436,6 @@ class ExportProcess(ChildProcess):
         self.hosts_to_sessions = dict()
         self.formatters = dict()
         self.options = options
-        self.responses = None
 
     def run(self):
         try:
@@ -1395,8 +1453,6 @@ class ExportProcess(ChildProcess):
         we can signal a global error by sending (None, error).
         We terminate when the inbound queue is closed.
         """
-        self.init_feeder_thread()
-
         while True:
             if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
@@ -1405,56 +1461,25 @@ class ExportProcess(ChildProcess):
             token_range, info = self.inmsg.recv()
             self.start_request(token_range, info)
 
-    def init_feeder_thread(self):
-        """
-        Start a thread to feed response messages to the parent process.
-
-        It is not safe to write on the pipe from the main thread if the parent process is still sending work and
-        not receiving yet. This will in fact block the main thread on the send, which in turn won't be able to call
-        recv(), and will therefore block the parent process on its send().
-
-        It is also not safe to write on the pipe from the driver receiving thread whilst the parent process is
-        sending work, because if the receiving thread stops making progress, then the main thread may no longer
-        call recv() due to the check on the maximum number of requests in inner_run().
-
-        These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker
-        processes too.
-
-        It is important that the order of the responses in the queue is respected, or else the parent process may
-        kill off worker processes before it has received all the pages of the last token range.
-        """
-        def feed_errors():
-            while True:
-                try:
-                    self.outmsg.send(self.responses.get())
-                except Exception, e:
-                    self.printdebugmsg(e.message)
-
-        self.responses = Queue()
-
-        thread = threading.Thread(target=feed_errors)
-        thread.setDaemon(True)
-        thread.start()
-
     @staticmethod
     def get_error_message(err, print_traceback=False):
         if isinstance(err, str):
             msg = err
         elif isinstance(err, BaseException):
             msg = "%s - %s" % (err.__class__.__name__, err)
-            if print_traceback:
-                traceback.print_exc(err)
+            if print_traceback and sys.exc_info()[1] == err:
+                traceback.print_exc()
         else:
             msg = str(err)
         return msg
 
     def report_error(self, err, token_range):
         msg = self.get_error_message(err, print_traceback=self.debug)
-        self.printdebugmsg(msg)
+        printdebugmsg(msg)
         self.send((token_range, Exception(msg)))
 
     def send(self, response):
-        self.responses.put(response)
+        self.outmsg.send(response)
 
     def start_request(self, token_range, info):
         """
@@ -1494,7 +1519,7 @@ class ExportProcess(ChildProcess):
 
             if ret:
                 if errors:
-                    self.printdebugmsg("Warning: failed to connect to some replicas: %s"
% (errors,))
+                    printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
                 return ret
 
         self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts,
token_range, errors),
@@ -1647,7 +1672,6 @@ class ImportConversion(object):
         self.table = parent.table
         self.columns = parent.valid_columns
         self.nullval = parent.nullval
-        self.printdebugmsg = parent.printdebugmsg
         self.decimal_sep = parent.decimal_sep
         self.thousands_sep = parent.thousands_sep
         self.boolean_styles = parent.boolean_styles
@@ -1848,7 +1872,7 @@ class ImportConversion(object):
             elif issubclass(ct, ReversedType):
                 return convert_single_subtype(val, ct=ct)
 
-            self.printdebugmsg("Unknown type %s (%s) for val %s" % (ct, ct.typename, val))
+            printdebugmsg("Unknown type %s (%s) for val %s" % (ct, ct.typename, val))
             return val
 
         converters = {
@@ -1994,24 +2018,58 @@ class TokenMap(object):
 
 class FastTokenAwarePolicy(DCAwareRoundRobinPolicy):
     """
-    Send to any replicas attached to the query, or else fall back to DCAwareRoundRobinPolicy
+    Send to any replicas attached to the query, or else fall back to DCAwareRoundRobinPolicy. Perform
+    exponential back-off if all connections to the eligible replicas already have too many in-flight requests.
     """
 
-    def __init__(self, local_dc='', used_hosts_per_remote_dc=0):
-        DCAwareRoundRobinPolicy.__init__(self, local_dc, used_hosts_per_remote_dc)
+    def __init__(self, parent):
+        DCAwareRoundRobinPolicy.__init__(self, parent.local_dc, 0)
+        self.max_backoff_attempts = parent.max_backoff_attempts
+        self.max_inflight_messages = parent.max_inflight_messages
 
     def make_query_plan(self, working_keyspace=None, query=None):
         """
         Extend TokenAwarePolicy.make_query_plan() so that we choose the same replicas in preference
-        and most importantly we avoid repeating the (slow) bisect
+        and most importantly we avoid repeating the (slow) bisect. We also implement a backoff policy
+        by sleeping an exponentially larger delay in case all connections to eligible replicas have
+        too many in flight requests.
         """
-        replicas = query.replicas if hasattr(query, 'replicas') else []
-        for r in replicas:
-            yield r
+        connections = ConnectionWrapper.connections
+        replicas = list(query.replicas) if hasattr(query, 'replicas') else []
+        replicas.extend([r for r in DCAwareRoundRobinPolicy.make_query_plan(self, working_keyspace, query)
+                        if r not in replicas])
+
+        if replicas:
+            def replica_is_not_overloaded(r):
+                if r.address in connections:
+                    conn = connections[r.address]
+                    return conn.in_flight < min(conn.max_request_id, self.max_inflight_messages)
+                return True
+
+            for i in xrange(self.max_backoff_attempts):
+                for r in filter(replica_is_not_overloaded, replicas):
+                    yield r
+
+                # the back-off starts at 10 ms (0.01) and can go up to 2^max_backoff_attempts,
+                # which is currently 12, so 2^12 = 4096, i.e. ~40 seconds when multiplied by 0.01
+                delay = randint(1, pow(2, i + 1)) * 0.01
+                printdebugmsg("All replicas busy, sleeping for %.2f second(s)..." % (delay,))
+                time.sleep(delay)
 
-        for r in DCAwareRoundRobinPolicy.make_query_plan(self, working_keyspace, query):
-            if r not in replicas:
-                yield r
+            printdebugmsg("Replicas too busy, given up")
+
+
+class ConnectionWrapper(DefaultConnection):
+    """
+    A wrapper to the driver default connection that helps in keeping track of messages in flight.
+    The newly created connection is registered into a global dictionary so that FastTokenAwarePolicy
+    is able to determine if a connection has too many in flight requests.
+    """
+    connections = {}
+
+    def __init__(self, *args, **kwargs):
+        DefaultConnection.__init__(self, *args, **kwargs)
+        self.connections[self.host] = self
 
 
 class ImportProcess(ChildProcess):
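
ConnectionWrapper and FastTokenAwarePolicy cooperate through the class-level
connections dict: each new connection registers itself by host, and the policy
reads its in_flight counter to decide whether a replica is overloaded. Plugging
a custom connection class into the driver looks roughly like this (contact
point hypothetical):

    from cassandra.cluster import Cluster, DefaultConnection

    class CountingConnection(DefaultConnection):
        # Simplified variant of the patch's ConnectionWrapper: keep a global
        # registry so other components can inspect each host's in_flight count.
        connections = {}

        def __init__(self, *args, **kwargs):
            DefaultConnection.__init__(self, *args, **kwargs)
            self.connections[self.host] = self

    cluster = Cluster(['127.0.0.1'], connection_class=CountingConnection)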
@@ -2029,6 +2087,9 @@ class ImportProcess(ChildProcess):
         self.min_batch_size = options.copy['minbatchsize']
         self.max_batch_size = options.copy['maxbatchsize']
         self.use_prepared_statements = options.copy['preparedstatements']
+        self.max_inflight_messages = options.copy['maxinflightmessages']
+        self.max_backoff_attempts = options.copy['maxbackoffattempts']
+
         self.dialect_options = options.dialect
         self._session = None
         self.query = None
@@ -2044,13 +2105,14 @@ class ImportProcess(ChildProcess):
                 cql_version=self.cql_version,
                 protocol_version=self.protocol_version,
                 auth_provider=self.auth_provider,
-                load_balancing_policy=FastTokenAwarePolicy(local_dc=self.local_dc),
+                load_balancing_policy=FastTokenAwarePolicy(self),
                 ssl_options=ssl_settings(self.hostname, self.config_file) if self.ssl else None,
-                default_retry_policy=ExpBackoffRetryPolicy(self),
+                default_retry_policy=FallthroughRetryPolicy(),  # we throw on timeouts and retry in the error callback
                 compression=None,
                 control_connection_timeout=self.connect_timeout,
                 connect_timeout=self.connect_timeout,
-                idle_heartbeat_interval=0)
+                idle_heartbeat_interval=0,
+                connection_class=ConnectionWrapper)
 
             self._session = cluster.connect(self.ks)
             self._session.default_timeout = None
@@ -2130,9 +2192,10 @@ class ImportProcess(ChildProcess):
                 chunk['rows'] = convert_rows(conv, chunk)
                 for replicas, batch in split_into_batches(chunk, conv, tm):
                     statement = make_statement(query, conv, chunk, batch, replicas)
-                    future = session.execute_async(statement)
-                    future.add_callbacks(callback=result_callback, callback_args=(batch, chunk),
-                                         errback=err_callback, errback_args=(batch, chunk, replicas))
+                    if statement:
+                        future = session.execute_async(statement)
+                        future.add_callbacks(callback=result_callback, callback_args=(batch, chunk),
+                                             errback=err_callback, errback_args=(batch, chunk, replicas))
 
             except Exception, exc:
                 self.report_error(exc, chunk, chunk['rows'])
@@ -2314,8 +2377,8 @@ class ImportProcess(ChildProcess):
                                  errback=self.err_callback, errback_args=(batch, chunk, replicas))
 
     def report_error(self, err, chunk, rows=None, attempts=1, final=True):
-        if self.debug:
-            traceback.print_exc(err)
+        if self.debug and sys.exc_info()[1] == err:
+            traceback.print_exc()
+        self.outmsg.send(ImportTaskError(err.__class__.__name__, err.message, rows, attempts, final))
         if final:
             self.update_chunk(rows, chunk)

