couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From woh...@apache.org
Subject [couchdb] 01/01: New couchup 1.x -> 2.x database migration tool
Date Sun, 23 Apr 2017 19:34:22 GMT
This is an automated email from the ASF dual-hosted git repository.

wohali pushed a commit to branch feat-couchup
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b05b17225e5de0b6d979c88b41a57edfade564dc
Author: Joan Touzet <joant@atypical.net>
AuthorDate: Wed Apr 19 17:26:56 2017 -0400

    New couchup 1.x -> 2.x database migration tool
    
    This commit adds a new Python-based database migration tool, couchup.
    It is intended to be used at the command-line on the server being
    upgraded, before bringing the node (or cluster) into service.
    
    couchup provides 4 subcommands to assist in the migration process:
    
    * list - lists all CouchDB 1.x databases
    * replicate - replicates one or more 1.x databases to CouchDB 2.x
    * rebuild - rebuilds one or more CouchDB 2.x views
    * delete - deletes one or more CouchDB 1.x databases
    
    A typical workflow for a single-node upgrade process would look like:
    
    ```sh
    $ couchdb list
    $ couchdb replicate -a
    $ couchdb rebuild -a
    $ couchdb delete -a
    ```
    
    A clustered upgrade process would be the same, but must be preceded by
    setting up all the nodes in the cluster first.
    
    Various optional arguments provide for admin login/password, overriding
    ports, quiet mode and so on.
    
    Of special note is that `couchup rebuild` supports an optional flag,
    `-f`, to filter deleted documents during the replication process.
    
    I struggled some with the naming convention. For those in the know, a
    '1.x database' is a node-local database appearing only on port 5986, and
    a '2.x database' is a clustered database appearing on port 5984, and in
    raw, sharded form on port 5986.
---
 rel/overlay/bin/couchup | 480 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 480 insertions(+)

diff --git a/rel/overlay/bin/couchup b/rel/overlay/bin/couchup
new file mode 100755
index 0000000..858ccc8
--- /dev/null
+++ b/rel/overlay/bin/couchup
@@ -0,0 +1,480 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import argparse
+import base64
+import json
+import textwrap
+import threading
+import time
+import sys
+try:
+    from urllib import quote
+except ImportError:
+    from urllib.parse import quote
+import requests
+try:
+    import progressbar
+    HAVE_BAR = True
+except ImportError:
+    HAVE_BAR = False
+
+def _tojson(req):
+    """Support requests v0.x as well as 1.x+"""
+    if requests.__version__[0] == '0':
+        return json.loads(req.content)
+    return req.json()
+
+def _args(args):
+    args = vars(args)
+    if args['password']:
+        args['creds'] = (args['login'], args['password'])
+    else:
+        args['creds'] = None
+    return args
+
+def _do_list(args):
+    port = str(args['local_port'])
+    req = requests.get('http://127.0.0.1:' + port + '/_all_dbs',
+        auth=args['creds'])
+    req.raise_for_status()
+    dbs = _tojson(req)
+    local_dbs = [x for x in dbs if "shards" not in x
+        and x not in ['_dbs', '_nodes']]
+    clustered_dbs = list(set(
+        [x.split('/')[2].split('.')[0] for x in dbs if "shards" in x]
+    ))
+    if not args['include_system_dbs']:
+        # list comprehension to eliminate dbs starting with underscore
+        local_dbs = [x for x in local_dbs if x[0] != '_']
+        clustered_dbs = [x for x in clustered_dbs if x[0] != '_']
+    local_dbs.sort()
+    clustered_dbs.sort()
+    if args.get('clustered'):
+        return clustered_dbs
+    return local_dbs
+
+def _list(args):
+    args = _args(args)
+    ret = _do_list(args)
+    print(", ".join(ret))
+
+def _watch_replication(db,
+        local_port=5986,
+        clustered_port=5984,
+        creds=None,
+        hide_progress_bar=False,
+        quiet=False,
+        timeout=30):
+    """Watches replication, optionally with a progressbar."""
+    time.sleep(1)
+    if not quiet:
+        print("Replication started.")
+    url = "http://127.0.0.1:{}/{}".format(local_port, db)
+    try:
+        req = requests.get(url, auth=creds)
+        req.raise_for_status()
+        req = _tojson(req)
+        # here, local means node-local, i.e. source (1.x) database
+        local_docs = req['doc_count']
+        local_size = req['data_size']
+    except requests.exceptions.HTTPError:
+        raise Exception('Cannot retrieve {} doc_count!'.format(db))
+    if local_size == 0:
+        return
+    if HAVE_BAR and not hide_progress_bar and not quiet:
+        widgets = [
+            db,
+            ' ', progressbar.Percentage(),
+            ' ', progressbar.Bar(marker=progressbar.RotatingMarker()),
+            ' ', progressbar.ETA(),
+            ' ', progressbar.FileTransferSpeed(),
+        ]
+        progbar = progressbar.ProgressBar(widgets=widgets,
+                maxval=local_size).start()
+    count = 0
+    stall_count = 0
+    url = "http://127.0.0.1:{}/{}".format(clustered_port, db)
+    while count < local_docs:
+        try:
+            req = requests.get(url, auth=creds)
+            req.raise_for_status()
+            req = _tojson(req)
+            # here, cluster means clustered port, i.e. port 5984
+            clus_count = req['doc_count']
+            clus_size = req['data_size']
+        except requests.exceptions.HTTPError as exc:
+            if exc.response.status_code == 404:
+                clus_count = 0
+                clus_size = 0
+            else:
+                raise Exception('Cannot retrieve {} doc_count!'.format(db))
+        if count == clus_count:
+            stall_count += 1
+        else:
+            stall_count = 0
+        if stall_count == timeout:
+            if not quiet:
+                print(
+                    "Replication is stalled. Increase timeout or reduce load.")
+            exit(1)
+        if HAVE_BAR and not hide_progress_bar and not quiet:
+            if clus_size > local_size:
+                clus_size = local_size
+            progbar.update(clus_size)
+        count = clus_count
+        time.sleep(1)
+    if HAVE_BAR and not hide_progress_bar and not quiet:
+        progbar.finish()
+    return 0
+
+def _put_filter(args, db=None):
+    """Adds _design/repl_filters tombstone replication filter to DB."""
+    ddoc = {
+        '_id': '_design/repl_filters',
+        'filters': {
+            'no_deleted': 'function(doc,req){return !doc._deleted;};'
+        }
+    }
+    try:
+        req = requests.get(
+            'http://127.0.0.1:{}/{}/_design/repl_filters'.format(
+            args['local_port'], db),
+            auth=args['creds'])
+        req.raise_for_status()
+        doc = _tojson(req)
+        del doc['_rev']
+        if doc != ddoc:
+            if not args['quiet']:
+                print('Source replication filter does not match! Aborting.')
+            exit(1)
+    except requests.exceptions.HTTPError as exc:
+        if exc.response.status_code == 404:
+            if not args['quiet']:
+                print('Adding replication filter to source database...')
+            req = requests.put(
+                'http://127.0.0.1:{}/{}/_design/repl_filters'.format(
+                args['local_port'], db),
+                data=json.dumps(ddoc),
+                auth=args['creds'])
+            req.raise_for_status()
+        elif not args['quiet']:
+            print(exc.response.text)
+            exit(1)
+
+def _replicate(args):
+    args = _args(args)
+    if args['all_dbs']:
+        dbs = _do_list(args)
+    else:
+        dbs = args['dbs']
+
+    for db in dbs:
+        if args['filter_deleted']:
+            _put_filter(args, db)
+
+        if not args['quiet']:
+            print('Starting replication for ' + db + '...')
+        db = quote(db, safe='')
+        doc = {
+            'continuous': False,
+            'create_target': True,
+            'source': {
+                'url': 'http://127.0.0.1:{}/{}'.format(
+                    args['local_port'], db)
+            },
+            'target': {
+                'url': 'http://127.0.0.1:{}/{}'.format(
+                    args['clustered_port'], db)
+            }
+        }
+        if args['filter_deleted']:
+            doc['filter'] = 'repl_filters/no_deleted'
+        if args['creds']:
+            auth = 'Basic ' + base64.b64encode(':'.join(args['creds']))
+            headers = {
+                'authorization': auth
+            }
+            doc['source']['headers'] = headers
+            doc['target']['headers'] = headers
+        watch_args = {y: args[y] for y in [
+            'local_port', 'clustered_port', 'creds', 'hide_progress_bar',
+            'timeout', 'quiet']}
+        watch_args['db'] = db
+        watch = threading.Thread(target=_watch_replication, kwargs=watch_args)
+        watch.start()
+        try:
+            req = requests.post('http://127.0.0.1:{}/_replicate'.format(
+                args['clustered_port']),
+                auth=args['creds'],
+                data=json.dumps(doc),
+                headers={'Content-type': 'application/json'})
+            req.raise_for_status()
+            req = _tojson(req)
+        except requests.exceptions.HTTPError as exc:
+            if not args['quiet']:
+                print(exc.response.text)
+            exit(1)
+        watch.join()
+        if req.get('no_changes'):
+            if not args['quiet']:
+                print("No changes, replication is caught up.")
+        if not args['quiet']:
+            print("Replication complete.")
+
+def _rebuild(args):
+    args = _args(args)
+    if args['all_dbs']:
+        if args['views']:
+            if not args['quiet']:
+                print("Cannot take list of views for more than 1 database.")
+            exit(1)
+        args['clustered'] = True
+        dbs = _do_list(args)
+    else:
+        dbs = [args['db']]
+    for db in dbs:
+        if args['views']:
+            views = args['views']
+        else:
+            try:
+                req = requests.get('http://127.0.0.1:{}/{}/_all_docs'.format(
+                    args['clustered_port'], db),
+                    params={
+                        'start_key': '"_design/"',
+                        'end_key': '"_design0"'
+                    },
+                    auth=args['creds'])
+                req.raise_for_status()
+                req = _tojson(req)
+            except requests.exceptions.HTTPError as exc:
+                if not args['quiet']:
+                    print(exc.response.text)
+                exit(1)
+            req = req['rows']
+            ddocs = [x['id'].split('/')[1] for x in req]
+        for ddoc in ddocs:
+            try:
+                req = requests.get('http://127.0.0.1:{}/{}/_design/{}'.format(
+                    args['clustered_port'], db, ddoc),
+                    auth=args['creds'])
+                req.raise_for_status()
+                doc = _tojson(req)
+            except requests.exceptions.HTTPError as exc:
+                if not args['quiet']:
+                    print(exc.response.text)
+                exit(1)
+            if 'views' not in doc:
+                if not args['quiet']:
+                    print("Skipping {}/{}, no views found".format(db, ddoc))
+                    continue
+            # only need to refresh a single view per ddoc
+            if not args['quiet']:
+                print("Refreshing views in {}/{}...".format(db, ddoc))
+            view = list(doc['views'].keys())[0]
+            try:
+                req = requests.get(
+                    'http://127.0.0.1:{}/{}/_design/{}/_view/{}'.format(
+                        args['clustered_port'], db, ddoc, view),
+                    params={'limit': 1},
+                    auth=args['creds'],
+                    timeout=args['timeout'])
+            except requests.exceptions.Timeout:
+                if not args['quiet']:
+                    print("Timeout, view is processing. Moving on.")
+            except requests.exceptions.HTTPError as exc:
+                if not args['quiet']:
+                    print(exc.response.text)
+                exit(1)
+
+def _delete(args):
+    args = _args(args)
+    if args['all_dbs']:
+        args['include_system_dbs'] = False
+        dbs = _do_list(args)
+    else:
+        dbs = args['dbs']
+    for db in dbs:
+        db = quote(db, safe='')
+        local_url = 'http://127.0.0.1:{}/{}'.format(args['local_port'], db)
+        clus_url = 'http://127.0.0.1:{}/{}'.format(args['clustered_port'], db)
+        try:
+            req = requests.get(local_url, auth=args['creds'])
+            req.raise_for_status()
+            req = _tojson(req)
+            local_docs = req['doc_count']
+            req = requests.get(clus_url, auth=args['creds'])
+            req.raise_for_status()
+            req = _tojson(req)
+            clus_docs = req['doc_count']
+            if clus_docs < local_docs and not args['force']:
+                if not args['quiet']:
+                    print('Clustered DB has less docs than local version!' +
+                            ' Skipping...')
+                    continue
+            if not args['quiet']:
+                print('Deleting ' + db + '...')
+            req = requests.delete('http://127.0.0.1:{}/{}'.format(
+                args['local_port'], db),
+                auth=args['creds'])
+            req.raise_for_status()
+        except requests.exceptions.HTTPError as exc:
+            if not args['quiet']:
+                print(exc.response.text)
+            exit(1)
+
+def main(argv):
+    """Kindly do the needful."""
+    parser = argparse.ArgumentParser(prog='couchup',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        description=textwrap.dedent('''\
+            Migrate CouchDB 1.x databases to CouchDB 2.x.
+
+            Specify a subcommand and -h or --help for more help.
+         '''))
+
+    subparsers = parser.add_subparsers()
+
+    parser_list = subparsers.add_parser('list',
+        help='lists all CouchDB 1.x databases',
+        formatter_class=argparse.RawTextHelpFormatter,
+        description=textwrap.dedent('''\
+            Examples:
+              couchup list
+              couchup list -c -i -p mysecretpassword
+         '''))
+    parser_list.add_argument('-c', '--clustered', action='store_true',
+        help='show clustered (2.x) databases instead')
+    parser_list.add_argument('-i', '--include-system-dbs',
+        action='store_true',
+        help='include system databases (_users, _replicator, etc.)')
+    parser_list.add_argument('-l', '--login', default='admin',
+        help='specify login (default admin)')
+    parser_list.add_argument('-p', '--password',
+        help='specify password')
+    parser_list.add_argument('--local-port', default=5986,
+        help='override local port (default 5986)')
+    parser_list.add_argument('--clustered-port', default=5984,
+        help='override clustered port (default 5984)')
+    parser_list.set_defaults(func=_list)
+
+    parser_replicate = subparsers.add_parser('replicate',
+        help='replicates one or more 1.x databases to CouchDB 2.x',
+        formatter_class=argparse.RawTextHelpFormatter,
+        description=textwrap.dedent('''\
+            Examples:
+              couchup replicate movies
+              couchup replicate -f lots_of_deleted_docs_db
+              couchup replicate -i -q -n _users
+
+            Note:
+              The -f/--filter-deleted option adds a replication filter
+              to the source database, _design/repl_filters, that
+              is used during replication to filter out deleted
+              documents. This can greatly reduce the size of your
+              2.x database if there are many deleted documents.
+
+              It is IMPORTANT that no documents be deleted from the 1.x
+              database during this process, or those deletions may not
+              successfully replicate to the 2.x database.
+         '''))
+    parser_replicate.add_argument('-a', '--all_dbs', action='store_true',
+        help='act on all databases available')
+    parser_replicate.add_argument('-i', '--include-system-dbs',
+        action='store_true',
+        help='include system databases (_users, _replicator, etc.)')
+    parser_replicate.add_argument('-q', '--quiet', action='store_true',
+        help='suppress all output')
+    parser_replicate.add_argument('-n', '--hide-progress-bar',
+        action='store_true',
+        help='suppress progress bar display')
+    parser_replicate.add_argument('-f', '--filter-deleted',
+        action='store_true',
+        help='filter deleted document tombstones during replication')
+    parser_replicate.add_argument('-t', '--timeout', default=30,
+         help='stalled replication timeout threshhold in s (def: 30)')
+    parser_replicate.add_argument('-l', '--login', default='admin',
+        help='specify login (default admin)')
+    parser_replicate.add_argument('-p', '--password',
+        help='specify password')
+    parser_replicate.add_argument('--local-port', default=5986,
+        help='override local port (default 5986)')
+    parser_replicate.add_argument('--clustered-port', default=5984,
+        help='override clustered port (default 5984)')
+    parser_replicate.add_argument('dbs', metavar='db', type=str, nargs="*",
+        help="database(s) to be processed")
+    parser_replicate.set_defaults(func=_replicate)
+
+    parser_rebuild = subparsers.add_parser('rebuild',
+        help='rebuilds one or more CouchDB 2.x views',
+        formatter_class=argparse.RawTextHelpFormatter,
+        description=textwrap.dedent('''\
+            Examples:
+              couchup rebuild movies
+              couchup rebuild movies by_name
+              couchup rebuild -a -q -p mysecretpassword
+         '''))
+    parser_rebuild.add_argument('-a', '--all-dbs', action='store_true',
+        help='act on all databases available')
+    parser_rebuild.add_argument('-q', '--quiet', action='store_true',
+        help='suppress all output')
+    parser_rebuild.add_argument('-t', '--timeout', default=5,
+        help='timeout for waiting for view rebuild in s (default: 5)')
+    parser_rebuild.add_argument('-i', '--include-system-dbs',
+        action='store_true',
+        help='include system databases (_users, _replicator, etc.)')
+    parser_rebuild.add_argument('-l', '--login', default='admin',
+        help='specify login (default admin)')
+    parser_rebuild.add_argument('-p', '--password',
+        help='specify password')
+    parser_rebuild.add_argument('--local-port', default=5986,
+        help='override local port (default 5986)')
+    parser_rebuild.add_argument('--clustered-port', default=5984,
+        help='override clustered port (default 5984)')
+    parser_rebuild.add_argument('db', metavar='db', type=str, nargs="?",
+        help="database to be processed")
+    parser_rebuild.add_argument('views', metavar='view', type=str, nargs="*",
+        help="view(s) to be processed (all by default)")
+    parser_rebuild.set_defaults(func=_rebuild)
+
+    parser_delete = subparsers.add_parser('delete',
+        help='deletes one or more CouchDB 1.x databases',
+        formatter_class=argparse.RawTextHelpFormatter,
+        description=textwrap.dedent('''\
+            Examples:
+              couchup delete movies
+              couchup delete -q -p mysecretpassword movies
+         '''))
+    parser_delete.add_argument('-a', '--all-dbs', action='store_true',
+        help='act on all databases available')
+    parser_delete.add_argument('-f', '--force', action='store_true',
+        help='force deletion even if 1.x and 2.x databases are not identical')
+    parser_delete.add_argument('-q', '--quiet', action='store_true',
+        help='suppress all output')
+    parser_delete.add_argument('-l', '--login', default='admin',
+        help='specify login (default admin)')
+    parser_delete.add_argument('-p', '--password',
+        help='specify password')
+    parser_delete.add_argument('--local-port', default=5986,
+        help='override local port (default 5986)')
+    parser_delete.add_argument('--clustered-port', default=5984,
+        help='override clustered port (default 5984)')
+    parser_delete.add_argument('dbs', metavar='db', type=str, nargs="*",
+        help="database(s) to be processed")
+    parser_delete.set_defaults(func=_delete)
+
+    args = parser.parse_args(argv[1:])
+    args.func(args)
+
+if __name__ == '__main__':
+    main(sys.argv)

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <commits@couchdb.apache.org>.

Mime
View raw message