cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [47/50] [abbrv] cassandra git commit: Match cassandra-loader options in COPY FROM (3.2 version)
Date Wed, 06 Jan 2016 17:03:44 GMT
Match cassandra-loader options in COPY FROM (3.2 version)

patch by Stefania; reviewed by pauloricardomg for CASSANDRA-9303


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/85b8d02a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/85b8d02a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/85b8d02a

Branch: refs/heads/trunk
Commit: 85b8d02aaebae25fa430b3d036058323a42c8e4e
Parents: 08acfe1
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Wed Jan 6 12:12:12 2016 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Wed Jan 6 18:00:46 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 NEWS.txt                                        |    7 +
 bin/cqlsh.py                                    |  137 +-
 conf/cqlshrc.sample                             |   17 +-
 pylib/cqlshlib/copyutil.py                      | 1263 +++++++++++++-----
 pylib/cqlshlib/formatting.py                    |   96 +-
 .../cql3/statements/BatchStatement.java         |   24 +
 .../cassandra/service/ClientWarningsTest.java   |    6 +-
 tools/bin/cassandra-stress.bat                  |    2 +-
 9 files changed, 1135 insertions(+), 418 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/85b8d02a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43acd43..339ac4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -43,6 +43,7 @@ Merged from 2.2:
  * Add new types to Stress (CASSANDRA-9556)
  * Add property to allow listening on broadcast interface (CASSANDRA-9748)
 Merged from 2.1:
+ * Match cassandra-loader options in COPY FROM (CASSANDRA-9303)
  * Fix binding to any address in CqlBulkRecordWriter (CASSANDRA-9309)
  * cqlsh fails to decode utf-8 characters for text typed columns (CASSANDRA-10875)
  * Log error when stream session fails (CASSANDRA-9294)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85b8d02a/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index f5f50c1..3d468b6 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -400,6 +400,13 @@ Upgrading
      to exclude data centers when the global status is enabled, see CASSANDRA-9035 for details.
 
 
+2.1.13
+======
+
+New features
+------------
+    - New options for cqlsh COPY FROM and COPY TO, see CASSANDRA-9303 for details.
+
 2.1.10
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85b8d02a/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 5497f39..65352c8 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -41,7 +41,6 @@ import optparse
 import os
 import platform
 import sys
-import time
 import traceback
 import warnings
 import webbrowser
@@ -152,7 +151,8 @@ cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib')
 if os.path.isdir(cqlshlibdir):
     sys.path.insert(0, cqlshlibdir)
 
-from cqlshlib import cql3handling, cqlhandling, copyutil, pylexotron, sslhandling
+from cqlshlib import cql3handling, cqlhandling, pylexotron, sslhandling
+from cqlshlib.copyutil import ExportTask, ImportTask
 from cqlshlib.displaying import (ANSI_RESET, BLUE, COLUMN_NAME_COLORS, CYAN,
                                  RED, FormattedValue, colorme)
 from cqlshlib.formatting import (DEFAULT_DATE_FORMAT, DEFAULT_NANOTIME_FORMAT,
@@ -458,10 +458,12 @@ def complete_copy_column_names(ctxt, cqlsh):
     return set(colnames[1:]) - set(existcols)
 
 
-COPY_COMMON_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL',
-                       'MAXATTEMPTS', 'REPORTFREQUENCY']
-COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE', 'TTL']
-COPY_TO_OPTIONS = ['ENCODING', 'TIMEFORMAT', 'PAGESIZE', 'PAGETIMEOUT', 'MAXREQUESTS']
+COPY_COMMON_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL', 'DATETIMEFORMAT',
+                       'MAXATTEMPTS', 'REPORTFREQUENCY', 'DECIMALSEP', 'THOUSANDSSEP', 'BOOLSTYLE',
+                       'NUMPROCESSES', 'CONFIGFILE', 'RATEFILE']
+COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE', 'MAXROWS',
+                     'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 'MAXINSERTERRORS', 'ERRFILE', 'TTL']
+COPY_TO_OPTIONS = ['ENCODING', 'PAGESIZE', 'PAGETIMEOUT', 'BEGINTOKEN', 'ENDTOKEN', 'MAXOUTPUTSIZE', 'MAXREQUESTS']
 
 
 @cqlsh_syntax_completer('copyOption', 'optnames')
@@ -581,23 +583,6 @@ warnings.showwarning = show_warning_without_quoting_line
 warnings.filterwarnings('always', category=cql3handling.UnexpectedTableStructure)
 
 
-def describe_interval(seconds):
-    desc = []
-    for length, unit in ((86400, 'day'), (3600, 'hour'), (60, 'minute')):
-        num = int(seconds) / length
-        if num > 0:
-            desc.append('%d %s' % (num, unit))
-            if num > 1:
-                desc[-1] += 's'
-        seconds %= length
-    words = '%.03f seconds' % seconds
-    if len(desc) > 1:
-        words = ', '.join(desc) + ', and ' + words
-    elif len(desc) == 1:
-        words = desc[0] + ' and ' + words
-    return words
-
-
 def insert_driver_hooks():
     extend_cql_deserialization()
     auto_format_udts()
@@ -664,8 +649,7 @@ class Shell(cmd.Cmd):
     last_hist = None
     shunted_query_out = None
     use_paging = True
-    csv_dialect_defaults = dict(delimiter=',', doublequote=False,
-                                escapechar='\\', quotechar='"')
+
     default_page_size = 100
 
     def __init__(self, hostname, port, color=False,
@@ -1781,33 +1765,68 @@ class Shell(cmd.Cmd):
           COPY x TO: Exports data from a Cassandra table in CSV format.
 
         COPY <table_name> [ ( column [, ...] ) ]
-             FROM ( '<filename>' | STDIN )
+             FROM ( '<file_pattern_1, file_pattern_2, ... file_pattern_n>' | STDIN )
              [ WITH <option>='value' [AND ...] ];
 
+        File patterns are either file names or valid python glob expressions, e.g. *.csv or folder/*.csv.
+
         COPY <table_name> [ ( column [, ...] ) ]
              TO ( '<filename>' | STDOUT )
              [ WITH <option>='value' [AND ...] ];
 
-        Available options and defaults:
+        Available common COPY options and defaults:
 
           DELIMITER=','           - character that appears between records
           QUOTE='"'               - quoting character to be used to quote fields
           ESCAPE='\'              - character to appear before the QUOTE char when quoted
           HEADER=false            - whether to ignore the first line
           NULL=''                 - string that represents a null value
-          ENCODING='utf8'         - encoding for CSV output (COPY TO)
-          TIMEFORMAT=             - timestamp strftime format (COPY TO)
+          DATETIMEFORMAT=         - timestamp strftime format
             '%Y-%m-%d %H:%M:%S%z'   defaults to time_format value in cqlshrc
-          MAXREQUESTS=6           - the maximum number of requests each worker process can work on in parallel (COPY TO)
-          PAGESIZE=1000           - the page size for fetching results (COPY TO)
-          PAGETIMEOUT=10          - the page timeout for fetching results (COPY TO)
-          MAXATTEMPTS=5           - the maximum number of attempts for errors
-          CHUNKSIZE=1000          - the size of chunks passed to worker processes (COPY FROM)
-          INGESTRATE=100000       - an approximate ingest rate in rows per second (COPY FROM)
-          MAXBATCHSIZE=20         - the maximum size of an import batch (COPY FROM)
-          MINBATCHSIZE=2          - the minimum size of an import batch (COPY FROM)
+          MAXATTEMPTS=5           - the maximum number of attempts per batch or range
           REPORTFREQUENCY=0.25    - the frequency with which we display status updates in seconds
-          TTL=3600                - the time to live in seconds, by default data will not expire (COPY FROM)
+          DECIMALSEP='.'          - the separator for decimal values
+          THOUSANDSSEP=''         - the separator for thousands digit groups
+          BOOLSTYLE='True,False'  - the representation for booleans, case insensitive, specify true followed by false,
+                                    for example yes,no or 1,0
+          NUMPROCESSES=n          - the number of worker processes, by default the number of cores minus one
+                                    capped at 16
+          CONFIGFILE=''           - a configuration file with the same format as .cqlshrc (see the Python ConfigParser
+                                    documentation) where you can specify WITH options under the following optional
+                                    sections: [copy], [copy-to], [copy-from], [copy:ks.table], [copy-to:ks.table],
+                                    [copy-from:ks.table], where <ks> is your keyspace name and <table> is your table
+                                    name. Options are read from these sections, in the order specified
+                                    above, and command line options always override options in configuration files.
+                                    Depending on the COPY direction, only the relevant copy-from or copy-to sections
+                                    are used. If no configfile is specified then .cqlshrc is searched instead.
+          RATEFILE=''             - an optional file where to print the output statistics
+
+        Available COPY FROM options and defaults:
+
+          CHUNKSIZE=1000          - the size of chunks passed to worker processes
+          INGESTRATE=100000       - an approximate ingest rate in rows per second
+          MINBATCHSIZE=2          - the minimum size of an import batch
+          MAXBATCHSIZE=20         - the maximum size of an import batch
+          MAXROWS=-1              - the maximum number of rows, -1 means no maximum
+          SKIPROWS=0              - the number of rows to skip
+          SKIPCOLS=''             - a comma separated list of column names to skip
+          MAXPARSEERRORS=-1       - the maximum global number of parsing errors, -1 means no maximum
+          MAXINSERTERRORS=-1      - the maximum global number of insert errors, -1 means no maximum
+          ERRFILE=''              - a file where to store all rows that could not be imported, by default this is
+                                    import_ks_table.err where <ks> is your keyspace and <table> is your table name.
+          TTL=3600                - the time to live in seconds, by default data will not expire
+
+        Available COPY TO options and defaults:
+
+          ENCODING='utf8'          - encoding for CSV output
+          PAGESIZE='1000'          - the page size for fetching results
+          PAGETIMEOUT=10           - the page timeout in seconds for fetching results
+          BEGINTOKEN=''            - the minimum token string to consider when exporting data
+          ENDTOKEN=''              - the maximum token string to consider when exporting data
+          MAXREQUESTS=6            - the maximum number of requests each worker process can work on in parallel
+          MAXOUTPUTSIZE='-1'       - the maximum size of the output file measured in number of lines,
+                                     beyond this maximum the output file will be split into segments,
+                                     -1 means unlimited.
 
         When entering CSV data on STDIN, you can use the sequence "\."
         on a line by itself to end the data input.
@@ -1818,55 +1837,31 @@ class Shell(cmd.Cmd):
             ks = self.current_keyspace
             if ks is None:
                 raise NoKeyspaceError("Not in any keyspace.")
-        cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
+        table = self.cql_unprotect_name(parsed.get_binding('cfname'))
         columns = parsed.get_binding('colnames', None)
         if columns is not None:
             columns = map(self.cql_unprotect_name, columns)
         else:
             # default to all known columns
-            columns = self.get_column_names(ks, cf)
+            columns = self.get_column_names(ks, table)
+
         fname = parsed.get_binding('fname', None)
         if fname is not None:
-            fname = os.path.expanduser(self.cql_unprotect_value(fname))
+            fname = self.cql_unprotect_value(fname)
+
         copyoptnames = map(str.lower, parsed.get_binding('optnames', ()))
         copyoptvals = map(self.cql_unprotect_value, parsed.get_binding('optvals', ()))
-        cleancopyoptvals = [optval.decode('string-escape') for optval in copyoptvals]
-        opts = dict(zip(copyoptnames, cleancopyoptvals))
-
-        print "\nStarting copy of %s.%s with columns %s." % (ks, cf, columns)
-
-        timestart = time.time()
+        opts = dict(zip(copyoptnames, copyoptvals))
 
         direction = parsed.get_binding('dir').upper()
         if direction == 'FROM':
-            rows = self.perform_csv_import(ks, cf, columns, fname, opts)
-            verb = 'imported'
+            task = ImportTask(self, ks, table, columns, fname, opts, DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
         elif direction == 'TO':
-            rows = self.perform_csv_export(ks, cf, columns, fname, opts)
-            verb = 'exported'
+            task = ExportTask(self, ks, table, columns, fname, opts, DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
         else:
             raise SyntaxError("Unknown direction %s" % direction)
 
-        timeend = time.time()
-        print "\n%d rows %s in %s." % (rows, verb, describe_interval(timeend - timestart))
-
-    def perform_csv_import(self, ks, cf, columns, fname, opts):
-        csv_options, dialect_options, unrecognized_options = copyutil.parse_options(self, opts)
-        if unrecognized_options:
-            self.printerr('Unrecognized COPY FROM options: %s' % ', '.join(unrecognized_options.keys()))
-            return 0
-
-        return copyutil.ImportTask(self, ks, cf, columns, fname, csv_options, dialect_options,
-                                   DEFAULT_PROTOCOL_VERSION, CONFIG_FILE).run()
-
-    def perform_csv_export(self, ks, cf, columns, fname, opts):
-        csv_options, dialect_options, unrecognized_options = copyutil.parse_options(self, opts)
-        if unrecognized_options:
-            self.printerr('Unrecognized COPY TO options: %s' % ', '.join(unrecognized_options.keys()))
-            return 0
-
-        return copyutil.ExportTask(self, ks, cf, columns, fname, csv_options, dialect_options,
-                                   DEFAULT_PROTOCOL_VERSION, CONFIG_FILE).run()
+        task.run()
 
     def do_show(self, parsed):
         """

http://git-wip-us.apache.org/repos/asf/cassandra/blob/85b8d02a/conf/cqlshrc.sample
----------------------------------------------------------------------
diff --git a/conf/cqlshrc.sample b/conf/cqlshrc.sample
index 302d25f..4c66861 100644
--- a/conf/cqlshrc.sample
+++ b/conf/cqlshrc.sample
@@ -43,7 +43,7 @@ completekey = tab
 ;browser =
 
 [cql]
-version = 3.1.5
+version = 3.2.1
 
 [connection]
 hostname = 127.0.0.1
@@ -68,3 +68,18 @@ max_trace_wait = 10.0
 
 
 ; vim: set ft=dosini :
+
+;; optional options for COPY TO and COPY FROM
+;[copy]
+;maxattempts=10
+;numprocesses=4
+
+;; optional options for COPY FROM
+;[copy-from]
+;chunksize=5000
+;ingestrate=50000
+
+;; optional options for COPY TO
+;[copy-to]
+;pagesize=2000
+;pagetimeout=20


Mime
View raw message