airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From san...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-373] Enhance CLI variables functionality
Date Fri, 29 Jul 2016 00:07:52 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b4aa64731 -> 384589890


[AIRFLOW-373] Enhance CLI variables functionality

Add export/import to/from json file option for CLI variable command.
Add delete variable option for CLI variable command.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/38458989
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/38458989
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/38458989

Branch: refs/heads/master
Commit: 38458989009a9708b20f8e77f8e1244a3261cddb
Parents: b4aa647
Author: Norman Mu <norman@agari.com>
Authored: Wed Jul 27 15:44:38 2016 -0700
Committer: Sid Anand <siddharthanand@yahoo.com>
Committed: Thu Jul 28 16:50:50 2016 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py | 71 +++++++++++++++++++++++++++++++++++++++++++++++--
 tests/core.py      | 48 +++++++++++++++++++++++++++++++++
 2 files changed, 117 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38458989/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index cf05362..2d02f1e 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -161,6 +161,7 @@ def trigger_dag(args):
 
 
 def variables(args):
+
     if args.get:
         try:
             var = Variable.get(args.get,
@@ -169,15 +170,69 @@ def variables(args):
             print(var)
         except ValueError as e:
             print(e)
+    if args.delete:
+        session = settings.Session()
+        session.query(Variable).filter_by(key=args.delete).delete()
+        session.commit()
+        session.close()
     if args.set:
         Variable.set(args.set[0], args.set[1])
-    if not args.set and not args.get:
+    # Work around 'import' as a reserved keyword
+    imp = getattr(args, 'import')
+    if imp:
+        if os.path.exists(imp):
+            import_helper(imp)
+        else:
+            print("Missing variables file.")
+    if args.export:
+        export_helper(args.export)
+    if not (args.set or args.get or imp or args.export or args.delete):
         # list all variables
         session = settings.Session()
         vars = session.query(Variable)
         msg = "\n".join(var.key for var in vars)
         print(msg)
 
+def import_helper(filepath):
+    with open(filepath, 'r') as varfile:
+        var = varfile.read()
+
+    try:
+        d = json.loads(var)
+    except Exception:
+        print("Invalid variables file.")
+    else:
+        try:
+            n = 0
+            for k, v in d.items():
+                if isinstance(v, dict):
+                    Variable.set(k, v, serialize_json=True)
+                else:
+                    Variable.set(k, v)
+                n += 1
+        except Exception:
+            pass
+        finally:
+            print("{} of {} variables successfully updated.".format(n, len(d)))
+
+def export_helper(filepath):
+    session = settings.Session()
+    qry = session.query(Variable).all()
+    session.close()
+
+    var_dict = {}
+    d = json.JSONDecoder()
+    for var in qry:
+        val = None
+        try:
+            val = d.decode(var.val)
+        except Exception:
+            val = var.val
+        var_dict[var.key] = val
+
+    with open(filepath, 'w') as varfile:
+        varfile.write(json.dumps(var_dict, sort_keys=True, indent=4))
+    print("{} variables successfully exported to {}".format(len(var_dict), filepath))
 
 def pause(args, dag=None):
     set_is_paused(True, args, dag)
@@ -776,6 +831,18 @@ class CLIFactory(object):
             ("-j", "--json"),
             help="Deserialize JSON variable",
             action="store_true"),
+        'var_import': Arg(
+            ("-i", "--import"),
+            metavar="FILEPATH",
+            help="Import variables from JSON file"),
+        'var_export': Arg(
+            ("-e", "--export"),
+            metavar="FILEPATH",
+            help="Export variables to JSON file"),
+        'var_delete': Arg(
+            ("-x", "--delete"),
+            metavar="KEY",
+            help="Delete a variable"),
         # kerberos
         'principal': Arg(
             ("principal",), "kerberos principal",
@@ -926,7 +993,7 @@ class CLIFactory(object):
         }, {
             'func': variables,
             'help': "List all variables",
-            "args": ('set', 'get', 'json', 'default'),
+            "args": ('set', 'get', 'json', 'default', 'var_import', 'var_export', 'var_delete'),
         }, {
             'func': kerberos,
             'help': "Start a kerberos ticket renewer",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38458989/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 36b484b..4b1926c 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -898,6 +898,7 @@ class CliTests(unittest.TestCase):
         )
 
     def test_variables(self):
+        # Checks if all subcommands are properly received
         cli.variables(self.parser.parse_args([
             'variables', '-s', 'foo', '{"foo":"bar"}']))
         cli.variables(self.parser.parse_args([
@@ -906,7 +907,54 @@ class CliTests(unittest.TestCase):
             'variables', '-g', 'baz', '-d', 'bar']))
         cli.variables(self.parser.parse_args([
             'variables']))
+        cli.variables(self.parser.parse_args([
+            'variables', '-x', 'bar']))
+        cli.variables(self.parser.parse_args([
+            'variables', '-i', DEV_NULL]))
+        cli.variables(self.parser.parse_args([
+            'variables', '-e', DEV_NULL]))
+
+        cli.variables(self.parser.parse_args([
+            'variables', '-s', 'bar', 'original']))
+        # First export
+        cli.variables(self.parser.parse_args([
+            'variables', '-e', 'variables1.json']))
+
+        first_exp = open('variables1.json', 'r')
+
+        cli.variables(self.parser.parse_args([
+            'variables', '-s', 'bar', 'updated']))
+        cli.variables(self.parser.parse_args([
+            'variables', '-s', 'foo', '{"foo":"oops"}']))
+        cli.variables(self.parser.parse_args([
+            'variables', '-x', 'foo']))
+        # First import
+        cli.variables(self.parser.parse_args([
+            'variables', '-i', 'variables1.json']))
+
+        assert models.Variable.get('bar') == 'original'
+        assert models.Variable.get('foo') == '{"foo": "bar"}'
+        # Second export
+        cli.variables(self.parser.parse_args([
+            'variables', '-e', 'variables2.json']))
+
+        second_exp = open('variables2.json', 'r')
+        assert second_exp.read() == first_exp.read()
+        second_exp.close()
+        first_exp.close()
+        # Second import
+        cli.variables(self.parser.parse_args([
+            'variables', '-i', 'variables2.json']))
+
+        assert models.Variable.get('bar') == 'original'
+        assert models.Variable.get('foo') == '{"foo": "bar"}'
 
+        session = settings.Session()
+        session.query(Variable).delete()
+        session.commit()
+        session.close()
+        os.remove('variables1.json')
+        os.remove('variables2.json')
 
 class WebUiTests(unittest.TestCase):
     def setUp(self):


Mime
View raw message