airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <>
Subject [GitHub] [airflow] kaxil commented on a change in pull request #5743: [AIRFLOW-5088][AIP-24] Persisting serialized DAG in DB for webserver scalability
Date Wed, 04 Sep 2019 13:26:38 GMT
kaxil commented on a change in pull request #5743: [AIRFLOW-5088][AIP-24] Persisting serialized
DAG in DB for webserver scalability

 File path: airflow/dag/serialization/
 @@ -176,43 +168,35 @@ def _serialize(cls, var, visited_dags):  # pylint: disable=too-many-return-state
             return FAILED
-    def _deserialize(cls, encoded_var, visited_dags):  # pylint: disable=too-many-return-statements
+    def _deserialize(cls, encoded_var):  # pylint: disable=too-many-return-statements
         """Helper function of depth first search for deserialization."""
             # JSON primitives (except for dict) are not encoded.
             if cls._is_primitive(encoded_var):
                 return encoded_var
             elif isinstance(encoded_var, list):
-                return [cls._deserialize(v, visited_dags) for v in encoded_var]
+                return [cls._deserialize(v) for v in encoded_var]
             assert isinstance(encoded_var, dict)
             var = encoded_var[Encoding.VAR]
             type_ = encoded_var[Encoding.TYPE]
             if type_ == DAT.DICT:
-                return {k: cls._deserialize(v, visited_dags) for k, v in var.items()}
+                return {k: cls._deserialize(v) for k, v in var.items()}
             elif type_ == DAT.DAG:
-                if isinstance(var, dict):
-                    return airflow.dag.serialization.SerializedDAG.deserialize_dag(
-                        var, visited_dags)
-                elif isinstance(var, str) and var in visited_dags:
-                    # dag_id is stored in the serailized form for a visited DAGs.
-                    return visited_dags[var]
-                LOG.warning('Invalid DAG %s in deserialization.', var)
-                return None
+                return airflow.dag.serialization.SerializedDAG.deserialize_dag(var)
             elif type_ == DAT.OP:
-                return airflow.dag.serialization.SerializedBaseOperator.deserialize_operator(
-                    var, visited_dags)
+                return airflow.dag.serialization.SerializedBaseOperator.deserialize_operator(var)
             elif type_ == DAT.DATETIME:
                 return dateutil.parser.parse(var)
             elif type_ == DAT.TIMEDELTA:
                 return datetime.timedelta(seconds=var)
             elif type_ == DAT.TIMEZONE:
 Review comment:
   agree, will update

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org

With regards,
Apache Git Services

View raw message