giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 093e819
Date Thu, 07 Apr 2016 17:49:25 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 24bed1a9b -> 093e81995


GIRAPH-1050: Add MapperObserver

Summary: Add MapperObserver which will be called once per mapper before anything else happens.

Test Plan: Ran a job with MapperObserver set, verified it's called at the right time

Differential Revision: https://reviews.facebook.net/D56373


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/093e8199
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/093e8199
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/093e8199

Branch: refs/heads/trunk
Commit: 093e81995e94ee6dbfe6278f04d59a29cd54ac8b
Parents: 24bed1a
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Thu Apr 7 09:40:27 2016 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Thu Apr 7 10:49:12 2016 -0700

----------------------------------------------------------------------
 .../apache/giraph/conf/GiraphConfiguration.java | 20 +++++++++++++
 .../org/apache/giraph/conf/GiraphConstants.java |  5 ++++
 .../ImmutableClassesGiraphConfiguration.java    | 15 ++++++++++
 .../apache/giraph/graph/GraphTaskManager.java   | 13 +++++++++
 .../org/apache/giraph/graph/MapperObserver.java | 30 ++++++++++++++++++++
 5 files changed, 83 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 6b00645..78bd5ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -33,6 +33,7 @@ import org.apache.giraph.edge.ReuseObjectsOutEdges;
 import org.apache.giraph.factories.ComputationFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.MapperObserver;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueCombiner;
@@ -313,6 +314,16 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Add a MapperObserver class (optional)
+   *
+   * @param mapperObserverClass MapperObserver class to add.
+   */
+  public final void addMapperObserverClass(
+      Class<? extends MapperObserver> mapperObserverClass) {
+    MAPPER_OBSERVER_CLASSES.add(this, mapperObserverClass);
+  }
+
+  /**
    * Get job observer class
    *
    * @return GiraphJobObserver class set.
@@ -687,6 +698,15 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Get array of MapperObserver classes set in configuration.
+   *
+   * @return array of MapperObserver classes.
+   */
+  public Class<? extends MapperObserver>[] getMapperObserverClasses() {
+    return MAPPER_OBSERVER_CLASSES.getArray(this);
+  }
+
+  /**
    * Whether to track, print, and aggregate metrics.
    *
    * @return true if metrics are enabled, false otherwise (default)

http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 4787d37..b7f0d5c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -48,6 +48,7 @@ import org.apache.giraph.graph.DefaultVertex;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.DefaultVertexValueCombiner;
 import org.apache.giraph.graph.Language;
+import org.apache.giraph.graph.MapperObserver;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueCombiner;
@@ -202,6 +203,10 @@ public interface GiraphConstants {
   ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES =
       ClassConfOption.create("giraph.worker.observers", null,
           WorkerObserver.class, "Classes for Worker Observer - optional");
+  /** Classes for Mapper Observer - optional */
+  ClassConfOption<MapperObserver> MAPPER_OBSERVER_CLASSES =
+      ClassConfOption.create("giraph.mapper.observers", null,
+          MapperObserver.class, "Classes for Mapper Observer - optional");
   /** Message combiner class - optional */
   ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS =
       ClassConfOption.create("giraph.messageCombinerClass", null,

http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 38bf101..130c581 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -41,6 +41,7 @@ import org.apache.giraph.factories.VertexIdFactory;
 import org.apache.giraph.factories.VertexValueFactory;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.Language;
+import org.apache.giraph.graph.MapperObserver;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueCombiner;
@@ -756,6 +757,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create array of MapperObservers.
+   *
+   * @return Instantiated array of MapperObservers.
+   */
+  public MapperObserver[] createMapperObservers() {
+    Class<? extends MapperObserver>[] klasses = getMapperObserverClasses();
+    MapperObserver[] objects = new MapperObserver[klasses.length];
+    for (int i = 0; i < klasses.length; ++i) {
+      objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+    }
+    return objects;
+  }
+
+  /**
    * Create job observer
    *
    * @return GiraphJobObserver set in configuration.

http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 62a87de..19ac615 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -160,6 +160,8 @@ end[PURE_YARN]*/
   private final Mapper<?, ?, ?, ?>.Context context;
   /** is this GraphTaskManager the master? */
   private boolean isMaster;
+  /** Mapper observers */
+  private MapperObserver[] mapperObservers;
 
   /**
    * Default constructor for GiraphTaskManager.
@@ -206,6 +208,7 @@ end[PURE_YARN]*/
     context.setStatus("setup: Beginning worker setup.");
     Configuration hadoopConf = context.getConfiguration();
     conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
+    setupMapperObservers();
     initializeJobProgressTracker();
     // Write user's graph types (I,V,E,M) back to configuration parameters so
     // that they are set for quicker access later. These types are often
@@ -872,6 +875,16 @@ end[PURE_YARN]*/
   }
 
   /**
+   * Setup mapper observers
+   */
+  public void setupMapperObservers() {
+    mapperObservers = conf.createMapperObservers();
+    for (MapperObserver mapperObserver : mapperObservers) {
+      mapperObserver.setup();
+    }
+  }
+
+  /**
    * Executes preLoad() on worker observers.
    */
   private void preLoadOnWorkerObservers() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java
new file mode 100644
index 0000000..cfbb421
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/**
+ * Mapper observer
+ */
+public interface MapperObserver {
+  /**
+   * Setup mapper. Called in the beginning of mapper setup, the only thing
+   * which happens before is configuration preparation.
+   */
+  void setup();
+}


Mime
View raw message