giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject git commit: updated refs/heads/trunk to efe6bf3
Date Wed, 10 Jul 2013 12:18:54 GMT
Updated Branches:
  refs/heads/trunk 4b01c88e4 -> efe6bf3d6


GIRAPH-709: More flexible Jython script loading (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/efe6bf3d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/efe6bf3d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/efe6bf3d

Branch: refs/heads/trunk
Commit: efe6bf3d6ece4bedfd0ab0ba649bcc558d1b0c80
Parents: 4b01c88
Author: Nitay Joffe <nitay@apache.org>
Authored: Wed Jul 10 07:52:47 2013 -0400
Committer: Nitay Joffe <nitay@apache.org>
Committed: Wed Jul 10 08:07:19 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../giraph/benchmark/PageRankBenchmark.java     |  15 +-
 .../giraph/conf/JsonStringConfOption.java       | 151 +++++++++++
 .../apache/giraph/graph/GraphTaskManager.java   |   7 +-
 .../org/apache/giraph/jython/DeployType.java    |  28 --
 .../giraph/jython/JythonComputationFactory.java | 131 +++-------
 .../org/apache/giraph/jython/JythonUtils.java   |  38 ++-
 .../org/apache/giraph/scripting/DeployType.java |  28 ++
 .../apache/giraph/scripting/DeployedScript.java |  96 +++++++
 .../apache/giraph/scripting/ScriptLoader.java   | 253 +++++++++++++++++++
 .../apache/giraph/scripting/package-info.java   |  21 ++
 .../apache/giraph/utils/ConfigurationUtils.java |   8 +-
 .../org/apache/giraph/jython/TestJython.java    |  13 +-
 13 files changed, 636 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 6b792b8..b1f44c2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-709: More flexible Jython script loading (nitay)
+
   GIRAPH-708: Factories for creation of all IVEM types (nitay)
 
   GIRAPH-710: Define zookeeper version in a property to allow build time

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 413107d..acc1c46 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -23,9 +23,11 @@ import org.apache.giraph.combiner.FloatSumCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.edge.IntNullArrayEdges;
+import org.apache.giraph.graph.Language;
 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 import org.apache.giraph.io.formats.PseudoRandomIntNullVertexInputFormat;
-import org.apache.giraph.jython.DeployType;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.jython.JythonUtils;
 import org.apache.giraph.utils.DistributedCacheUtils;
 import org.apache.giraph.utils.ReflectionUtils;
@@ -36,8 +38,6 @@ import com.google.common.collect.Sets;
 
 import java.util.Set;
 
-import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_DEPLOY_TYPE;
-
 /**
  * Benchmark for {@link PageRankComputation}
  */
@@ -56,19 +56,22 @@ public class PageRankBenchmark extends GiraphBenchmark {
     if (BenchmarkOption.JYTHON.optionTurnedOn(cmd)) {
       GiraphTypes types = new GiraphTypes();
       types.inferFrom(PageRankComputation.class);
+
       String script;
+      DeployType deployType;
       if (BenchmarkOption.SCRIPT_PATH.optionTurnedOn(cmd)) {
-        JYTHON_DEPLOY_TYPE.set(conf, DeployType.DISTRIBUTED_CACHE);
+        deployType = DeployType.DISTRIBUTED_CACHE;
         String path = BenchmarkOption.SCRIPT_PATH.getOptionValue(cmd);
         Path hadoopPath = new Path(path);
         Path remotePath = DistributedCacheUtils.copyAndAdd(hadoopPath, conf);
         script = remotePath.toString();
       } else {
-        JYTHON_DEPLOY_TYPE.set(conf, DeployType.RESOURCE);
+        deployType = DeployType.RESOURCE;
         script = ReflectionUtils.getPackagePath(this) + "/page-rank.py";
       }
+      ScriptLoader.setScriptsToLoad(conf, script, deployType, Language.JYTHON);
       types.writeIfUnset(conf);
-      JythonUtils.init(conf, script, "PageRank");
+      JythonUtils.init(conf, "PageRank");
     } else {
       conf.setComputationClass(PageRankComputation.class);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/conf/JsonStringConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/JsonStringConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/JsonStringConfOption.java
new file mode 100644
index 0000000..28f9388
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/JsonStringConfOption.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+
+/**
+ * JSON String configuration option
+ */
+public class JsonStringConfOption extends AbstractConfOption {
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(JsonStringConfOption.class);
+
+  /**
+   * Constructor
+   *
+   * @param key String key name
+   * @param description String description of option
+   */
+  public JsonStringConfOption(String key, String description) {
+    super(key, description);
+  }
+
+  /**
+   * Set JSON value
+   *
+   * @param conf Configuration
+   * @param value Json value
+   */
+  public void set(Configuration conf, Object value) {
+    ObjectMapper mapper = new ObjectMapper();
+    String jsonStr;
+    try {
+      jsonStr = mapper.writeValueAsString(value);
+      conf.set(getKey(), jsonStr);
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to set " + getKey() +
+          " with json value from " + value);
+    }
+  }
+
+  /**
+   * Get raw JSON string
+   *
+   * @param conf Configuration
+   * @return raw JSON string value
+   */
+  public String getRaw(Configuration conf) {
+    return conf.get(getKey());
+  }
+
+  /**
+   * Get JSON value
+   *
+   * @param <T> JSON type
+   * @param conf Configuration
+   * @param klass Class to read into
+   * @return JSON value
+   */
+  public <T> T get(Configuration conf, Class<T> klass) {
+    String jsonStr = getRaw(conf);
+    T value = null;
+    if (jsonStr != null) {
+      ObjectMapper mapper = new ObjectMapper();
+      try {
+        value = mapper.readValue(jsonStr, klass);
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to read json from key " +
+            getKey() + " with class " + klass);
+      }
+    }
+    return value;
+  }
+
+  /**
+   * Get JSON value
+   *
+   * @param <T> JSON type
+   * @param conf Configuration
+   * @param typeReference TypeReference for JSON type
+   * @return JSON value
+   */
+  public <T> T get(Configuration conf, TypeReference<T> typeReference) {
+    String jsonStr = getRaw(conf);
+    T value = null;
+    if (jsonStr != null) {
+      ObjectMapper mapper = new ObjectMapper();
+      try {
+        value = mapper.readValue(jsonStr, typeReference);
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to read json from key " +
+            getKey() + " with class " + typeReference);
+      }
+    }
+    return value;
+  }
+
+  /**
+   * Get JSON value, or default if not present
+   *
+   * @param <T> JSON type
+   * @param klass Class to read into
+   * @param conf Configuration
+   * @param defaultValue Default value if not found
+   * @return JSON value
+   */
+  public <T> T getWithDefault(Configuration conf, Class<T> klass,
+      T defaultValue) {
+    if (contains(conf)) {
+      return get(conf, klass);
+    } else {
+      return defaultValue;
+    }
+  }
+
+  @Override
+  public String getDefaultValueStr() {
+    return "null";
+  }
+
+  @Override
+  public boolean isDefaultValue(Configuration conf) {
+    return !contains(conf);
+  }
+
+  @Override
+  public ConfOptionType getType() {
+    return ConfOptionType.STRING;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index e81c7c4..b0982b3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -24,6 +24,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.master.BspServiceMaster;
 import org.apache.giraph.master.MasterAggregatorUsage;
 import org.apache.giraph.master.MasterThread;
@@ -190,10 +191,12 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     initializeAndConfigureLogging();
     // init the metrics objects
     setupAndInitializeGiraphMetrics();
-    // One time setup for computation factory
-    conf.createComputationFactory().initialize(conf);
     // Check input
     checkInput();
+    // Load any scripts that were deployed
+    ScriptLoader.loadScripts(conf);
+    // One time setup for computation factory
+    conf.createComputationFactory().initialize(conf);
     // Do some task setup (possibly starting up a Zookeeper service)
     context.setStatus("setup: Initializing Zookeeper services.");
     locateZookeeperClasspath(zkPathList);

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java b/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
deleted file mode 100644
index d916119..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.jython;
-
-/**
- * Type of deployment for a file
- */
-public enum DeployType {
-  /** Resource packaged with jar */
-  RESOURCE,
-  /** Hadoop's Distributed Cache */
-  DISTRIBUTED_CACHE
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
index b714e91..80e4e76 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
@@ -17,123 +17,70 @@
  */
 package org.apache.giraph.jython;
 
-import org.apache.giraph.conf.EnumConfOption;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.graph.Computation;
 import org.apache.giraph.factories.ComputationFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.log4j.Logger;
 import org.python.core.PyObject;
 import org.python.util.PythonInterpreter;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.giraph.scripting.ScriptLoader.SCRIPTS_TO_LOAD;
 
 /**
  * Factory for creating Jython Computation from python scripts
  */
 public class JythonComputationFactory implements ComputationFactory {
-  /** Type of script path */
-  public static final EnumConfOption<DeployType> JYTHON_DEPLOY_TYPE =
-      EnumConfOption.create("giraph.jython.deploy.type",
-          DeployType.class, DeployType.DISTRIBUTED_CACHE,
-          "Type of script path");
-  /** Path to Jython script */
-  public static final StrConfOption JYTHON_SCRIPT_PATH =
-      new StrConfOption("giraph.jython.path", "_script_not_set_",
-          "Path to Jython script");
   /** Name of Computation class in Jython script */
-  public static final StrConfOption JYTHON_COMPUTATION_CLASS =
+  public static final StrConfOption JYTHON_COMPUTATION_CLASS_NAME =
       new StrConfOption("giraph.jython.class", "_computation_class_not_set_",
           "Name of Computation class in Jython script");
 
+  /** The Jython compute function, cached here for fast access */
+  private static volatile PyObject JYTHON_COMPUTATION_MODULE;
+
   /** Logger */
   private static final Logger LOG = Logger.getLogger(JythonUtils.class);
 
-  @Override
-  public void initialize(ImmutableClassesGiraphConfiguration conf) {
-    String scriptPath = JYTHON_SCRIPT_PATH.get(conf);
-    InputStream pythonStream = getPythonScriptStream(conf, scriptPath);
-    try {
-      PythonInterpreter interpreter = new PythonInterpreter();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("initComputation: Jython loading script " + scriptPath);
-      }
-      interpreter.execfile(pythonStream);
-
-      String className = computationName(conf);
-      PyObject pyComputationModule = interpreter.get(className);
-
-      JythonUtils.setPythonComputationModule(pyComputationModule);
-    } finally {
-      Closeables.closeQuietly(pythonStream);
-    }
+  /**
+   * Set static python computation module stored
+   *
+   * @param mod python computation module
+   */
+  private static void setPythonComputationModule(PyObject mod) {
+    JYTHON_COMPUTATION_MODULE = mod;
   }
 
   /**
-   * Get an {@link InputStream} for the jython script.
+   * Get python computation module stored
    *
-   * @param conf Configuration
-   * @param path script path
-   * @return {@link InputStream} for reading script
+   * @return python computation module
    */
-  private InputStream getPythonScriptStream(Configuration conf,
-      String path) {
-    InputStream stream = null;
-    DeployType deployType = JYTHON_DEPLOY_TYPE.get(conf);
-    switch (deployType) {
-    case RESOURCE:
-      if (LOG.isInfoEnabled()) {
-        LOG.info("getPythonScriptStream: Reading Jython Computation " +
-            "from resource at " + path);
-      }
-      stream = getClass().getResourceAsStream(path);
-      if (stream == null) {
-        throw new IllegalStateException("getPythonScriptStream: Failed to " +
-            "open Jython script from resource at " + path);
-      }
-      break;
-    case DISTRIBUTED_CACHE:
-      if (LOG.isInfoEnabled()) {
-        LOG.info("getPythonScriptStream: Reading Jython Computation " +
-            "from DistributedCache at " + path);
-      }
-      Optional<Path> localPath = getLocalCacheFile(conf, path);
-      if (!localPath.isPresent()) {
-        throw new IllegalStateException("getPythonScriptStream: Failed to " +
-            "find Jython script in local DistributedCache matching " + path);
-      }
-      String pathStr = localPath.get().toString();
-      try {
-        stream = new BufferedInputStream(new FileInputStream(pathStr));
-      } catch (IOException e) {
-        throw new IllegalStateException("getPythonScriptStream: Failed open " +
-            "Jython script from DistributedCache at " + localPath);
-      }
-      break;
-    default:
-      throw new IllegalArgumentException("getPythonScriptStream: Unknown " +
-          "Jython script deployment type: " + deployType);
-    }
-    return stream;
+  private static PyObject getPythonComputationModule() {
+    return JYTHON_COMPUTATION_MODULE;
+  }
+
+  @Override
+  public void initialize(ImmutableClassesGiraphConfiguration conf) {
+    PythonInterpreter interpreter = JythonUtils.getInterpreter();
+    String className = computationName(conf);
+    PyObject pyComputationModule = interpreter.get(className);
+    checkNotNull(pyComputationModule,
+        "Could not find Jython Computation class " + className +
+        " in loaded scripts: " + ScriptLoader.getLoadedScripts());
+    setPythonComputationModule(pyComputationModule);
   }
 
   @Override
   public Computation createComputation(
       ImmutableClassesGiraphConfiguration conf) {
-    PyObject pyComputationModule = JythonUtils.getPythonComputationModule();
-    Preconditions.checkNotNull(pyComputationModule);
+    PyObject pyComputationModule = getPythonComputationModule();
+    checkNotNull(pyComputationModule,
+        "Jython Computation class not set in loaded scripts: " +
+            ScriptLoader.getLoadedScripts());
 
     PyObject pyComputationObj = pyComputationModule.__call__();
     Object computationObj = pyComputationObj.__tojava__(Computation.class);
@@ -148,18 +95,18 @@ public class JythonComputationFactory implements ComputationFactory {
 
   @Override
   public void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
-    if (JYTHON_SCRIPT_PATH.isDefaultValue(conf)) {
+    if (SCRIPTS_TO_LOAD.isDefaultValue(conf)) {
       throw new IllegalStateException("checkConfiguration: " +
-          JYTHON_SCRIPT_PATH.getKey() + " not set in configuration");
+          SCRIPTS_TO_LOAD.getKey() + " not set in configuration");
     }
-    if (JYTHON_COMPUTATION_CLASS.isDefaultValue(conf)) {
+    if (JYTHON_COMPUTATION_CLASS_NAME.isDefaultValue(conf)) {
       throw new IllegalStateException("checkConfiguration: " +
-          JYTHON_COMPUTATION_CLASS.getKey() + " not set in configuration");
+          JYTHON_COMPUTATION_CLASS_NAME.getKey() + " not set in configuration");
     }
   }
 
   @Override
   public String computationName(GiraphConfiguration conf) {
-    return JYTHON_COMPUTATION_CLASS.get(conf);
+    return JYTHON_COMPUTATION_CLASS_NAME.get(conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
index 77040e3..e747e84 100644
--- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
@@ -19,53 +19,45 @@ package org.apache.giraph.jython;
 
 import org.apache.giraph.graph.Language;
 import org.apache.hadoop.conf.Configuration;
-import org.python.core.PyObject;
+import org.python.util.PythonInterpreter;
 
 import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_FACTORY_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
-import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_COMPUTATION_CLASS;
-import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_SCRIPT_PATH;
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_COMPUTATION_CLASS_NAME;
 
 /**
  * Helpers for running jobs with Jython.
  */
 public class JythonUtils {
-  /** The Jython compute function, cached here for fast access */
-  private static volatile PyObject JYTHON_COMPUTATION_MODULE;
+  /**
+   * The Jython interpreter. Cached here for fast access. We use a singleton
+   * for this so that we can parse all of the Jython scripts once at startup
+   * and then have their data loaded for the rest of the job.
+   */
+  private static final PythonInterpreter INTERPRETER =
+      new PythonInterpreter();
 
   /** Don't construct */
   private JythonUtils() { }
 
   /**
-   * Set static python computation module stored
-   *
-   * @param mod python computation module
-   */
-  public static void setPythonComputationModule(PyObject mod) {
-    JYTHON_COMPUTATION_MODULE = mod;
-  }
-
-  /**
-   * Get python computation module stored
+   * Get Jython interpreter
    *
-   * @return python computation module
+   * @return interpreter
    */
-  public static PyObject getPythonComputationModule() {
-    return JYTHON_COMPUTATION_MODULE;
+  public static PythonInterpreter getInterpreter() {
+    return INTERPRETER;
   }
 
   /**
    * Sets up the Configuration for using Jython
    *
    * @param conf Configuration to se
-   * @param scriptPath Path to Jython script (resource or distributed cache)
    * @param klassName Class name of Jython Computation
    */
-  public static void init(Configuration conf, String scriptPath,
-      String klassName) {
+  public static void init(Configuration conf, String klassName) {
     COMPUTATION_LANGUAGE.set(conf, Language.JYTHON);
     COMPUTATION_FACTORY_CLASS.set(conf, JythonComputationFactory.class);
-    JYTHON_SCRIPT_PATH.set(conf, scriptPath);
-    JYTHON_COMPUTATION_CLASS.set(conf, klassName);
+    JYTHON_COMPUTATION_CLASS_NAME.set(conf, klassName);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/scripting/DeployType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/DeployType.java b/giraph-core/src/main/java/org/apache/giraph/scripting/DeployType.java
new file mode 100644
index 0000000..fc3054a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/scripting/DeployType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.scripting;
+
+/**
+ * Type of deployment for a file
+ */
+public enum DeployType {
+  /** Resource packaged with jar */
+  RESOURCE,
+  /** Hadoop's Distributed Cache */
+  DISTRIBUTED_CACHE
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/scripting/DeployedScript.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/DeployedScript.java b/giraph-core/src/main/java/org/apache/giraph/scripting/DeployedScript.java
new file mode 100644
index 0000000..c4353d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/scripting/DeployedScript.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.scripting;
+
+import org.apache.giraph.graph.Language;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.base.Objects;
+
+/**
+ * A script that was deployed to the cluster.
+ */
+public class DeployedScript {
+  /** How the script was deployed */
+  @JsonProperty
+  private final DeployType deployType;
+  /** Path to the script */
+  @JsonProperty
+  private final String path;
+  /** Programming language the script is written in */
+  @JsonProperty
+  private final Language language;
+
+  /**
+   * Constructor
+   *
+   * @param path String path to resource
+   * @param deployType deployment type
+   * @param language programming language
+   */
+  @JsonCreator
+  public DeployedScript(
+      @JsonProperty("path") String path,
+      @JsonProperty("deployType") DeployType deployType,
+      @JsonProperty("language") Language language) {
+    this.path = path;
+    this.deployType = deployType;
+    this.language = language;
+  }
+
+  public DeployType getDeployType() {
+    return deployType;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public Language getLanguage() {
+    return language;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(path, deployType, language);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof DeployedScript) {
+      DeployedScript other = (DeployedScript) obj;
+      return Objects.equal(path, other.path) &&
+          Objects.equal(deployType, other.deployType) &&
+          Objects.equal(language, other.language);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("path", path)
+        .add("deployType", deployType)
+        .add("language", language)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
new file mode 100644
index 0000000..bf3e152
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.scripting;
+
+import org.apache.giraph.conf.JsonStringConfOption;
+import org.apache.giraph.graph.Language;
+import org.apache.giraph.jython.JythonUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
+
+/**
+ * Loads scripts written by user in other languages, for example Jython.
+ *
+ * The list of scripts to load is kept in the job {@link Configuration}
+ * through {@link #SCRIPTS_TO_LOAD}. Every script loaded through
+ * {@link #loadScript(Configuration, DeployedScript)} is recorded in a
+ * static list, see {@link #getLoadedScripts()}.
+ */
+public class ScriptLoader {
+  /** Option for scripts to load on workers */
+  public static final JsonStringConfOption SCRIPTS_TO_LOAD =
+      new JsonStringConfOption("giraph.scripts.to.load",
+          "Scripts to load on workers");
+
+  /** Scripts that were loaded */
+  private static final List<DeployedScript> LOADED_SCRIPTS =
+      Lists.newArrayList();
+
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(ScriptLoader.class);
+
+  /** Don't construct */
+  private ScriptLoader() { }
+
+  /**
+   * Store a single script in the Configuration as the scripts to load.
+   *
+   * @param conf {@link Configuration}
+   * @param scriptPath Path to script
+   * @param deployType type of deployment
+   * @param language programming language
+   */
+  public static void setScriptsToLoad(Configuration conf,
+      String scriptPath, DeployType deployType, Language language) {
+    DeployedScript deployedScript = new DeployedScript(scriptPath,
+        deployType, language);
+    setScriptsToLoad(conf, deployedScript);
+  }
+
+  /**
+   * Store a pair of scripts in the Configuration as the scripts to load.
+   *
+   * @param conf {@link Configuration}
+   * @param script1 Path to first script
+   * @param deployType1 type of deployment of first script
+   * @param language1 programming language of first script
+   * @param script2 Path to second script
+   * @param deployType2 type of deployment of second script
+   * @param language2 programming language of second script
+   */
+  public static void setScriptsToLoad(Configuration conf,
+      String script1, DeployType deployType1, Language language1,
+      String script2, DeployType deployType2, Language language2) {
+    DeployedScript deployedScript1 = new DeployedScript(script1,
+        deployType1, language1);
+    DeployedScript deployedScript2 = new DeployedScript(script2,
+        deployType2, language2);
+    setScriptsToLoad(conf, deployedScript1, deployedScript2);
+  }
+
+  /**
+   * Store the given scripts in the Configuration as the scripts to load.
+   *
+   * @param conf Configuration
+   * @param scripts the scripts to deploy
+   */
+  public static void setScriptsToLoad(Configuration conf,
+      DeployedScript... scripts) {
+    List<DeployedScript> scriptsToLoad = Lists.newArrayList(scripts);
+    SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
+  }
+
+  /**
+   * Add a script to load on workers
+   *
+   * @param conf {@link Configuration}
+   * @param script  Path to script
+   * @param deployType type of deployment
+   * @param language programming language
+   */
+  public static void addScriptToLoad(Configuration conf,
+      String script, DeployType deployType, Language language) {
+    addScriptToLoad(conf, new DeployedScript(script, deployType, language));
+  }
+
+  /**
+   * Add a script to load on workers. The script is appended to the list
+   * read back from the Configuration, or to a fresh list when that read
+   * returns null, and the result is stored again.
+   *
+   * @param conf {@link Configuration}
+   * @param script the script to load
+   */
+  public static void addScriptToLoad(Configuration conf,
+      DeployedScript script) {
+    List<DeployedScript> scriptsToLoad = getScriptsToLoad(conf);
+    if (scriptsToLoad == null) {
+      scriptsToLoad = Lists.<DeployedScript>newArrayList();
+    }
+    scriptsToLoad.add(script);
+    SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
+  }
+
+  /**
+   * Get the list of scripts to load on workers
+   *
+   * @param conf {@link Configuration}
+   * @return list of {@link DeployedScript}s; callers in this class handle
+   *         a null result as "no scripts configured"
+   */
+  public static List<DeployedScript> getScriptsToLoad(Configuration conf) {
+    TypeReference<List<DeployedScript>> jsonType =
+        new TypeReference<List<DeployedScript>>() { };
+    return SCRIPTS_TO_LOAD.get(conf, jsonType);
+  }
+
+  /**
+   * Load all the scripts deployed in Configuration, in list order.
+   * Does nothing when no list of scripts is found in the Configuration.
+   *
+   * @param conf Configuration
+   */
+  public static void loadScripts(Configuration conf) {
+    List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
+    if (deployedScripts == null) {
+      return;
+    }
+    for (DeployedScript deployedScript : deployedScripts) {
+      loadScript(conf, deployedScript);
+    }
+  }
+
+  /**
+   * Load a single deployed script and record it as loaded.
+   *
+   * The script is only added to the loaded list when loading succeeded.
+   * The stream opened for the script is closed on every path, including
+   * when the interpreter throws or the language is not supported.
+   *
+   * @param conf Configuration
+   * @param deployedScript the deployed script
+   * @throws IllegalStateException if the script's language is not
+   *         supported or its stream cannot be opened
+   */
+  public static void loadScript(Configuration conf,
+      DeployedScript deployedScript) {
+    InputStream stream = openScriptInputStream(conf, deployedScript);
+    try {
+      switch (deployedScript.getLanguage()) {
+      case JYTHON:
+        loadJythonScript(stream);
+        break;
+      default:
+        LOG.error("Don't know how to load script " + deployedScript);
+        throw new IllegalStateException("Don't know how to load script " +
+            deployedScript);
+      }
+      LOADED_SCRIPTS.add(deployedScript);
+    } finally {
+      // Release the stream even if loading failed part way through
+      Closeables.closeQuietly(stream);
+    }
+  }
+
+  /**
+   * Load a Jython deployed script
+   *
+   * @param stream InputStream with Jython code to load
+   */
+  private static void loadJythonScript(InputStream stream) {
+    JythonUtils.getInterpreter().execfile(stream);
+  }
+
+  /**
+   * Get list of scripts already loaded.
+   *
+   * This is the internal list itself, not a copy, so callers should treat
+   * it as read-only.
+   *
+   * @return list of loaded scripts
+   */
+  public static List<DeployedScript> getLoadedScripts() {
+    return LOADED_SCRIPTS;
+  }
+
+  /**
+   * Get an {@link java.io.InputStream} for the deployed script.
+   *
+   * The caller owns the returned stream and must close it.
+   *
+   * @param conf Configuration
+   * @param deployedScript the deployed script
+   * @return {@link java.io.InputStream} for reading script
+   * @throws IllegalStateException if the resource or local cache file is
+   *         missing or cannot be opened
+   * @throws IllegalArgumentException if the deployment type is unknown
+   */
+  private static InputStream openScriptInputStream(Configuration conf,
+      DeployedScript deployedScript) {
+    DeployType deployType = deployedScript.getDeployType();
+    String path = deployedScript.getPath();
+
+    InputStream stream;
+    switch (deployType) {
+    case RESOURCE:
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getScriptStream: Reading script from resource at " +
+            path);
+      }
+      stream = ScriptLoader.class.getClassLoader().getResourceAsStream(path);
+      if (stream == null) {
+        throw new IllegalStateException("getScriptStream: Failed to " +
+            "open script from resource at " + path);
+      }
+      break;
+    case DISTRIBUTED_CACHE:
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getScriptStream: Reading script from DistributedCache at " +
+            path);
+      }
+      Optional<Path> localPath = getLocalCacheFile(conf, path);
+      if (!localPath.isPresent()) {
+        throw new IllegalStateException("getScriptStream: Failed to " +
+            "find script in local DistributedCache matching " + path);
+      }
+      String pathStr = localPath.get().toString();
+      try {
+        stream = new BufferedInputStream(new FileInputStream(pathStr));
+      } catch (IOException e) {
+        // Report the resolved local file and keep the underlying cause
+        throw new IllegalStateException("getScriptStream: Failed to open " +
+            "script from DistributedCache at " + pathStr, e);
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("getScriptStream: Unknown " +
+          "script deployment type: " + deployType);
+    }
+    return stream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/scripting/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/package-info.java b/giraph-core/src/main/java/org/apache/giraph/scripting/package-info.java
new file mode 100644
index 0000000..fbe077c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/scripting/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Scripting with Giraph.
+ */
+package org.apache.giraph.scripting;

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 6b89403..0424ff5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -33,11 +33,14 @@ import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.factories.VertexValueFactory;
+import org.apache.giraph.graph.Language;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.job.GiraphConfigurationValidator;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.jython.JythonUtils;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.partition.Partition;
@@ -433,8 +436,11 @@ public final class ConfigurationUtils {
     Path path = new Path(scriptPath);
     Path remotePath = DistributedCacheUtils.copyAndAdd(path, conf);
 
+    ScriptLoader.setScriptsToLoad(conf, remotePath.toString(),
+        DeployType.DISTRIBUTED_CACHE, Language.JYTHON);
+
     GiraphTypes.readFrom(conf).writeIfUnset(conf);
-    JythonUtils.init(conf, remotePath.toString(), jythonClass);
+    JythonUtils.init(conf, jythonClass);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java b/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
index 58f25a6..b5110bb 100644
--- a/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
+++ b/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
@@ -20,12 +20,16 @@ package org.apache.giraph.jython;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.graph.Language;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
+import org.python.core.PyClass;
 import org.python.core.PyDictionary;
 import org.python.core.PyInteger;
 import org.python.core.PyList;
@@ -36,7 +40,6 @@ import com.google.common.collect.Maps;
 
 import java.util.Map;
 
-import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_DEPLOY_TYPE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -66,6 +69,8 @@ public class TestJython {
     interpreter.exec(jython);
 
     PyObject fooClass = interpreter.get("Foo");
+    assertTrue(fooClass instanceof PyClass);
+
     PyObject getMapFunc = interpreter.get("get_map");
     PyObject getListFunc = interpreter.get("get_list");
     PyObject getIValFunc = interpreter.get("get_ival");
@@ -109,8 +114,10 @@ public class TestJython {
     GiraphTypes types = new GiraphTypes(IntWritable.class, IntWritable.class,
         NullWritable.class, NullWritable.class, NullWritable.class);
     types.writeIfUnset(conf);
-    JythonUtils.init(conf, "count-edges.py", "CountEdges");
-    JYTHON_DEPLOY_TYPE.set(conf, DeployType.RESOURCE);
+    ScriptLoader.setScriptsToLoad(conf,
+        "org/apache/giraph/jython/count-edges.py",
+        DeployType.RESOURCE, Language.JYTHON);
+    JythonUtils.init(conf, "CountEdges");
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);


Mime
View raw message