giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject git commit: updated refs/heads/trunk to b5b76c2
Date Fri, 22 Apr 2016 17:58:45 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk c9ab310db -> b5b76c284


GIRAPH-1054: Separate ThriftService from JobProgressTrackerService on the client

Summary:
* Moves the job tracker conf options into the GiraphConstants
* Factors out the static GiraphJob#startThriftServer and GiraphJob#stopThriftServer methods
from createJobProgressServer
* Allows adding other Thrift services to the ThriftServer

Test Plan: Tried on a cluster

Reviewers: maja.kabiljo, sergey.edunov

Reviewed By: sergey.edunov

Subscribers: sergey.edunov

Differential Revision: https://reviews.facebook.net/D57087


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b5b76c28
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b5b76c28
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b5b76c28

Branch: refs/heads/trunk
Commit: b5b76c28408e7401dc61fc2047182d971bbc2537
Parents: c9ab310
Author: Avery Ching <aching@fb.com>
Authored: Wed Apr 13 16:03:40 2016 -0700
Committer: Avery Ching <aching@fb.com>
Committed: Fri Apr 22 10:57:20 2016 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/conf/GiraphConstants.java |  6 +-
 .../RetryableJobProgressTrackerClient.java      |  5 +-
 .../apache/giraph/job/ClientThriftServer.java   | 92 ++++++++++++++++++++
 .../java/org/apache/giraph/job/GiraphJob.java   | 13 ++-
 .../apache/giraph/job/JobProgressTracker.java   | 13 +--
 .../giraph/job/JobProgressTrackerService.java   | 38 +-------
 6 files changed, 115 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index b7f0d5c..1e51101 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -17,9 +17,6 @@
  */
 package org.apache.giraph.conf;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.bsp.BspOutputFormat;
@@ -83,6 +80,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.OutputFormat;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 /**
  * Constants used all over Giraph for configuration.
  */

http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index 9ce12ed..60cb586 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.graph;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.job.ClientThriftServer;
 import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.log4j.Logger;
@@ -78,8 +79,8 @@ public class RetryableJobProgressTrackerClient
         ImmutableSet.<ThriftClientEventHandler>of());
     FramedClientConnector connector =
         new FramedClientConnector(new InetSocketAddress(
-            JOB_PROGRESS_SERVICE_HOST.get(conf),
-            JOB_PROGRESS_SERVICE_PORT.get(conf)));
+            ClientThriftServer.CLIENT_THRIFT_SERVER_HOST.get(conf),
+            ClientThriftServer.CLIENT_THRIFT_SERVER_PORT.get(conf)));
     jobProgressTracker =
         clientManager.createClient(connector, JobProgressTracker.class).get();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java b/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java
new file mode 100644
index 0000000..ce7b5d9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import com.facebook.swift.codec.ThriftCodecManager;
+import com.facebook.swift.service.ThriftEventHandler;
+import com.facebook.swift.service.ThriftServer;
+import com.facebook.swift.service.ThriftServerConfig;
+import com.facebook.swift.service.ThriftServiceProcessor;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.conf.StrConfOption;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Manages the life cycle of the Thrift server started on the client.
+ */
+public class ClientThriftServer {
+  /**
+   * The client can run a Thrift server (e.g. job progress service).
+   * This is the host of the Thrift server.
+   */
+  public static final StrConfOption CLIENT_THRIFT_SERVER_HOST =
+      new StrConfOption("giraph.client.thrift.server.host", null,
+          "Host on which the client Thrift server runs (if enabled)");
+  /**
+   * The client can run a Thrift server (e.g. job progress service).
+   * This is the port of the Thrift server.
+   */
+  public static final IntConfOption CLIENT_THRIFT_SERVER_PORT =
+      new IntConfOption("giraph.client.thrift.server.port", -1,
+          "Port on which the client Thrift server runs (if enabled)");
+
+  /** Thrift server that is intended to run on the client */
+  private final ThriftServer clientThriftServer;
+
+  /**
+   * Create and start the Thrift server.
+   *
+   * @param conf Giraph conf to set the host and ports for.
+   * @param services Services to start
+   */
+  public ClientThriftServer(GiraphConfiguration conf,
+                            List<?> services) {
+    checkNotNull(conf, "conf is null");
+    checkNotNull(services, "services is null");
+
+    ThriftServiceProcessor processor =
+        new ThriftServiceProcessor(new ThriftCodecManager(),
+                                   new ArrayList<ThriftEventHandler>(),
+                                   services);
+    clientThriftServer =
+        new ThriftServer(processor, new ThriftServerConfig());
+    clientThriftServer.start();
+    try {
+      CLIENT_THRIFT_SERVER_HOST.set(
+          conf,
+          conf.getLocalHostname());
+    } catch (UnknownHostException e) {
+      throw new IllegalStateException("Unable to get host information", e);
+    }
+    CLIENT_THRIFT_SERVER_PORT.set(conf, clientThriftServer.getPort());
+  }
+
+  /**
+   * Stop the Thrift server.
+   */
+  public void stopThriftServer() {
+    this.clientThriftServer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 8792e59..90a73c6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.job;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
@@ -240,7 +241,13 @@ public class GiraphJob {
       GiraphJobObserver jobObserver = conf.getJobObserver();
 
       JobProgressTrackerService jobProgressTrackerService =
-          JobProgressTrackerService.createJobProgressServer(conf, jobObserver);
+          JobProgressTrackerService.createJobProgressTrackerService(
+              conf, jobObserver);
+      ClientThriftServer clientThriftServer = null;
+      if (jobProgressTrackerService != null) {
+        clientThriftServer = new ClientThriftServer(
+            conf, ImmutableList.of(jobProgressTrackerService));
+      }
 
       tryCount++;
       Job submittedJob = new Job(conf, jobName);
@@ -271,6 +278,10 @@ public class GiraphJob {
       if (jobProgressTrackerService != null) {
         jobProgressTrackerService.stop(passed);
       }
+      if (clientThriftServer != null) {
+        clientThriftServer.stopThriftServer();
+      }
+
       jobObserver.jobFinished(submittedJob, passed);
 
       if (!passed) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index 4da5450..3041d08 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -18,26 +18,15 @@
 
 package org.apache.giraph.job;
 
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.worker.WorkerProgress;
-
 import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
+import org.apache.giraph.worker.WorkerProgress;
 
 /**
  * Interface for job progress tracker on job client
  */
 @ThriftService
 public interface JobProgressTracker {
-  /** Host on which job progress service runs */
-  StrConfOption JOB_PROGRESS_SERVICE_HOST =
-      new StrConfOption("giraph.jobProgressServiceHost", null,
-          "Host on which job progress service runs");
-  /** Port which job progress service uses */
-  IntConfOption JOB_PROGRESS_SERVICE_PORT =
-      new IntConfOption("giraph.jobProgressServicePort", -1,
-          "Port which job progress service uses");
 
   /** Notify JobProgressTracker that mapper started */
   @ThriftMethod

http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
index b08bf3e..c0189c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
@@ -24,15 +24,7 @@ import org.apache.giraph.worker.WorkerProgress;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
 
-import com.facebook.swift.codec.ThriftCodecManager;
-import com.facebook.swift.service.ThriftEventHandler;
-import com.facebook.swift.service.ThriftServer;
-import com.facebook.swift.service.ThriftServerConfig;
-import com.facebook.swift.service.ThriftServiceProcessor;
-
 import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -54,8 +46,6 @@ public class JobProgressTrackerService implements JobProgressTracker {
   private Thread writerThread;
   /** Whether application is finished */
   private volatile boolean finished = false;
-  /** Server which uses this service */
-  private ThriftServer server;
   /** Number of mappers which the job got */
   private int mappersStarted;
   /** Last time number of mappers started was logged */
@@ -208,7 +198,6 @@ public class JobProgressTrackerService implements JobProgressTracker {
   public void stop(boolean succeeded) {
     finished = true;
     writerThread.interrupt();
-    server.close();
     if (LOG.isInfoEnabled()) {
       LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
           ", cleaning up...");
@@ -216,37 +205,18 @@ public class JobProgressTrackerService implements JobProgressTracker
{
   }
 
   /**
-   * Create job progress server on job client, and update configuration with
-   * its hostname and port so mappers would know what to connect to. Returns
-   * null if progress shouldn't be tracked
+   * Create job progress server on job client if enabled in configuration.
    *
    * @param conf Configuration
    * @param jobObserver Giraph job callbacks
    * @return JobProgressTrackerService
    */
-  public static JobProgressTrackerService createJobProgressServer(
+  public static JobProgressTrackerService createJobProgressTrackerService(
       GiraphConfiguration conf, GiraphJobObserver jobObserver) {
     if (!conf.trackJobProgressOnClient()) {
       return null;
     }
-    try {
-      JobProgressTrackerService service =
-          new JobProgressTrackerService(conf, jobObserver);
-      ThriftServiceProcessor processor =
-          new ThriftServiceProcessor(new ThriftCodecManager(),
-              new ArrayList<ThriftEventHandler>(), service);
-      service.server = new ThriftServer(processor, new ThriftServerConfig());
-      service.server.start();
-      JOB_PROGRESS_SERVICE_HOST.set(conf,
-          InetAddress.getLocalHost().getHostName());
-      JOB_PROGRESS_SERVICE_PORT.set(conf, service.server.getPort());
-      return service;
-      // CHECKSTYLE: stop IllegalCatch
-    } catch (Exception e) {
-      // CHECKSTYLE: resume IllegalCatch
-      LOG.warn("Exception occurred while trying to create " +
-          "JobProgressTrackerService - not using progress reporting", e);
-      return null;
-    }
+
+    return new JobProgressTrackerService(conf, jobObserver);
   }
 }


Mime
View raw message