superset-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maximebeauche...@apache.org
Subject [incubator-superset] branch master updated: [bugfix] fix merge conflict that broke Hive support (#3196)
Date Fri, 28 Jul 2017 04:34:18 GMT
This is an automated email from the ASF dual-hosted git repository.

maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new e4fba0f  [bugfix] fix merge conflict that broke Hive support (#3196)
e4fba0f is described below

commit e4fba0ffb75f8210fc0a894d355851bcbfb0d285
Author: Maxime Beauchemin <maximebeauchemin@gmail.com>
AuthorDate: Thu Jul 27 21:34:15 2017 -0700

    [bugfix] fix merge conflict that broke Hive support (#3196)
---
 superset/config.py          |  6 +++++-
 superset/db_engine_specs.py | 35 +++++++++++++++++++++++------------
 2 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/superset/config.py b/superset/config.py
index 2c27415..47126f8 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -308,8 +308,12 @@ SILENCE_FAB = True
 # configuration. These blueprints will get integrated in the app
 BLUEPRINTS = []
 
-try:
+# Provide a callable that receives a tracking_url and returns another
+# URL. This is used to translate internal Hadoop job tracker URL
+# into a proxied one
+TRACKING_URL_TRANSFORMER = lambda x: x
 
+try:
     if CONFIG_PATH_ENV_VAR in os.environ:
         # Explicitly import config module that is not in pythonpath; useful
         # for case where app is being executed via pex.
diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py
index 8481458..b43e94e 100644
--- a/superset/db_engine_specs.py
+++ b/superset/db_engine_specs.py
@@ -31,8 +31,9 @@ from flask_babel import lazy_gettext as _
 
 from superset.utils import SupersetTemplateException
 from superset.utils import QueryStatus
-from superset import utils
-from superset import cache_util
+from superset import conf, cache_util, utils
+
+tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')
 
 Grain = namedtuple('Grain', 'name label function')
 
@@ -683,7 +684,7 @@ class HiveEngineSpec(PrestoEngineSpec):
     @classmethod
     def progress(cls, log_lines):
         total_jobs = 1  # assuming there's at least 1 job
-        current_job = None
+        current_job = 1
         stages = {}
         for line in log_lines:
             match = cls.jobs_stats_r.match(line)
@@ -692,6 +693,7 @@ class HiveEngineSpec(PrestoEngineSpec):
             match = cls.launching_job_r.match(line)
             if match:
                 current_job = int(match.groupdict()['job_number'])
+                total_jobs = int(match.groupdict()['max_jobs']) or 1
                 stages = {}
             match = cls.stage_progress_r.match(line)
             if match:
@@ -701,10 +703,9 @@ class HiveEngineSpec(PrestoEngineSpec):
                 stages[stage_number] = (map_progress + reduce_progress) / 2
         logging.info(
             "Progress detail: {}, "
-            "total jobs: {}".format(stages, total_jobs))
+            "current job {}, "
+            "total jobs: {}".format(stages, current_job, total_jobs))
 
-        if not total_jobs or not current_job:
-            return 0
         stage_progress = sum(
             stages.values()) / len(stages.values()) if stages else 0
 
@@ -731,18 +732,16 @@ class HiveEngineSpec(PrestoEngineSpec):
         polled = cursor.poll()
         last_log_line = 0
         tracking_url = None
+        job_id = None
         while polled.operationState in unfinished_states:
             query = session.query(type(query)).filter_by(id=query.id).one()
             if query.status == QueryStatus.STOPPED:
                 cursor.cancel()
                 break
 
-            resp = cursor.fetch_logs()
-            if resp and resp.log:
-                log = resp.log or ''
-                log_lines = resp.log.splitlines()
-                logging.info("\n".join(log_lines[last_log_line:]))
-                last_log_line = len(log_lines) - 1
+            log = cursor.fetch_logs() or ''
+            if log:
+                log_lines = log.splitlines()
                 progress = cls.progress(log_lines)
                 logging.info("Progress total: {}".format(progress))
                 needs_commit = False
@@ -754,8 +753,20 @@ class HiveEngineSpec(PrestoEngineSpec):
                     if tracking_url:
                         logging.info(
                             "Found the tracking url: {}".format(tracking_url))
+                        tracking_url = tracking_url_trans(tracking_url)
+                        logging.info(
+                            "Transformation applied: {}".format(tracking_url))
                         query.tracking_url = tracking_url
+                        job_id = tracking_url.split('/')[-2]
+                        logging.info("Job id: {}".format(job_id))
                         needs_commit = True
+                if job_id and len(log_lines) > last_log_line:
+                    # Wait for job id before logging things out
+                    # this allows for prefixing all log lines and becoming
+                    # searchable in something like Kibana
+                    for l in log_lines[last_log_line:]:
+                        logging.info("[{}] {}".format(job_id, l))
+                    last_log_line = len(log_lines)
                 if needs_commit:
                     session.commit()
             time.sleep(5)

-- 
To stop receiving notification emails like this one, please contact
commits@superset.apache.org.

Mime
View raw message