usergrid-commits mailing list archives

From mru...@apache.org
Subject [24/50] [abbrv] usergrid git commit: Initial checkin for Python Utilities and SDK
Date Mon, 01 Aug 2016 16:53:59 GMT
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32f9e55d/utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py
----------------------------------------------------------------------
diff --git a/utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py b/utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py
new file mode 100644
index 0000000..30ecc26
--- /dev/null
+++ b/utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py
@@ -0,0 +1,2168 @@
+import os
+import uuid
+from Queue import Empty
+import argparse
+import json
+import logging
+import sys
+from multiprocessing import Queue, Process
+from sets import Set
+
+import time_uuid
+
+import datetime
+from cloghandler import ConcurrentRotatingFileHandler
+import requests
+import traceback
+import redis
+import time
+from sys import platform as _platform
+
+import signal
+
+from requests.auth import HTTPBasicAuth
+from usergrid import UsergridQueryIterator
+import urllib3
+
+__author__ = 'Jeff West @ ApigeeCorporation'
+
+ECID = str(uuid.uuid1())
+key_version = 'v4'
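+# ECID uniquely identifies this migration run and is stamped into every log
+# line and log/status file name; key_version namespaces the Redis visit-cache
+# keys, so a new key scheme can be rolled out by bumping it.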
+
+logger = logging.getLogger('GraphMigrator')
+worker_logger = logging.getLogger('Worker')
+collection_worker_logger = logging.getLogger('CollectionWorker')
+error_logger = logging.getLogger('ErrorLogger')
+audit_logger = logging.getLogger('AuditLogger')
+status_logger = logging.getLogger('StatusLogger')
+
+urllib3.disable_warnings()
+
+DEFAULT_CREATE_APPS = False
+DEFAULT_RETRY_SLEEP = 10
+DEFAULT_PROCESSING_SLEEP = 1
+
+queue = Queue()
+QSIZE_OK = False
+
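+# multiprocessing.Queue.qsize() is not implemented on every platform (it
+# raises NotImplementedError on OS X), so probe once here and only report
+# queue depth where it is supported.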
+try:
+    queue.qsize()
+    QSIZE_OK = True
+except:
+    pass
+
+session_source = requests.Session()
+session_target = requests.Session()
+
+cache = None
+
+
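+# Equivalent of timedelta.total_seconds(), which only exists on Python 2.7+;
+# computed manually here, presumably for Python 2.6 compatibility.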
+def total_seconds(td):
+    return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6
+
+
+def init_logging(stdout_enabled=True):
+    root_logger = logging.getLogger()
+    root_logger.setLevel(logging.getLevelName(config.get('log_level', 'INFO')))
+
+    # root_logger.setLevel(logging.WARN)
+
+    logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.ERROR)
+    logging.getLogger('boto').setLevel(logging.ERROR)
+    logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN)
+
+    log_formatter = logging.Formatter(
+            fmt='%(asctime)s | ' + ECID + ' | %(name)s | %(processName)s | %(levelname)s | %(message)s',
+            datefmt='%m/%d/%Y %I:%M:%S %p')
+
+    stdout_logger = logging.StreamHandler(sys.stdout)
+    stdout_logger.setFormatter(log_formatter)
+    root_logger.addHandler(stdout_logger)
+
+    if stdout_enabled:
+        stdout_logger.setLevel(logging.getLevelName(config.get('log_level', 'INFO')))
+
+    # base log file
+
+    log_file_name = os.path.join(config.get('log_dir'),
+                                 '%s-%s-%s-migrator.log' % (config.get('org'), config.get('migrate'), ECID))
+
+    # ConcurrentRotatingFileHandler
+    rotating_file = ConcurrentRotatingFileHandler(filename=log_file_name,
+                                                  mode='a',
+                                                  maxBytes=404857600,
+                                                  backupCount=0)
+    rotating_file.setFormatter(log_formatter)
+    rotating_file.setLevel(logging.INFO)
+
+    root_logger.addHandler(rotating_file)
+    error_log_file_name = os.path.join(config.get('log_dir'), '%s-%s-%s-migrator-errors.log' % (
+        config.get('org'), config.get('migrate'), ECID))
+
+    error_rotating_file = ConcurrentRotatingFileHandler(filename=error_log_file_name,
+                                                        mode='a',
+                                                        maxBytes=404857600,
+                                                        backupCount=0)
+    error_rotating_file.setFormatter(log_formatter)
+    error_rotating_file.setLevel(logging.ERROR)
+
+    root_logger.addHandler(error_rotating_file)
+
+
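+# natural-key property for collections whose entities are not addressed by 'name'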
+entity_name_map = {
+    'users': 'username'
+}
+
+config = {}
+
+# URL Templates for Usergrid
+org_management_app_url_template = "{api_url}/management/organizations/{org}/applications?client_id={client_id}&client_secret={client_secret}"
+org_management_url_template = "{api_url}/management/organizations/{org}/applications?client_id={client_id}&client_secret={client_secret}"
+org_url_template = "{api_url}/{org}?client_id={client_id}&client_secret={client_secret}"
+app_url_template = "{api_url}/{org}/{app}?client_id={client_id}&client_secret={client_secret}"
+collection_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}"
+collection_query_url_template = "{api_url}/{org}/{app}/{collection}?ql={ql}&client_id={client_id}&client_secret={client_secret}&limit={limit}"
+collection_graph_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}&limit={limit}"
+connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}?client_id={client_id}&client_secret={client_secret}"
+connecting_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/connecting/{verb}?client_id={client_id}&client_secret={client_secret}"
+connection_create_by_uuid_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}?client_id={client_id}&client_secret={client_secret}"
+connection_create_by_name_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_type}/{target_name}?client_id={client_id}&client_secret={client_secret}"
+
+connection_create_by_pairs_url_template = "{api_url}/{org}/{app}/{source_type_id}/{verb}/{target_type_id}?client_id={client_id}&client_secret={client_secret}"
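+# For illustration (hypothetical values), the pairs template above renders to:
+#   https://api.example.com/myorg/myapp/users/jdoe/likes/restaurants/chez-pierre?client_id=...&client_secret=...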
+
+get_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}&connections=none"
+get_entity_url_with_connections_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}"
+put_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}"
+permissions_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/permissions?client_id={client_id}&client_secret={client_secret}"
+
+user_credentials_url_template = "{api_url}/{org}/{app}/users/{uuid}/credentials"
+
+ignore_collections = ['activities', 'queues', 'events', 'notifications']
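+# system-managed collections, presumably skipped during migration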
+
+
+class StatusListener(Process):
+    def __init__(self, status_queue, worker_queue):
+        super(StatusListener, self).__init__()
+        self.status_queue = status_queue
+        self.worker_queue = worker_queue
+
+    def run(self):
+        keep_going = True
+
+        org_results = {
+            'name': config.get('org'),
+            'apps': {},
+        }
+
+        empty_count = 0
+
+        status_file_name = os.path.join(config.get('log_dir'),
+                                        '%s-%s-%s-status.json' % (config.get('org'), config.get('migrate'), ECID))
+
+        while keep_going:
+
+            try:
+                app, collection, status_map = self.status_queue.get(timeout=60)
+                status_logger.info('Received status update for app/collection: [%s / %s]' % (app, collection))
+                empty_count = 0
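+                # rebuild the org-level summary from scratch on every update;
+                # it is re-aggregated below from each app/collection status_map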
+                org_results['summary'] = {
+                    'max_created': -1,
+                    'max_modified': -1,
+                    'min_created': 1584946416000,
+                    'min_modified': 1584946416000,
+                    'count': 0,
+                    'bytes': 0
+                }
+
+                if app not in org_results['apps']:
+                    org_results['apps'][app] = {
+                        'collections': {}
+                    }
+
+                org_results['apps'][app]['collections'].update(status_map)
+
+                try:
+                    for app, app_data in org_results['apps'].iteritems():
+                        app_data['summary'] = {
+                            'max_created': -1,
+                            'max_modified': -1,
+                            'min_created': 1584946416000,
+                            'min_modified': 1584946416000,
+                            'count': 0,
+                            'bytes': 0
+                        }
+
+                        if 'collections' in app_data:
+                            for collection, collection_data in app_data['collections'].iteritems():
+
+                                app_data['summary']['count'] += collection_data['count']
+                                app_data['summary']['bytes'] += collection_data['bytes']
+
+                                org_results['summary']['count'] += collection_data['count']
+                                org_results['summary']['bytes'] += collection_data['bytes']
+
+                                # APP
+                                if collection_data.get('max_modified') > app_data['summary']['max_modified']:
+                                    app_data['summary']['max_modified'] = collection_data.get('max_modified')
+
+                                if collection_data.get('min_modified') < app_data['summary']['min_modified']:
+                                    app_data['summary']['min_modified'] = collection_data.get('min_modified')
+
+                                if collection_data.get('max_created') > app_data['summary']['max_created']:
+                                    app_data['summary']['max_created'] = collection_data.get('max_created')
+
+                                if collection_data.get('min_created') < app_data['summary']['min_created']:
+                                    app_data['summary']['min_created'] = collection_data.get('min_created')
+
+                                # ORG
+                                if collection_data.get('max_modified') > org_results['summary']['max_modified']:
+                                    org_results['summary']['max_modified'] = collection_data.get('max_modified')
+
+                                if collection_data.get('min_modified') < org_results['summary']['min_modified']:
+                                    org_results['summary']['min_modified'] = collection_data.get('min_modified')
+
+                                if collection_data.get('max_created') > org_results['summary']['max_created']:
+                                    org_results['summary']['max_created'] = collection_data.get('max_created')
+
+                                if collection_data.get('min_created') < org_results['summary']['min_created']:
+                                    org_results['summary']['min_created'] = collection_data.get('min_created')
+
+                        if QSIZE_OK:
+                            status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize())
+
+                        status_logger.warn('UPDATED status of org processed: %s' % json.dumps(org_results))
+
+                        try:
+                            logger.info('Writing status to file: %s' % status_file_name)
+
+                            with open(status_file_name, 'w') as f:
+                                json.dump(org_results, f, indent=2)
+                        except:
+                            print traceback.format_exc()
+
+                except KeyboardInterrupt, e:
+                    raise e
+
+                except:
+                    print traceback.format_exc()
+
+            except KeyboardInterrupt, e:
+                status_logger.warn('FINAL status of org processed: %s' % json.dumps(org_results))
+                raise e
+
+            except Empty:
+                if QSIZE_OK:
+                    status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize())
+
+                status_logger.warn('CURRENT status of org processed: %s' % json.dumps(org_results))
+
+                status_logger.warning('EMPTY! Count=%s' % empty_count)
+
+                empty_count += 1
+
+                if empty_count >= 120:
+                    keep_going = False
+
+            except:
+                print traceback.format_exc()
+
+        logger.warn('FINAL status of org processed: %s' % json.dumps(org_results))
+
+        try:
+            logger.info('Writing final status to file: %s' % status_file_name)
+            with open(status_file_name, 'w') as f:
+                json.dump(org_results, f, indent=2)
+        except:
+            print traceback.format_exc()
+
+
+class EntityWorker(Process):
+    def __init__(self, queue, handler_function):
+        super(EntityWorker, self).__init__()
+
+        worker_logger.debug('Creating worker!')
+        self.queue = queue
+        self.handler_function = handler_function
+
+    def run(self):
+
+        worker_logger.info('starting run()...')
+        keep_going = True
+
+        count_processed = 0
+        empty_count = 0
+        start_time = int(time.time())
+
+        while keep_going:
+
+            try:
+                # get an entity with the app and collection name
+                app, collection_name, entity = self.queue.get(timeout=120)
+                empty_count = 0
+
+                # if entity.get('type') == 'user':
+                #     entity = confirm_user_entity(app, entity)
+
+                # the handler operation is the specified operation such as migrate_graph
+                if self.handler_function is not None:
+                    try:
+                        message_start_time = int(time.time())
+                        processed = self.handler_function(app, collection_name, entity)
+                        message_end_time = int(time.time())
+
+                        if processed:
+                            count_processed += 1
+
+                            total_time = message_end_time - start_time
+                            avg_time_per_message = total_time / count_processed
+                            message_time = message_end_time - message_start_time
+
+                            worker_logger.debug('Processed [%sth] entity = %s / %s / %s' % (
+                                count_processed, app, collection_name, entity.get('uuid')))
+
+                            if count_processed % 1000 == 1:
+                                worker_logger.info(
+                                        'Processed [%sth] entity = [%s / %s / %s] in [%s]s - avg time/message [%s]' % (
+                                            count_processed, app, collection_name, entity.get('uuid'), message_time,
+                                            avg_time_per_message))
+
+                    except KeyboardInterrupt, e:
+                        raise e
+
+                    except Exception, e:
+                        logger.exception('Error in EntityWorker processing message')
+                        print traceback.format_exc()
+
+            except KeyboardInterrupt, e:
+                raise e
+
+            except Empty:
+                worker_logger.warning('EMPTY! Count=%s' % empty_count)
+
+                empty_count += 1
+
+                if empty_count >= 2:
+                    keep_going = False
+
+            except Exception, e:
+                logger.exception('Error in EntityWorker run()')
+                print traceback.format_exc()
+
+
+class CollectionWorker(Process):
+    def __init__(self, work_queue, entity_queue, response_queue):
+        super(CollectionWorker, self).__init__()
+        collection_worker_logger.debug('Creating worker!')
+        self.work_queue = work_queue
+        self.response_queue = response_queue
+        self.entity_queue = entity_queue
+
+    def run(self):
+
+        collection_worker_logger.info('starting run()...')
+        keep_going = True
+
+        counter = 0
+        # max_created = 0
+        empty_count = 0
+        app = 'ERROR'
+        collection_name = 'NOT SET'
+        status_map = {}
+        sleep_time = 10
+
+        try:
+
+            while keep_going:
+
+                try:
+                    app, collection_name = self.work_queue.get(timeout=30)
+
+                    status_map = {
+                        collection_name: {
+                            'iteration_started': str(datetime.datetime.now()),
+                            'max_created': -1,
+                            'max_modified': -1,
+                            'min_created': 1584946416000,
+                            'min_modified': 1584946416000,
+                            'count': 0,
+                            'bytes': 0
+                        }
+                    }
+
+                    empty_count = 0
+
+                    # added a flag for using graph vs query/index
+                    if config.get('graph', False):
+                        source_collection_url = collection_graph_url_template.format(org=config.get('org'),
+                                                                                     app=app,
+                                                                                     collection=collection_name,
+                                                                                     limit=config.get('limit'),
+                                                                                     **config.get('source_endpoint'))
+                    else:
+                        source_collection_url = collection_query_url_template.format(org=config.get('org'),
+                                                                                     app=app,
+                                                                                     collection=collection_name,
+                                                                                     limit=config.get('limit'),
+                                                                                     ql="select * %s" % config.get(
+                                                                                             'ql'),
+                                                                                     **config.get('source_endpoint'))
+
+                    logger.info('Iterating URL: %s' % source_collection_url)
+
+                    # use the UsergridQuery from the Python SDK to iterate the collection
+                    q = UsergridQueryIterator(source_collection_url,
+                                              page_delay=config.get('page_sleep_time'),
+                                              sleep_time=config.get('error_retry_sleep'))
+
+                    for entity in q:
+
+                        # begin entity loop
+
+                        self.entity_queue.put((app, collection_name, entity))
+                        counter += 1
+
+                        if 'created' in entity:
+
+                            try:
+                                entity_created = long(entity.get('created'))
+
+                                if entity_created > status_map[collection_name]['max_created']:
+                                    status_map[collection_name]['max_created'] = entity_created
+                                    status_map[collection_name]['max_created_str'] = str(
+                                            datetime.datetime.fromtimestamp(entity_created / 1000))
+
+                                if entity_created < status_map[collection_name]['min_created']:
+                                    status_map[collection_name]['min_created'] = entity_created
+                                    status_map[collection_name]['min_created_str'] = str(
+                                            datetime.datetime.fromtimestamp(entity_created / 1000))
+
+                            except ValueError:
+                                pass
+
+                        if 'modified' in entity:
+
+                            try:
+                                entity_modified = long(entity.get('modified'))
+
+                                if entity_modified > status_map[collection_name]['max_modified']:
+                                    status_map[collection_name]['max_modified'] = entity_modified
+                                    status_map[collection_name]['max_modified_str'] = str(
+                                            datetime.datetime.fromtimestamp(entity_modified / 1000))
+
+                                if entity_modified < status_map[collection_name]['min_modified']:
+                                    status_map[collection_name]['min_modified'] = entity_modified
+                                    status_map[collection_name]['min_modified_str'] = str(
+                                            datetime.datetime.fromtimestamp(entity_modified / 1000))
+
+                            except ValueError:
+                                pass
+
+                        status_map[collection_name]['bytes'] += count_bytes(entity)
+                        status_map[collection_name]['count'] += 1
+
+                        if counter % 1000 == 1:
+                            try:
+                                collection_worker_logger.warning(
+                                        'Sending stats for app/collection [%s / %s]: %s' % (
+                                            app, collection_name, status_map))
+
+                                self.response_queue.put((app, collection_name, status_map))
+
+                                if QSIZE_OK:
+                                    collection_worker_logger.info(
+                                            'Counter=%s, collection queue depth=%s' % (
+                                                counter, self.work_queue.qsize()))
+                            except:
+                                pass
+
+                            collection_worker_logger.warn(
+                                    'Current status of collections processed: %s' % json.dumps(status_map))
+
+                        if config.get('entity_sleep_time') > 0:
+                            collection_worker_logger.debug(
+                                    'sleeping for [%s]s per entity...' % (config.get('entity_sleep_time')))
+                            time.sleep(config.get('entity_sleep_time'))
+                            collection_worker_logger.debug(
+                                    'STOPPED sleeping for [%s]s per entity...' % (config.get('entity_sleep_time')))
+
+                    # end entity loop
+
+                    status_map[collection_name]['iteration_finished'] = str(datetime.datetime.now())
+
+                    collection_worker_logger.warning(
+                            'Collection [%s / %s / %s] loop complete!  Max Created entity %s' % (
+                                config.get('org'), app, collection_name, status_map[collection_name]['max_created']))
+
+                    collection_worker_logger.warning(
+                            'Sending FINAL stats for app/collection [%s / %s]: %s' % (app, collection_name, status_map))
+
+                    self.response_queue.put((app, collection_name, status_map))
+
+                    collection_worker_logger.info('Done! Finished app/collection: %s / %s' % (app, collection_name))
+
+                except KeyboardInterrupt, e:
+                    raise e
+
+                except Empty:
+                    collection_worker_logger.warning('EMPTY! Count=%s' % empty_count)
+
+                    empty_count += 1
+
+                    if empty_count >= 2:
+                        keep_going = False
+
+                except Exception, e:
+                    logger.exception('Error in CollectionWorker processing collection [%s]' % collection_name)
+                    print traceback.format_exc()
+
+        finally:
+            self.response_queue.put((app, collection_name, status_map))
+            collection_worker_logger.info('FINISHED!')
+
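+# How these processes are presumably wired together by driver code further down
+# in this file: the main process feeds (app, collection) pairs to
+# CollectionWorkers, which stream (app, collection_name, entity) tuples to
+# EntityWorkers and push periodic status_maps to the StatusListener.
+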
+
+def use_name_for_collection(collection_name):
+    return collection_name in config.get('use_name_for_collection', [])
+
+
+def include_edge(collection_name, edge_name):
+    include_edges = config.get('include_edge', [])
+
+    if include_edges is None:
+        include_edges = []
+
+    exclude_edges = config.get('exclude_edge', [])
+
+    if exclude_edges is None:
+        exclude_edges = []
+
+    if len(include_edges) > 0 and edge_name not in include_edges:
+        logger.debug(
+                'Skipping edge [%s] since it is not in INCLUDED list: %s' % (edge_name, include_edges))
+        return False
+
+    if edge_name in exclude_edges:
+        logger.debug(
+                'Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, exclude_edges))
+        return False
+
+    if (collection_name in ['users', 'user'] and edge_name in ['followers', 'feed', 'activities']) \
+            or (collection_name in ['receipts', 'receipt'] and edge_name in ['device', 'devices']):
+        # feed and activities are not retrievable...
+        # roles and groups will be more efficiently handled from the role/group -> user
+        # followers will be handled by 'following'
+        # do only this from user -> device
+        return False
+
+    return True
+
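+# For example, include_edge('users', 'feed') returns False because feed edges
+# are not retrievable, while a hypothetical include_edge('stores', 'owns')
+# returns True as long as no include/exclude edge lists are configured.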
+
+def exclude_edge(collection_name, edge_name):
+    exclude_edges = config.get('exclude_edge', [])
+
+    if exclude_edges is None:
+        exclude_edges = []
+
+    if edge_name in exclude_edges:
+        logger.debug('Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, exclude_edges))
+        return True
+
+    if (collection_name in ['users', 'user'] and edge_name in ['followers', 'feed', 'activities']) \
+            or (collection_name in ['receipts', 'receipt'] and edge_name in ['device', 'devices']):
+        # feed and activities are not retrievable...
+        # roles and groups will be more efficiently handled from the role/group -> user
+        # followers will be handled by 'following'
+        # do only this from user -> device
+        return True
+
+    return False
+
+
+def confirm_user_entity(app, source_entity, attempts=0):
+    attempts += 1
+
+    source_entity_url = get_entity_url_template.format(org=config.get('org'),
+                                                       app=app,
+                                                       collection='users',
+                                                       uuid=source_entity.get('username'),
+                                                       **config.get('source_endpoint'))
+
+    if attempts >= 5:
+        logger.warning('Punting after [%s] attempts to confirm user at URL [%s], will use the source entity...' % (
+            attempts, source_entity_url))
+
+        return source_entity
+
+    r = requests.get(url=source_entity_url)
+
+    if r.status_code == 200:
+        retrieved_entity = r.json().get('entities')[0]
+
+        if retrieved_entity.get('uuid') != source_entity.get('uuid'):
+            logger.info(
+                    'UUID of Source Entity [%s] differs from uuid [%s] of retrieved entity at URL=[%s] and will be substituted' % (
+                        source_entity.get('uuid'), retrieved_entity.get('uuid'), source_entity_url))
+
+        return retrieved_entity
+
+    elif 'service_resource_not_found' in r.text:
+
+        logger.warn('Unable to retrieve user at URL [%s], and will use source entity.  status=[%s] response: %s...' % (
+            source_entity_url, r.status_code, r.text))
+
+        return source_entity
+
+    else:
+        logger.error('After [%s] attempts to confirm user at URL [%s], received status [%s] message: %s...' % (
+            attempts, source_entity_url, r.status_code, r.text))
+
+        time.sleep(DEFAULT_RETRY_SLEEP)
+
+        return confirm_user_entity(app, source_entity, attempts)
+
+
+def create_connection(app, collection_name, source_entity, edge_name, target_entity):
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    source_identifier = get_source_identifier(source_entity)
+    target_identifier = get_source_identifier(target_entity)
+
+    source_type_id = '%s/%s' % (source_entity.get('type'), source_identifier)
+    target_type_id = '%s/%s' % (target_entity.get('type'), target_identifier)
+
+    if source_entity.get('type') == 'user':
+        source_type_id = '%s/%s' % ('users', source_entity.get('username'))
+
+    if target_entity.get('type') == 'user':
+        if edge_name == 'users':
+            target_type_id = target_entity.get('uuid')
+        else:
+            target_type_id = '%s/%s' % ('users', target_entity.get('uuid'))
+
+    if target_entity.get('type') == 'device':
+        if edge_name == 'devices':
+            target_type_id = target_entity.get('uuid')
+        else:
+            target_type_id = '%s/%s' % ('devices', target_entity.get('uuid'))
+
+    if target_entity.get('type') == 'receipt':
+        if edge_name == 'receipts':
+            target_type_id = target_entity.get('uuid')
+        else:
+            target_type_id = '%s/%s' % ('receipts', target_entity.get('uuid'))
+
+    create_connection_url = connection_create_by_pairs_url_template.format(
+            org=target_org,
+            app=target_app,
+            source_type_id=source_type_id,
+            verb=edge_name,
+            target_type_id=target_type_id,
+            **config.get('target_endpoint'))
+
+    if not config.get('skip_cache_read', False):
+        processed = cache.get(create_connection_url)
+
+        if processed not in [None, 'None']:
+            logger.debug('Skipping visited Edge: [%s / %s / %s] --[%s]--> [%s / %s / %s]: %s ' % (
+                app, collection_name, source_identifier, edge_name, target_app, target_entity.get('type'),
+                target_entity.get('name'), create_connection_url))
+
+            return True
+
+    logger.info('Connecting entity [%s / %s / %s] --[%s]--> [%s / %s / %s]: %s ' % (
+        app, collection_name, source_identifier, edge_name, target_app, target_entity.get('type'),
+        target_entity.get('name', target_entity.get('uuid')), create_connection_url))
+
+    attempts = 0
+
+    while attempts < 5:
+        attempts += 1
+
+        r_create = session_target.post(create_connection_url)
+
+        if r_create.status_code == 200:
+
+            if not config.get('skip_cache_write', False):
+                cache.set(create_connection_url, 1)
+
+            return True
+        else:
+            if r_create.status_code >= 500:
+
+                if attempts < 5:
+                    logger.warning('FAILED [%s] (will retry) to create connection at URL=[%s]: %s' % (
+                        r_create.status_code, create_connection_url, r_create.text))
+                    time.sleep(DEFAULT_RETRY_SLEEP)
+                else:
+                    logger.critical(
+                            'FAILED [%s] (WILL NOT RETRY - max attempts) to create connection at URL=[%s]: %s' % (
+                                r_create.status_code, create_connection_url, r_create.text))
+                    return False
+
+            elif r_create.status_code in [401, 404]:
+
+                if config.get('repair_data', False):
+                    logger.warning('FAILED [%s] (WILL attempt repair) to create connection at URL=[%s]: %s' % (
+                        r_create.status_code, create_connection_url, r_create.text))
+                    migrate_data(app, source_entity.get('type'), source_entity, force=True)
+                    migrate_data(app, target_entity.get('type'), target_entity, force=True)
+
+                else:
+                    logger.critical('FAILED [%s] (WILL NOT attempt repair) to create connection at URL=[%s]: %s' % (
+                        r_create.status_code, create_connection_url, r_create.text))
+
+            else:
+                logger.warning('FAILED [%s] (will retry) to create connection at URL=[%s]: %s' % (
+                    r_create.status_code, create_connection_url, r_create.text))
+
+    return False
+
+
+def process_edges(app, collection_name, source_entity, edge_name, connection_stack):
+
+    source_identifier = get_source_identifier(source_entity)
+
+    while len(connection_stack) > 0:
+
+        target_entity = connection_stack.pop()
+
+        if exclude_collection(collection_name) or exclude_collection(target_entity.get('type')):
+            logger.debug('EXCLUDING Edge (collection): [%s / %s / %s] --[%s]--> ?' % (
+                app, collection_name, source_identifier, edge_name))
+            continue
+
+        create_connection(app, collection_name, source_entity, edge_name, target_entity)
+
+
+def migrate_out_graph_edge_type(app, collection_name, source_entity, edge_name, depth=0):
+    if not include_edge(collection_name, edge_name):
+        return True
+
+    source_uuid = source_entity.get('uuid')
+
+    key = '%s:edge:out:%s:%s' % (key_version, source_uuid, edge_name)
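+    # yields visit-marker keys like 'v4:edge:out:<source-uuid>:<edge-name>',
+    # one per (entity, outbound edge type)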
+
+    if not config.get('skip_cache_read', False):
+        date_visited = cache.get(key)
+
+        if date_visited not in [None, 'None']:
+            logger.info('Skipping EDGE [%s / %s --%s-->] - visited at %s' % (
+                collection_name, source_uuid, edge_name, date_visited))
+            return True
+        else:
+            cache.delete(key)
+
+    if not config.get('skip_cache_write', False):
+        cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))
+
+    logger.debug('Visiting EDGE [%s / %s (%s) --%s-->] at %s' % (
+        collection_name, source_uuid, get_uuid_time(source_uuid), edge_name, str(datetime.datetime.utcnow())))
+
+    response = True
+
+    source_identifier = get_source_identifier(source_entity)
+
+    count_edges = 0
+
+    logger.debug(
+            'Processing edge type=[%s] of entity [%s / %s / %s]' % (edge_name, app, collection_name, source_identifier))
+
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    connection_query_url = connection_query_url_template.format(
+            org=config.get('org'),
+            app=app,
+            verb=edge_name,
+            collection=collection_name,
+            uuid=source_identifier,
+            limit=config.get('limit'),
+            **config.get('source_endpoint'))
+
+    connection_query = UsergridQueryIterator(connection_query_url, sleep_time=config.get('error_retry_sleep'))
+
+    connection_stack = []
+
+    for target_entity in connection_query:
+        target_connection_collection = config.get('collection_mapping', {}).get(target_entity.get('type'),
+                                                                                target_entity.get('type'))
+
+        target_ok = migrate_graph(app, target_entity.get('type'), source_entity=target_entity, depth=depth)
+
+        if not target_ok:
+            logger.critical(
+                    'Error migrating TARGET entity data for connection [%s / %s / %s] --[%s]--> [%s / %s / %s]' % (
+                        app, collection_name, source_identifier, edge_name, app, target_connection_collection,
+                        target_entity.get('name', target_entity.get('uuid'))))
+
+        count_edges += 1
+        connection_stack.append(target_entity)
+
+    process_edges(app, collection_name, source_entity, edge_name, connection_stack)
+
+    return response
+
+
+def get_source_identifier(source_entity):
+    entity_type = source_entity.get('type')
+
+    source_identifier = source_entity.get('uuid')
+
+    if use_name_for_collection(entity_type):
+
+        if entity_type in ['user']:
+            source_identifier = source_entity.get('username')
+        else:
+            source_identifier = source_entity.get('name')
+
+        if source_identifier is None:
+            source_identifier = source_entity.get('uuid')
+            logger.warn('Using UUID for entity [%s / %s]' % (entity_type, source_identifier))
+
+    return source_identifier
+
+
+def include_collection(collection_name):
+    if collection_name in ['events']:
+        return False
+
+    include = config.get('collection', [])
+
+    if include is not None and len(include) > 0 and collection_name not in include:
+        return False
+
+    exclude = config.get('exclude_collection', [])
+
+    if exclude is not None and collection_name in exclude:
+        return False
+
+    return True
+
+
+def exclude_collection(collection_name):
+    exclude = config.get('exclude_collection', [])
+
+    if exclude is not None and collection_name in exclude:
+        return True
+
+    return False
+
+
+def migrate_in_graph_edge_type(app, collection_name, source_entity, edge_name, depth=0):
+    source_uuid = source_entity.get('uuid')
+    key = '%s:edges:in:%s:%s' % (key_version, source_uuid, edge_name)
+
+    if not config.get('skip_cache_read', False):
+        date_visited = cache.get(key)
+
+        if date_visited not in [None, 'None']:
+            logger.info('Skipping EDGE [--%s--> %s / %s] - visited at %s' % (
+                edge_name, collection_name, source_uuid, date_visited))
+            return True
+        else:
+            cache.delete(key)
+
+    if not config.get('skip_cache_write', False):
+        cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))
+
+    logger.debug('Visiting EDGE [--%s--> %s / %s (%s)] at %s' % (
+        edge_name, collection_name, source_uuid, get_uuid_time(source_uuid), str(datetime.datetime.utcnow())))
+
+    source_identifier = get_source_identifier(source_entity)
+
+    if exclude_collection(collection_name):
+        logger.debug('Excluding (Collection) entity [%s / %s / %s]' % (app, collection_name, source_uuid))
+        return True
+
+    if not include_edge(collection_name, edge_name):
+        return True
+
+    logger.debug(
+            'Processing edge type=[%s] of entity [%s / %s / %s]' % (edge_name, app, collection_name, source_identifier))
+
+    logger.debug('Processing IN edges type=[%s] of entity [ %s / %s / %s]' % (
+        edge_name, app, collection_name, source_uuid))
+
+    connecting_query_url = connecting_query_url_template.format(
+            org=config.get('org'),
+            app=app,
+            collection=collection_name,
+            uuid=source_uuid,
+            verb=edge_name,
+            limit=config.get('limit'),
+            **config.get('source_endpoint'))
+
+    connection_query = UsergridQueryIterator(connecting_query_url, sleep_time=config.get('error_retry_sleep'))
+
+    response = True
+
+    for e_connection in connection_query:
+        logger.debug('Triggering IN->OUT edge migration on entity [%s / %s / %s] ' % (
+            app, e_connection.get('type'), e_connection.get('uuid')))
+
+        response = migrate_graph(app, e_connection.get('type'), e_connection, depth) and response
+
+    return response
+
+
+def migrate_graph(app, collection_name, source_entity, depth=0):
+    depth += 1
+    source_uuid = source_entity.get('uuid')
+
+    # short circuit if the graph depth exceeds what was specified
+    if depth > config.get('graph_depth', 1):
+        logger.debug(
+                'Reached Max Graph Depth, stopping after [%s] on [%s / %s]' % (depth, collection_name, source_uuid))
+        return True
+    else:
+        logger.debug('Processing @ Graph Depth [%s]' % depth)
+
+    if exclude_collection(collection_name):
+        logger.warn('Ignoring entity in filtered collection [%s]' % collection_name)
+        return True
+
+    key = '%s:graph:%s' % (key_version, source_uuid)
+    entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid))
+
+    if not config.get('skip_cache_read', False):
+        date_visited = cache.get(key)
+
+        if date_visited not in [None, 'None']:
+            logger.debug('Skipping GRAPH %s at %s' % (entity_tag, date_visited))
+            return True
+        else:
+            cache.delete(key)
+
+    logger.info('Visiting GRAPH %s at %s' % (entity_tag, str(datetime.datetime.utcnow())))
+
+    if not config.get('skip_cache_write', False):
+        cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))
+
+    # first, migrate data for current node
+    response = migrate_data(app, collection_name, source_entity)
+
+    # gather the outbound edge names
+    out_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('collections', [])]
+    out_edge_names += [edge_name for edge_name in source_entity.get('metadata', {}).get('connections', [])]
+
+    logger.debug('Entity %s has [%s] OUT edges' % (entity_tag, len(out_edge_names)))
+
+    # migrate each outbound edge type
+    for edge_name in out_edge_names:
+
+        if not exclude_edge(collection_name, edge_name):
+            response = migrate_out_graph_edge_type(app, collection_name, source_entity, edge_name, depth) and response
+
+        if config.get('prune', False):
+            prune_edge_by_name(edge_name, app, collection_name, source_entity)
+
+    # gather the inbound edge names
+    in_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('connecting', [])]
+
+    logger.debug('Entity %s has [%s] IN edges' % (entity_tag, len(in_edge_names)))
+
+    # migrate each inbound edge type
+    for edge_name in in_edge_names:
+
+        if not exclude_edge(collection_name, edge_name):
+            response = migrate_in_graph_edge_type(app, collection_name, source_entity, edge_name,
+                                                  depth) and response
+
+    return response
+
+
+def collect_entities(q):
+    response = {}
+
+    for e in q:
+        response[e.get('uuid')] = e
+
+    return response
+
+
+def prune_edge_by_name(edge_name, app, collection_name, source_entity):
+    if not include_edge(collection_name, edge_name):
+        return True
+
+    source_identifier = get_source_identifier(source_entity)
+    source_uuid = source_entity.get('uuid')
+
+    entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid))
+
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    target_connection_query_url = connection_query_url_template.format(
+            org=target_org,
+            app=target_app,
+            verb=edge_name,
+            collection=target_collection,
+            uuid=source_identifier,
+            limit=config.get('limit'),
+            **config.get('target_endpoint'))
+
+    source_connection_query_url = connection_query_url_template.format(
+            org=config.get('org'),
+            app=app,
+            verb=edge_name,
+            collection=collection_name,
+            uuid=source_identifier,
+            limit=config.get('limit'),
+            **config.get('source_endpoint'))
+
+    source_connections = collect_entities(
+            UsergridQueryIterator(source_connection_query_url, sleep_time=config.get('error_retry_sleep')))
+
+    target_connections = collect_entities(
+            UsergridQueryIterator(target_connection_query_url, sleep_time=config.get('error_retry_sleep')))
+
+    delete_uuids = Set(target_connections.keys()) - Set(source_connections.keys())
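+    # connections present on the target but no longer on the source are stale
+    # and are deleted below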
+
+    if len(delete_uuids) > 0:
+        logger.info('Found [%s] edges to delete for entity %s' % (len(delete_uuids), entity_tag))
+
+        for delete_uuid in delete_uuids:
+            delete_connection_url = connection_create_by_uuid_url_template.format(
+                    org=target_org,
+                    app=target_app,
+                    verb=edge_name,
+                    collection=target_collection,
+                    uuid=source_identifier,
+                    target_uuid=delete_uuid,
+                    **config.get('target_endpoint'))
+
+            attempts = 0
+
+            while attempts < 5:
+                attempts += 1
+
+                r = session_target.delete(delete_connection_url)
+
+                if not config.get('skip_cache_write'):
+                    cache.delete(delete_connection_url)
+
+                if r.status_code == 200:
+                    logger.info('Pruned edge on attempt [%s] URL=[%s]' % (attempts, delete_connection_url))
+                    break
+                else:
+                    logger.error('Error [%s] on attempt [%s] deleting connection at URL=[%s]: %s' % (
+                        r.status_code, attempts, delete_connection_url, r.text))
+                    time.sleep(DEFAULT_RETRY_SLEEP)
+
+    return True
+
+
+def prune_graph(app, collection_name, source_entity):
+    source_uuid = source_entity.get('uuid')
+    key = '%s:prune_graph:%s' % (key_version, source_uuid)
+    entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid))
+
+    if not config.get('skip_cache_read', False):
+        date_visited = cache.get(key)
+
+        if date_visited not in [None, 'None']:
+            logger.debug('Skipping PRUNE %s at %s' % (entity_tag, date_visited))
+            return True
+        else:
+            cache.delete(key)
+
+    logger.debug('pruning GRAPH %s at %s' % (entity_tag, str(datetime.datetime.utcnow())))
+    if not config.get('skip_cache_write', False):
+        cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))
+
+    if collection_name in config.get('exclude_collection', []):
+        logger.debug('Excluding (Collection) entity %s' % entity_tag)
+        return True
+
+    out_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('collections', [])]
+    out_edge_names += [edge_name for edge_name in source_entity.get('metadata', {}).get('connections', [])]
+
+    for edge_name in out_edge_names:
+        prune_edge_by_name(edge_name, app, collection_name, source_entity)
+
+
+def reput(app, collection_name, source_entity, attempts=0):
+    source_identifier = source_entity.get('uuid')
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    try:
+        target_entity_url_by_name = put_entity_url_template.format(org=target_org,
+                                                                   app=target_app,
+                                                                   collection=target_collection,
+                                                                   uuid=source_identifier,
+                                                                   **config.get('target_endpoint'))
+
+        # empty-body PUT against the target entity, presumably to touch/re-index it
+        r = session_target.put(target_entity_url_by_name, data=json.dumps({}))
+        if r.status_code != 200:
+            logger.info('HTTP [%s]: %s' % (target_entity_url_by_name, r.status_code))
+        else:
+            logger.debug('HTTP [%s]: %s' % (target_entity_url_by_name, r.status_code))
+
+    except:
+        pass
+
+
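+# Usergrid entity UUIDs are time-based (version 1) UUIDs, so the creation
+# timestamp can be recovered from the UUID itself.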
+def get_uuid_time(the_uuid_string):
+    return time_uuid.TimeUUID(the_uuid_string).get_datetime()
+
+
+def migrate_permissions(app, collection_name, source_entity, attempts=0):
+    if collection_name not in ['roles', 'role', 'group', 'groups']:
+        return True
+
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    source_identifier = get_source_identifier(source_entity)
+
+    source_permissions_url = permissions_url_template.format(org=config.get('org'),
+                                                             app=app,
+                                                             collection=collection_name,
+                                                             uuid=source_identifier,
+                                                             **config.get('source_endpoint'))
+
+    r = session_source.get(source_permissions_url)
+
+    if r.status_code != 200:
+        logger.error('Unable to get permissions at URL [%s]: %s' % (source_permissions_url, r.text))
+        return False
+
+    perm_response = r.json()
+
+    perms = perm_response.get('data', [])
+
+    logger.info('Migrating [%s / %s] with permissions %s' % (collection_name, source_identifier, perms))
+
+    if len(perms) > 0:
+        target_permissions_url = permissions_url_template.format(org=target_org,
+                                                                 app=target_app,
+                                                                 collection=target_collection,
+                                                                 uuid=source_identifier,
+                                                                 **config.get('target_endpoint'))
+
+        for permission in perms:
+            data = {'permission': permission}
+
+            logger.info('Posting permission %s to %s' % (json.dumps(data), target_permissions_url))
+
+            r = session_target.post(target_permissions_url, json.dumps(data))
+
+            if r.status_code != 200:
+                logger.error(
+                        'ERROR posting permission %s to URL=[%s]: %s' % (
+                            json.dumps(data), target_permissions_url, r.text))
+
+    return True
+
+
+def migrate_data(app, collection_name, source_entity, attempts=0, force=False):
+    if config.get('skip_data') and not force:
+        return True
+
+    # check the cache to see if this entity has changed
+    if not config.get('skip_cache_read', False) and not force:
+        try:
+            str_modified = cache.get(source_entity.get('uuid'))
+
+            if str_modified not in [None, 'None']:
+
+                modified = long(str_modified)
+
+                logger.debug('FOUND CACHE: %s = %s ' % (source_entity.get('uuid'), modified))
+
+                # skip when the source entity has not been modified since the
+                # cached migration timestamp
+                if modified >= source_entity.get('modified'):
+
+                    modified_date = datetime.datetime.utcfromtimestamp(modified / 1000)
+                    e_uuid = source_entity.get('uuid')
+
+                    uuid_datetime = time_uuid.TimeUUID(e_uuid).get_datetime()
+
+                    logger.debug('Skipping ENTITY: %s / %s / %s / %s (%s) / %s (%s)' % (
+                        config.get('org'), app, collection_name, e_uuid, uuid_datetime, modified, modified_date))
+                    return True
+                else:
+                    logger.debug('DELETING CACHE: %s ' % (source_entity.get('uuid')))
+                    cache.delete(source_entity.get('uuid'))
+        except:
+            logger.error('Error on checking cache for uuid=[%s]' % source_entity.get('uuid'))
+            logger.error(traceback.format_exc())
+
+    if exclude_collection(collection_name):
+        logger.warn('Excluding entity in filtered collection [%s]' % collection_name)
+        return True
+
+    # handle duplicate user case
+    if collection_name in ['users', 'user']:
+        source_entity = confirm_user_entity(app, source_entity)
+
+    source_identifier = get_source_identifier(source_entity)
+
+    logger.info('Visiting ENTITY data [%s / %s (%s) ] at %s' % (
+        collection_name, source_identifier, get_uuid_time(source_entity.get('uuid')), str(datetime.datetime.utcnow())))
+
+    entity_copy = source_entity.copy()
+
+    if 'metadata' in entity_copy:
+        entity_copy.pop('metadata')
+
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    try:
+        target_entity_url_by_name = put_entity_url_template.format(org=target_org,
+                                                                   app=target_app,
+                                                                   collection=target_collection,
+                                                                   uuid=source_identifier,
+                                                                   **config.get('target_endpoint'))
+
+        r = session_target.put(url=target_entity_url_by_name, data=json.dumps(entity_copy))
+
+        if attempts > 1:
+            logger.warn('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % (
+                attempts, collection_name, source_identifier, target_entity_url_by_name))
+        else:
+            logger.debug('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % (
+                attempts, collection_name, source_identifier, target_entity_url_by_name))
+
+        if r.status_code == 200:
+            # Worked => WE ARE DONE
+            logger.info(
+                    'migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % (
+                        True, attempts, config.get('org'), app, source_identifier, source_entity.get('created'),
+                        source_entity.get('modified'),))
+
+            if not config.get('skip_cache_write', False):
+                logger.debug('SETTING CACHE | uuid=[%s] | modified=[%s]' % (
+                    source_entity.get('uuid'), str(source_entity.get('modified'))))
+
+                cache.set(source_entity.get('uuid'), str(source_entity.get('modified')))
+
+            if collection_name in ['role', 'group', 'roles', 'groups']:
+                migrate_permissions(app, collection_name, source_entity, attempts=0)
+
+            if collection_name in ['users', 'user']:
+                migrate_user_credentials(app, collection_name, source_entity, attempts=0)
+
+            return True
+
+        else:
+            logger.error('Failure [%s] on attempt [%s] to PUT url=[%s], entity=[%s] response=[%s]' % (
+                r.status_code, attempts, target_entity_url_by_name, json.dumps(source_entity), r.text))
+
+            if attempts >= 5:
+                logger.critical(
+                        'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s' % (
+                            False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
+                            collection_name, source_identifier))
+
+                return False
+
+            if r.status_code == 400:
+
+                if target_collection in ['roles', 'role']:
+                    return repair_user_role(app, collection_name, source_entity)
+
+                elif target_collection in ['users', 'user']:
+                    return handle_user_migration_conflict(app, collection_name, source_entity)
+
+                elif 'duplicate_unique_property_exists' in r.text:
+                    logger.error(
+                            'WILL NOT RETRY (duplicate) [%s] attempts to PUT url=[%s], entity=[%s] response=[%s]' % (
+                                attempts, target_entity_url_by_name, json.dumps(source_entity), r.text))
+
+                    return False
+
+            elif r.status_code == 403:
+                logger.critical(
+                        'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s' % (
+                            False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
+                            collection_name, source_identifier))
+                return False
+
+    except:
+        logger.error(traceback.format_exc())
+        logger.error('error in migrate_data on entity: %s' % json.dumps(source_entity))
+
+    logger.warn(
+            'UNSUCCESSFUL migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % (
+                False, attempts, config.get('org'), app, source_identifier, source_entity.get('created'),
+                source_entity.get('modified'),))
+
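+    # fall through: retry by recursing with an incremented attempt counter;
+    # the attempts >= 5 guard above ends the cycle for HTTP failures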
+    return migrate_data(app, collection_name, source_entity, attempts=attempts + 1)
+
+
+def handle_user_migration_conflict(app, collection_name, source_entity, attempts=0, depth=0):
+    # conflict handling only applies to user entities
+    if collection_name not in ['users', 'user']:
+        return False
+
+    username = source_entity.get('username')
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    target_entity_url = get_entity_url_template.format(org=target_org,
+                                                       app=target_app,
+                                                       collection=target_collection,
+                                                       uuid=username,
+                                                       **config.get('target_endpoint'))
+
+    # There is retry build in, here is the short circuit
+    if attempts >= 5:
+        logger.critical(
+                'Aborting after [%s] attempts to audit user [%s] at URL [%s]' % (attempts, username, target_entity_url))
+
+        return False
+
+    r = session_target.get(url=target_entity_url)
+
+    if r.status_code == 200:
+        target_entity = r.json().get('entities')[0]
+
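+        # conflict policy (as implemented below): the entity with the earliest 'created'
+        # timestamp wins - if the source predates the target, the target is repaired/replaced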
+        if source_entity.get('created') < target_entity.get('created'):
+            return repair_user_role(app, collection_name, source_entity)
+
+        # target is older than the source; keep the target as-is
+        return False
+
+    elif r.status_code / 100 == 5:
+        audit_logger.warning(
+                'CONFLICT: handle_user_migration_conflict failed attempt [%s] GET [%s] on TARGET URL=[%s] - : %s' % (
+                    attempts, r.status_code, target_entity_url, r.text))
+
+        time.sleep(DEFAULT_RETRY_SLEEP)
+
+        return handle_user_migration_conflict(app, collection_name, source_entity, attempts + 1)
+
+    else:
+        audit_logger.error(
+                'CONFLICT: Failed handle_user_migration_conflict attempt [%s] GET [%s] on TARGET URL=[%s] - : %s' % (
+                    attempts, r.status_code, target_entity_url, r.text))
+
+        return False
+
+
+def get_best_source_entity(app, collection_name, source_entity, depth=0):
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    target_pk = 'uuid'
+
+    if target_collection in ['users', 'user']:
+        target_pk = 'username'
+    elif target_collection in ['roles', 'role']:
+        target_pk = 'name'
+
+    target_name = source_entity.get(target_pk)
+
+    # there should be no target entity now, we just need to decide which one from the source to use
+    source_entity_url_by_name = get_entity_url_template.format(org=config.get('org'),
+                                                               app=app,
+                                                               collection=collection_name,
+                                                               uuid=target_name,
+                                                               **config.get('source_endpoint'))
+
+    r_get_source_entity = session_source.get(source_entity_url_by_name)
+
+    # if we are able to get at the source by PK...
+    if r_get_source_entity.status_code == 200:
+
+        # extract the entity from the response
+        entity_from_get = r_get_source_entity.json().get('entities')[0]
+
+        return entity_from_get
+
+    elif r_get_source_entity.status_code / 100 == 4:
+        # wasn't found, get by QL and sort
+        source_entity_query_url = collection_query_url_template.format(org=config.get('org'),
+                                                                       app=app,
+                                                                       collection=collection_name,
+                                                                       ql='select * where %s=\'%s\' order by created asc' % (
+                                                                           target_pk, target_name),
+                                                                       limit=config.get('limit'),
+                                                                       **config.get('source_endpoint'))
+
+        logger.info('Attempting to determine best entity from query on URL %s' % source_entity_query_url)
+
+        q = UsergridQueryIterator(source_entity_query_url, sleep_time=config.get('error_retry_sleep'))
+
+        desired_entity = None
+
+        entity_counter = 0
+
+        for e in q:
+            entity_counter += 1
+
+            if desired_entity is None:
+                desired_entity = e
+
+            elif e.get('created') < desired_entity.get('created'):
+                desired_entity = e
+
+        if desired_entity is None:
+            logger.warn('Unable to determine best of [%s] entities from query on URL %s' % (
+                entity_counter, source_entity_query_url))
+
+            return source_entity
+
+        else:
+            return desired_entity
+
+    else:
+        return source_entity
+
+
+def repair_user_role(app, collection_name, source_entity, attempts=0, depth=0):
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    # For the users collection there were cases where a USERNAME existed with a different UUID, which caused a
+    # 'collision'.  The fix is to delete the entity with the differing UUID (by UUID), then make a recursive call
+    # to migrate the data now that the collision has been cleared
+
+    target_pk = 'uuid'
+
+    if target_collection in ['users', 'user']:
+        target_pk = 'username'
+    elif target_collection in ['roles', 'role']:
+        target_pk = 'name'
+
+    target_name = source_entity.get(target_pk)
+
+    target_entity_url_by_name = get_entity_url_template.format(org=target_org,
+                                                               app=target_app,
+                                                               collection=target_collection,
+                                                               uuid=target_name,
+                                                               **config.get('target_endpoint'))
+
+    logger.warning('Repairing: Deleting name=[%s] entity at URL=[%s]' % (target_name, target_entity_url_by_name))
+
+    r = session_target.delete(target_entity_url_by_name)
+
+    if r.status_code == 200 or (r.status_code in [404, 401] and 'service_resource_not_found' in r.text):
+        logger.info('Deletion of entity at URL=[%s] was [%s]' % (target_entity_url_by_name, r.status_code))
+
+        best_source_entity = get_best_source_entity(app, collection_name, source_entity)
+
+        target_entity_url_by_uuid = get_entity_url_template.format(org=target_org,
+                                                                   app=target_app,
+                                                                   collection=target_collection,
+                                                                   uuid=best_source_entity.get('uuid'),
+                                                                   **config.get('target_endpoint'))
+
+        r = session_target.put(target_entity_url_by_uuid, data=json.dumps(best_source_entity))
+
+        if r.status_code == 200:
+            logger.info('Successfully repaired user at URL=[%s]' % target_entity_url_by_uuid)
+            return True
+
+        else:
+            logger.critical('Failed to PUT [%s] the desired entity at URL=[%s]: %s' % (
+                r.status_code, target_entity_url_by_uuid, r.text))
+            return False
+
+    else:
+        # log an error and give up if we cannot delete the entity at the specified URL.  Unlikely, but if so
+        # then this entity cannot be repaired
+        logger.critical(
+                'Deletion of entity at URL=[%s] FAILED [%s]: %s' % (target_entity_url_by_name, r.status_code, r.text))
+        return False
+
+
+def get_target_mapping(app, collection_name):
+    target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))
+    target_app = config.get('app_mapping', {}).get(app, app)
+    target_collection = config.get('collection_mapping', {}).get(collection_name, collection_name)
+    return target_app, target_collection, target_org
+
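+# A minimal sketch (illustrative names only) of how the mapping lookup above behaves:
+#
+#   config = {'org': 'red',
+#             'org_mapping': {'red': 'blue'},
+#             'app_mapping': {'apples': 'oranges'},
+#             'collection_mapping': {}}
+#
+#   get_target_mapping('apples', 'cats')  # -> ('oranges', 'cats', 'blue')
+#   get_target_mapping('pears', 'cats')   # -> ('pears', 'cats', 'blue') - unmapped names pass through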
+
+def parse_args():
+    parser = argparse.ArgumentParser(description='Usergrid Org/App Migrator')
+
+    parser.add_argument('--log_dir',
+                        help='path to the place where logs will be written',
+                        default='./',
+                        type=str,
+                        required=False)
+
+    parser.add_argument('--log_level',
+                        help='log level - DEBUG, INFO, WARN, ERROR, CRITICAL',
+                        default='INFO',
+                        type=str,
+                        required=False)
+
+    parser.add_argument('-o', '--org',
+                        help='Name of the org to migrate',
+                        type=str,
+                        required=True)
+
+    parser.add_argument('-a', '--app',
+                        help='Name of one or more apps to include, specify none to include all apps',
+                        required=False,
+                        action='append')
+
+    parser.add_argument('-e', '--include_edge',
+                        help='Name of one or more edges/connection types to INCLUDE, specify none to include all edges',
+                        required=False,
+                        action='append')
+
+    parser.add_argument('--exclude_edge',
+                        help='Name of one or more edges/connection types to EXCLUDE, specify none to include all edges',
+                        required=False,
+                        action='append')
+
+    parser.add_argument('--exclude_collection',
+                        help='Name of one or more collections to EXCLUDE, specify none to include all collections',
+                        required=False,
+                        action='append')
+
+    parser.add_argument('-c', '--collection',
+                        help='Name of one or more collections to include, specify none to include all collections',
+                        default=[],
+                        action='append')
+
+    parser.add_argument('--force_app',
+                        help='Force the specified app(s) to be processed even if they are not returned by the list of '
+                             'apps in the API call; sometimes necessary when using 2.0 as a source due to API issues',
+                        default=[],
+                        action='append')
+
+    parser.add_argument('--use_name_for_collection',
+                        help='One or more collections for which [name] should be used instead of [uuid] when creating '
+                             'entities and edges',
+                        default=[],
+                        action='append')
+
+    parser.add_argument('-m', '--migrate',
+                        help='Specifies what to migrate: data, graph, credentials, permissions, reput, prune or '
+                             'none (just iterate the apps/collections)',
+                        type=str,
+                        choices=[
+                            'data',
+                            'prune',
+                            'none',
+                            'reput',
+                            'credentials',
+                            'graph',
+                            'permissions'
+                        ],
+                        default='data')
+
+    parser.add_argument('-s', '--source_config',
+                        help='The path to the source endpoint/org configuration file',
+                        type=str,
+                        default='source.json')
+
+    parser.add_argument('-d', '--target_config',
+                        help='The path to the target endpoint/org configuration file',
+                        type=str,
+                        default='destination.json')
+
+    parser.add_argument('--redis_socket',
+                        help='The path to the socket for redis to use',
+                        type=str)
+
+    parser.add_argument('--limit',
+                        help='The number of entities to return per query request',
+                        type=int,
+                        default=100)
+
+    parser.add_argument('-w', '--entity_workers',
+                        help='The number of worker processes to do the migration',
+                        type=int,
+                        default=16)
+
+    parser.add_argument('--visit_cache_ttl',
+                        help='The TTL (in seconds) of the cache of visited nodes in the graph when migrating connections',
+                        type=int,
+                        default=3600 * 2)
+
+    parser.add_argument('--error_retry_sleep',
+                        help='The number of seconds to wait before retrying after an error',
+                        type=float,
+                        default=30)
+
+    parser.add_argument('--page_sleep_time',
+                        help='The number of seconds to wait between retrieving pages from the UsergridQueryIterator',
+                        type=float,
+                        default=0)
+
+    parser.add_argument('--entity_sleep_time',
+                        help='The number of seconds to wait between processing entities',
+                        type=float,
+                        default=0)
+
+    parser.add_argument('--collection_workers',
+                        help='The number of worker processes to iterate collections',
+                        type=int,
+                        default=2)
+
+    parser.add_argument('--queue_size_max',
+                        help='The maximum number of entities to allow in the queue',
+                        type=int,
+                        default=100000)
+
+    parser.add_argument('--graph_depth',
+                        help='The graph depth to traverse to copy',
+                        type=int,
+                        default=3)
+
+    parser.add_argument('--queue_watermark_high',
+                        help='The point at which publishing to the queue will PAUSE until it is at or below low watermark',
+                        type=int,
+                        default=25000)
+
+    parser.add_argument('--min_modified',
+                        help='Break when encountering a modified date before this, per collection',
+                        type=int,
+                        default=0)
+
+    parser.add_argument('--max_modified',
+                        help='Break when encountering a modified date after this, per collection',
+                        type=long,
+                        default=3793805526000)
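+    # (the default above is an epoch-millisecond timestamp roughly in the year 2090,
+    #  i.e. effectively no upper bound)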
+
+    parser.add_argument('--queue_watermark_low',
+                        help='The point at which publishing to the queue will RESUME after it has reached the high watermark',
+                        type=int,
+                        default=5000)
+
+    parser.add_argument('--ql',
+                        help='The QL to use in the filter for reading data from collections',
+                        type=str,
+                        default='select * order by created asc')
+
+    parser.add_argument('--repair_data',
+                        help='Repair data when iterating/migrating graph but skipping data',
+                        action='store_true')
+
+    parser.add_argument('--prune',
+                        help='Prune the graph while processing (instead of the prune operation)',
+                        action='store_true')
+
+    parser.add_argument('--skip_data',
+                        help='Skip migrating data (useful for connections only)',
+                        action='store_true')
+
+    parser.add_argument('--skip_credentials',
+                        help='Skip migrating credentials',
+                        action='store_true')
+
+    parser.add_argument('--skip_cache_read',
+                        help='Skip reading the cache (modified timestamps and graph edges)',
+                        dest='skip_cache_read',
+                        action='store_true')
+
+    parser.add_argument('--skip_cache_write',
+                        help='Skip updating the cache with modified timestamps of entities and graph edges',
+                        dest='skip_cache_write',
+                        action='store_true')
+
+    parser.add_argument('--create_apps',
+                        help='Create apps at the target if they do not exist',
+                        dest='create_apps',
+                        action='store_true')
+
+    parser.add_argument('--nohup',
+                        help='Do not log to stdout (useful when running under nohup)',
+                        action='store_true')
+
+    parser.add_argument('--graph',
+                        help='Use GRAPH instead of Query',
+                        dest='graph',
+                        action='store_true')
+
+    parser.add_argument('--su_username',
+                        help='Superuser username',
+                        required=False,
+                        type=str)
+
+    parser.add_argument('--su_password',
+                        help='Superuser Password',
+                        required=False,
+                        type=str)
+
+    parser.add_argument('--inbound_connections',
+                        help='Include inbound connections when processing the graph',
+                        action='store_true')
+
+    parser.add_argument('--map_app',
+                        help="Multiple allowed: A colon-separated string such as 'apples:oranges' which indicates to"
+                             " put data from the app named 'apples' from the source endpoint into app named 'oranges' "
+                             "in the target endpoint",
+                        default=[],
+                        action='append')
+
+    parser.add_argument('--map_collection',
+                        help="One or more colon-separated string such as 'cats:dogs' which indicates to put data from "
+                             "collections named 'cats' from the source endpoint into a collection named 'dogs' in the "
+                             "target endpoint, applicable globally to all apps",
+                        default=[],
+                        action='append')
+
+    parser.add_argument('--map_org',
+                        help="One or more colon-separated strings such as 'red:blue' which indicates to put data from "
+                             "org named 'red' from the source endpoint into a collection named 'blue' in the target "
+                             "endpoint",
+                        default=[],
+                        action='append')
+
+    my_args = parser.parse_args(sys.argv[1:])
+
+    return vars(my_args)
+
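+# Example (hypothetical) invocation - org, app and file names below are illustrative only:
+#
+#   python usergrid_data_migrator.py -o myorg -m data \
+#       -s source.json -d destination.json \
+#       --map_app apples:oranges --map_org red:blue \
+#       --entity_workers 16 --collection_workers 2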
+
+def init():
+    global config
+
+    if config.get('migrate') == 'credentials':
+
+        if config.get('su_password') is None or config.get('su_username') is None:
+            message = 'ABORT: In order to migrate credentials, Superuser parameters (su_password, su_username) are required'
+            print message
+            logger.critical(message)
+            exit()
+
+    config['collection_mapping'] = {}
+    config['app_mapping'] = {}
+    config['org_mapping'] = {}
+
+    for mapping in config.get('map_collection', []):
+        parts = mapping.split(':')
+
+        if len(parts) == 2:
+            config['collection_mapping'][parts[0]] = parts[1]
+        else:
+            logger.warning('Skipping Collection mapping: [%s]' % mapping)
+
+    for mapping in config.get('map_app', []):
+        parts = mapping.split(':')
+
+        if len(parts) == 2:
+            config['app_mapping'][parts[0]] = parts[1]
+        else:
+            logger.warning('Skipping App mapping: [%s]' % mapping)
+
+    for mapping in config.get('map_org', []):
+        parts = mapping.split(':')
+
+        if len(parts) == 2:
+            config['org_mapping'][parts[0]] = parts[1]
+            logger.info('Mapping Org [%s] to [%s] from mapping [%s]' % (parts[0], parts[1], mapping))
+        else:
+            logger.warning('Skipping Org mapping: [%s]' % mapping)
+
+    with open(config.get('source_config'), 'r') as f:
+        config['source_config'] = json.load(f)
+
+    with open(config.get('target_config'), 'r') as f:
+        config['target_config'] = json.load(f)
+
+    if config['exclude_collection'] is None:
+        config['exclude_collection'] = []
+
+    config['source_endpoint'] = config['source_config'].get('endpoint').copy()
+    config['source_endpoint'].update(config['source_config']['credentials'][config['org']])
+
+    target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))
+
+    config['target_endpoint'] = config['target_config'].get('endpoint').copy()
+    config['target_endpoint'].update(config['target_config']['credentials'][target_org])
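+
+    # the config files are assumed (from the lookups above) to have roughly this shape;
+    # the exact endpoint/credential keys depend on the URL templates defined elsewhere in this module:
+    #
+    #   {
+    #     "endpoint":    { "api_url": "...", ... },
+    #     "credentials": { "<org-name>": { ... } }
+    #   }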
+
+
+def wait_for(threads, label, sleep_time=60):
+    wait = True
+
+    logger.info('Starting to wait for [%s] threads with sleep time=[%s]' % (len(threads), sleep_time))
+
+    while wait:
+        wait = False
+        alive_count = 0
+
+        for t in threads:
+
+            if t.is_alive():
+                alive_count += 1
+                logger.info('Thread [%s] is still alive' % t.name)
+
+        if alive_count > 0:
+            wait = True
+            logger.info('Continuing to wait for [%s] threads with sleep time=[%s]' % (alive_count, sleep_time))
+            time.sleep(sleep_time)
+
+    logger.warn('All workers [%s] done!' % label)
+
+
+def count_bytes(entity):
+    entity_copy = entity.copy()
+
+    if 'metadata' in entity_copy:
+        del entity_copy['metadata']
+
+    entity_str = json.dumps(entity_copy)
+
+    return len(entity_str)
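+
+# note: 'metadata' is server-generated, so e.g. count_bytes({'uuid': 'abc', 'metadata': {...}})
+# measures only the JSON size of {'uuid': 'abc'}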
+
+
+def migrate_user_credentials(app, collection_name, source_entity, attempts=0):
+    # this only applies to users
+    if collection_name not in ['users', 'user'] \
+            or config.get('skip_credentials', False):
+        return False
+
+    source_identifier = get_source_identifier(source_entity)
+
+    target_app, target_collection, target_org = get_target_mapping(app, collection_name)
+
+    # get the URLs for the source and target users
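+    # (the rendered URLs are assumed to look roughly like
+    #  {api_url}/{org}/{app}/users/{uuid}/credentials - the exact shape comes from
+    #  user_credentials_url_template, defined elsewhere in this module)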
+
+    source_url = user_credentials_url_template.format(org=config.get('org'),
+                                                      app=app,
+                                                      uuid=source_identifier,
+                                                      **config.get('source_endpoint'))
+
+    target_url = user_credentials_url_template.format(org=target_org,
+                                                      app=target_app,
+                                                      uuid=source_identifier,
+                                                      **config.get('target_endpoint'))
+
+    # this endpoint for some reason uses basic auth...
+    r = requests.get(source_url, auth=HTTPBasicAuth(config.get('su_username'), config.get('su_password')))
+
+    if r.status_code != 200:
+        logger.error('Unable to migrate credentials due to HTTP [%s] on GET URL [%s]: %s' % (
+            r.status_code, source_url, r.text))
+
+        return False
+
+    source_credentials = r.json()
+
+    logger.info('Putting credentials to [%s]...' % target_url)
+
+    r = requests.put(target_url,
+                     data=json.dumps(source_credentials),
+                     auth=HTTPBasicAuth(config.get('su_username'), config.get('su_password')))
+
+    if r.status_code != 200:
+        logger.error(
+                'Unable to migrate credentials due to HTTP [%s] on PUT URL [%s]: %s' % (
+                    r.status_code, target_url, r.text))
+        return False
+
+    logger.info('migrate_user_credentials | success=[%s] | app/collection/name = %s/%s/%s' % (
+        True, app, collection_name, source_entity.get('uuid')))
+
+    return True
+
+
+def check_response_status(r, url, exit_on_error=True):
+    if r.status_code != 200:
+        logger.critical('HTTP [%s] on URL=[%s]' % (r.status_code, url))
+        logger.critical('Response: %s' % r.text)
+
+        if exit_on_error:
+            exit()
+
+
+def do_operation(apps_and_collections, operation):
+    status_map = {}
+
+    logger.info('Creating queues...')
+
+    # macOS, for example, does not support a max size for a multiprocessing Queue in Python
+    if _platform == "linux" or _platform == "linux2":
+        entity_queue = Queue(maxsize=config.get('queue_size_max'))
+        collection_queue = Queue(maxsize=config.get('queue_size_max'))
+        collection_response_queue = Queue(maxsize=config.get('queue_size_max'))
+    else:
+        entity_queue = Queue()
+        collection_queue = Queue()
+        collection_response_queue = Queue()
+
+    logger.info('Starting entity_workers...')
+
+    collection_count = 0
+    # create the entity workers, but only start them (later) if there is work to do
+    entity_workers = [EntityWorker(entity_queue, operation) for x in xrange(config.get('entity_workers'))]
+
+    # create the collection workers, but only start them (later) if there is work to do
+    collection_workers = [CollectionWorker(collection_queue, entity_queue, collection_response_queue) for x in
+                          xrange(config.get('collection_workers'))]
+
+    status_listener = StatusListener(collection_response_queue, entity_queue)
+
+    try:
+        # for each app, publish the (app_name, collection_name) to the queue.
+        # each is received by a collection worker, which iterates the collection and publishes
+        # entities into a queue.  these are received by an individual entity worker, which
+        # executes the specified operation on the entity
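+        #
+        # roughly:
+        #   collection_queue -> CollectionWorker(s) -> entity_queue -> EntityWorker(s)
+        #                              |
+        #                              +-> collection_response_queue -> StatusListener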
+
+        for app, app_data in apps_and_collections.get('apps', {}).iteritems():
+            logger.info('Processing app=[%s]' % app)
+
+            status_map[app] = {
+                'iteration_started': str(datetime.datetime.now()),
+                'max_created': -1,
+                'max_modified': -1,
+                'min_created': 1584946416000,
+                'min_modified': 1584946416000,
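+                # the min_* sentinels start at an epoch-ms timestamp later than any real data
+                # (2020-03-23), so the first entity observed always lowers them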
+                'count': 0,
+                'bytes': 0,
+                'collections': {}
+            }
+
+            # iterate the collections which are returned.
+            for collection_name in app_data.get('collections'):
+                logger.info('Publishing app / collection: %s / %s' % (app, collection_name))
+
+                collection_count += 1
+                collection_queue.put((app, collection_name))
+
+            logger.info('Finished publishing [%s] collections for app [%s] !' % (
+                len(app_data.get('collections')), app))
+
+        # only start the threads if there is work to do
+        if collection_count > 0:
+            status_listener.start()
+
+            # start the worker processes which will iterate the collections
+            [w.start() for w in collection_workers]
+
+            # start the worker processes which will do the work of migrating
+            [w.start() for w in entity_workers]
+
+            # allow collection workers to finish
+            wait_for(collection_workers, label='collection_workers', sleep_time=60)
+
+            # allow entity workers to finish
+            wait_for(entity_workers, label='entity_workers', sleep_time=60)
+
+            status_listener.terminate()
+
+    except KeyboardInterrupt:
+        logger.warning('Keyboard Interrupt, aborting...')
+        entity_queue.close()
+        collection_queue.close()
+        collection_response_queue.close()
+
+        [os.kill(p.pid, signal.SIGINT) for p in entity_workers]
+        [os.kill(p.pid, signal.SIGINT) for p in collection_workers]
+        os.kill(status_listener.pid, signal.SIGINT)
+
+        [w.terminate() for w in entity_workers]
+        [w.terminate() for w in collection_workers]
+        status_listener.terminate()
+
+    logger.info('entity_workers DONE!')
+
+
+def filter_apps_and_collections(org_apps):
+    app_collections = {
+        'apps': {}
+    }
+
+    try:
+        selected_apps = config.get('app')
+
+        # iterate the apps retrieved from the org
+        for org_app in sorted(org_apps.keys()):
+            logger.info('Found SOURCE App: %s' % org_app)
+
+        time.sleep(3)
+
+        for org_app in sorted(org_apps.keys()):
+            parts = org_app.split('/')
+            app = parts[1]
+
+            # if apps are specified and the current app is not in the list, skip it
+            if selected_apps and len(selected_apps) > 0 and app not in selected_apps:
+                logger.warning('Skipping app [%s] not included in process list [%s]' % (app, selected_apps))
+                continue
+
+            app_collections['apps'][app] = {
+                'collections': []
+            }
+
+            # get the list of collections from the source org/app
+            source_app_url = app_url_template.format(org=config.get('org'),
+                                                     app=app,
+                                                     **config.get('source_endpoint'))
+            logger.info('GET %s' % source_app_url)
+
+            r_collections = session_source.get(source_app_url)
+
+            collection_attempts = 0
+
+            # this call occasionally fails, so retry it in a loop to force it through
+            while r_collections.status_code != 200 and collection_attempts < 5:
+                collection_attempts += 1
+                logger.warning('FAILED: GET (%s) [%s] URL: %s' % (r_collections.elapsed, r_collections.status_code,
+                                                                  source_app_url))
+                time.sleep(DEFAULT_RETRY_SLEEP)
+                r_collections = session_source.get(source_app_url)
+
+            if r_collections.status_code != 200:
+                logger.critical('Unable to get collections at URL %s, skipping app' % source_app_url)
+                continue
+
+            app_response = r_collections.json()
+
+            logger.info('App Response: ' + json.dumps(app_response))
+
+            app_entities = app_response.get('entities', [])
+
+            if len(app_entities) > 0:
+                app_entity = app_entities[0]
+                collections = app_entity.get('metadata', {}).get('collections', {})
+                logger.info('App=[%s] starting Collections=[%s]' % (app, collections))
+
+                app_collections['apps'][app]['collections'] = [c for c in collections if include_collection(c)]
+                logger.info('App=[%s] filtered Collections=[%s]' % (app, app_collections['apps'][app]['collections']))
+
+    except:
+        print traceback.format_exc()
+
+    return app_collections
+
+
+def confirm_target_org_apps(apps_and_collections):
+    for app in apps_and_collections.get('apps'):
+
+        # it is possible to map source orgs and apps to differently named targets.  This gets the
+        # target names for each
+        target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))
+        target_app = config.get('app_mapping', {}).get(app, app)
+
+        # Check that the target Org/App exists.  If not, move on to the next
+        target_app_url = app_url_template.format(org=target_org,
+                                                 app=target_app,
+                                                 **config.get('target_endpoint'))
+        logger.info('GET %s' % target_app_url)
+        r_target_apps = session_target.get(target_app_url)
+
+        if r_target_apps.status_code != 200:
+
+            if config.get('create_apps', DEFAULT_CREATE_APPS):
+                create_app_url = org_management_app_url_template.format(org=target_org,
+                                                                        app=target_app,
+                                                                        **config.get('target_endpoint'))
+                app_request = {'name': target_app}
+                r = session_target.post(create_app_url, data=json.dumps(app_request))
+
+                if r.status_code != 200:
+                    logger.critical('--create_apps specified and unable to create app [%s] at URL=[%s]: %s' % (
+                        target_app, create_app_url, r.text))
+                    logger.critical('Process will now exit')
+                    exit()
+                else:
+                    logger.warning('Created app=[%s] at URL=[%s]: %s' % (target_app, create_app_url, r.text))
+            else:
+                logger.critical('Target application DOES NOT EXIST (HTTP [%s]) at URL=%s' % (
+                    r_target_apps.status_code, target_app_url))
+                continue
+
+
+def main():
+    global config, cache
+
+    config = parse_args()
+    init()
+    init_logging()
+
+    logger.warn('Script starting')
+
+    try:
+        if config.get('redis_socket') is not None:
+            cache = redis.Redis(unix_socket_path=config.get('redis_socket'))
+
+        else:
+            # this does not try to connect to redis
+            cache = redis.StrictRedis(host='localhost', port=6379, db=0)
+
+        # this is necessary to test the connection to redis
+        cache.get('usergrid')
+
+    except:
+        logger.error(
+                'Error connecting to Redis cache; consider using Redis to optimize the migration process...')
+
+        time.sleep(3)
+
+        config['use_cache'] = False
+        config['skip_cache_read'] = True
+        config['skip_cache_write'] = True
+
+    org_apps = {}
+
+    force_apps = config.get('force_app', [])
+
+    if force_apps is not None and len(force_apps) > 0:
+        logger.warn('Forcing only the following apps to be processed: %s' % force_apps)
+
+        for app in force_apps:
+            ke

<TRUNCATED>
