zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b..@apache.org
Subject [1/3] zeppelin git commit: [ZEPPELIN-116] Add Apache Mahout Interpreter
Date Wed, 09 Nov 2016 04:40:46 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master 0758b7078 -> e25266706


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e2526670/scripts/mahout/add_mahout_interpreters.py
----------------------------------------------------------------------
diff --git a/scripts/mahout/add_mahout_interpreters.py b/scripts/mahout/add_mahout_interpreters.py
new file mode 100644
index 0000000..307364d
--- /dev/null
+++ b/scripts/mahout/add_mahout_interpreters.py
@@ -0,0 +1,290 @@
+# /**
+#  * Licensed to the Apache Software Foundation (ASF) under one
+#  * or more contributor license agreements.  See the NOTICE file
+#  * distributed with this work for additional information
+#  * regarding copyright ownership.  The ASF licenses this file
+#  * to you under the Apache License, Version 2.0 (the
+#  * "License"); you may not use this file except in compliance
+#  * with the License.  You may obtain a copy of the License at
+#  *
+#  *     http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  */
+
+import argparse
+import json
+
+from os.path import isfile
+from os import getcwd
+
+from subprocess import call, check_call
+
+
+#######################################################################################################################
+# I put these here so it will (hopefully) be easy(er) to bump versions / maintain
+# If there is demand, we could easily make some or all of these command line arguments as well
+#######################################################################################################################
+tar_name = "apache-mahout-distribution-0.12.2.tar.gz"
+mahout_bin_url =  "http://apache.osuosl.org/mahout/0.12.2/%s" % tar_name
+mahout_version = "0.12.2"
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--force_download", help="force download Apache Mahout", action="store_true")
+parser.add_argument("--restart_later", help="force download Apache Mahout", action="store_true")
+parser.add_argument("--zeppelin_home", help="path to ZEPPELIN_HOME")
+parser.add_argument("--mahout_home", help="path to MAHOUT_HOME, use this if you have already
installed Apache Mahout")
+parser.add_argument("--overwrite_existing", help="if %sparkMahout or %flinkMahout exist,
delete them and create new ones. Otherwise Fail.", action="store_true")
+
+args = parser.parse_args()
+
+
+
+class ZeppelinTerpWrangler:
+    def __init__(self, interpreter_json_path):
+        self.interpreter_json_path = interpreter_json_path
+
+    def _getTerpID(self, terpName):
+        terp_id = None
+        for k, v in self.interpreter_json['interpreterSettings'].iteritems():
+            if v['name'] == terpName:
+                terp_id = k
+                break
+
+        return terp_id
+
+    def _terpExists(self, terpName):
+        terp_id = self._getTerpID(terpName)
+        if terp_id == None:
+            return False
+        return True
+
+    def createTerp(self, original_terp_name, new_terp_name, overwrite_existing=True ):
+
+        new_terp_id = new_terp_name
+        if self._terpExists(new_terp_name):
+            print "Found existing '%s' interpreter..." % new_terp_name
+            if overwrite_existing:
+                print "deleting %s from interpreter.json" %new_terp_name
+                del self.interpreter_json['interpreterSettings'][self._getTerpID(new_terp_name)]
+            else:
+                print "exiting program."
+                exit(1)
+
+        orig_terp_id = self._getTerpID(original_terp_name)
+
+        from copy import deepcopy
+        self.interpreter_json['interpreterSettings'][new_terp_id] = deepcopy(
+        self.interpreter_json['interpreterSettings'][orig_terp_id])
+        self.interpreter_json['interpreterSettings'][new_terp_id]['name'] = new_terp_name
+        self.interpreter_json['interpreterSettings'][new_terp_id]['id'] = new_terp_id
+        print "created new interpreter '%s' from interpreter '%s" % (new_terp_name, original_terp_name)
+
+    def _readTerpJson(self):
+        with open(self.interpreter_json_path) as f:
+            self.interpreter_json = json.load(f)
+
+    def _writeTerpJson(self):
+        with open(self.interpreter_json_path, 'wb') as f:
+            json.dump(self.interpreter_json, f, sort_keys=True, indent=4)
+
+    def _updateTerpProp(self, terpName, property, value):
+        terp_id = self._getTerpID(terpName)
+        self.interpreter_json['interpreterSettings'][terp_id]['properties'][property] = value
+
+    def _addTerpDep(self, terpName="", dep="", exclusions=None):
+        if self.interpreter_json == {}:
+            print "no interpreter.json loaded, reading last one downloaded"
+            self._readTerpJson()
+        terp_id = self._getTerpID(terpName)
+        deps = self.interpreter_json['interpreterSettings'][terp_id]['dependencies']
+
+        dep_dict = {
+            u'groupArtifactVersion': dep,
+            u'local': False
+
+        }
+        if exclusions != None:
+            dep_dict["exclusions"] = exclusions
+        deps.append(dep_dict)
+
+        ## Remove Duplicate Dependencies
+        seen = set()
+        new_deps = list()
+        for d in deps:
+            t = d.items()
+            if t[0] not in seen:
+                seen.add(t[0])
+                new_deps.append(d)
+
+        self.interpreter_json['interpreterSettings'][terp_id]['dependencies'] = new_deps
+
+    def addMahoutConfig(self, terpName, mahout_home, mahout_version = "0.12.2"):
+
+        print "updating '%s' with Apache Mahout dependencies and settings" % terpName
+
+        terpDeps = ["%s/mahout-math-%s.jar" %  (mahout_home, mahout_version),
+                    "%s/mahout-math-scala_2.10-%s.jar" %  (mahout_home, mahout_version)]
+
+        if "spark" in terpName.lower():
+            configs = {
+                "spark.kryo.referenceTracking": "false",
+                "spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
+                "spark.kryoserializer.buffer": "32k",
+                "spark.kryoserializer.buffer.max": "600m",
+                "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
+            }
+            terpDeps.append('%s/mahout-spark_2.10-%s-dependency-reduced.jar' % (mahout_home,
mahout_version))
+            terpDeps.append("%s/mahout-spark_2.10-%s.jar" % (mahout_home, mahout_version))
+            terpDeps.append("%s/mahout-spark-shell_2.10-%s.jar" % (mahout_home, mahout_version))
+
+        if "flink" in terpName.lower():
+            configs = {
+                "taskmanager.numberOfTaskSlots" : "12"
+            }
+            addlDeps = [
+                "%s/mahout-flink_2.10-%s.jar" % (mahout_home, mahout_version),
+                "%s/mahout-hdfs-%s.jar" % (mahout_home, mahout_version),
+                "com.google.guava:guava:14.0.1"
+                #"%s/guava-14.0.1.jar" % mahout_home  ## required in lib dir if running against cluster
+            ]
+            for t in addlDeps:
+                terpDeps.append(t)
+
+        for k, v in configs.iteritems():
+            self._updateTerpProp(terpName, k, v)
+
+        for t in terpDeps:
+            self._addTerpDep(terpName, t)
+
+#######################################################################################################################
+# Need to be sure we know where Zeppelin Top directory is so we can edit conf files
+#
+#######################################################################################################################
+
+def valid_zeppelin_home(path):
+    return isfile(path + "/bin/zeppelin-daemon.sh")
+
+if args.zeppelin_home == None:
+    zeppelin_home = getcwd()
+    if (zeppelin_home.split("/")[-1] == "bin") and (isfile("zeppelin-daemon.sh")):
+        print "we're in the zeppelin/bin"
+        zeppelin_home = "/".join(zeppelin_home.split("/")[:-1])
+    print "--zeppelin_home not specified, using %s" % zeppelin_home
+else:
+    zeppelin_home = args.zeppelin_home
+
+
+if not valid_zeppelin_home(zeppelin_home):
+    print "%s does not appear to be a valid ZEPPELIN_HOME - e.g. the top level directory
of the ZEPPELIN install" % zeppelin_home
+    exit(1)
+else:
+    print "ZEPPELIN_HOME validated"
+
+interpreter_json_path = zeppelin_home + "/conf/interpreter.json"
+
+if not isfile(interpreter_json_path):
+    print "interpreter.json doesn't exist. Checking weather Zeppelin is running."
+    status = call(["bin/zeppelin-daemon.sh", 'status'], cwd=zeppelin_home)
+    if status == 1:
+        print "Zeppelin doesn't appear to be running- it is possible that Zeppelin has never
been run (interpreter.json is created when Zeppelin is run)"
+        print "I'm going to try to start Zeppelin to create interpreter.json"
+        call(["bin/zeppelin-daemon.sh", 'start'], cwd=zeppelin_home)
+        from time import sleep
+        sleep(3)
+    else:
+        print "We're in the correct top-level directory, Zeppelin appears to be running,
but there is no 'interpreter.json'. \
+          \nThis is a confusing case.  Please try restarting Zeppelin, but if that doesn't
work reach out on the mailing list."
+
+if isfile(interpreter_json_path):
+    z = ZeppelinTerpWrangler(interpreter_json_path)
+else:
+    print "'interpreter.json' not found in %s/conf" % args.zeppelin_home
+    exit(1)
+
+#######################################################################################################################
+# If --mahout_home not set, download and untar Mahout into ZEPPELIN_HOME
+# Set MAHOUT_HOME to ZEPPELIN_HOME/<mahout_untar_dir>
+#######################################################################################################################
+
+def download_mahout():
+    if args.force_download:
+        print "--force_download: OK, deleting existing tar if it exists."
+        call(["rm", "%s/%s" % (zeppelin_home, tar_name)])
+        return True
+    elif isfile("%s/%s" % (zeppelin_home, tar_name)):
+        print "%s found, skipping download" % tar_name
+        return False
+    elif args.mahout_home:
+        print "--mahout_home set, skipping download"
+        return False
+    else:
+        return True
+
+if download_mahout():
+    check_call(['wget', mahout_bin_url], cwd= zeppelin_home)
+    check_call(['tar', 'xzf', tar_name], cwd= zeppelin_home)
+
+
+
+if args.mahout_home:
+    mahout_home = args.mahout_home
+else:
+  mahout_home = zeppelin_home + "/" + ".".join(tar_name.split(".")[:-2])
+
+#######################################################################################################################
+# Create new interpreters
+#######################################################################################################################
+
+z._readTerpJson()
+z.createTerp("spark", "sparkMahout", args.overwrite_existing)
+z.createTerp("flink", "flinkMahout", args.overwrite_existing)
+z.addMahoutConfig("sparkMahout", mahout_home, mahout_version)
+z.addMahoutConfig("flinkMahout", mahout_home, mahout_version)
+z._writeTerpJson()
+
+#######################################################################################################################
+# Add "export MAHOUT_HOME=..." to conf/zeppelin-env.sh
+# Create the file if it doesn't exist.
+#######################################################################################################################
+
+mahout_home_str = '\nexport MAHOUT_HOME=%s\n' % (mahout_home)
+
+zeppelin_env_sh_path = '%s/conf/zeppelin-env.sh' % zeppelin_home
+if isfile(zeppelin_env_sh_path):
+    with open(zeppelin_env_sh_path, 'rb') as f:
+        zeppelin_env_sh = f.readlines()
+    if any(["export MAHOUT_HOME=" in line for line in zeppelin_env_sh]):
+        print "'export MAHOUT_HOME=...' already exists in zeppelin_env.sh, not appending"
+    else:
+        print "appending '%s' to conf/zeppelin-env.sh" % mahout_home_str
+        with open(zeppelin_env_sh_path, 'a') as f:
+            f.write(mahout_home_str)
+else:
+    print "appending '%s' to conf/zeppelin-env.sh" % mahout_home_str
+    with open(zeppelin_env_sh_path, 'wb') as f:
+        f.write(mahout_home_str)
+
+
+#######################################################################################################################
+# You have to restart Apache Zeppelin for new terps to show up... do this for the user unless they specified otherwise
+#
+#######################################################################################################################
+if not args.restart_later:
+    print "restarting Apache Zeppelin to load new interpreters..."
+    check_call(["bin/zeppelin-daemon.sh", 'restart'], cwd= zeppelin_home)
+else:
+    print "--restart_later flag detected: remember to restart Zeppelin to see new Mahout
interpreters!!"
+
+#######################################################################################################################
+# Good bye
+#######################################################################################################################
+
+print "---------------------------------------------------------------------------------------------------------------"
+print "all done! Thanks for using Apache Mahout"
+print "bye"


Mime
View raw message