tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1304. Abstract out client interactions with YARN. Contributed by Jonathan Eagles.
Date Mon, 28 Jul 2014 05:00:00 GMT
Repository: tez
Updated Branches:
  refs/heads/master 692e2a0e7 -> c0d59139c


TEZ-1304. Abstract out client interactions with YARN. Contributed by
Jonathan Eagles.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c0d59139
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c0d59139
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c0d59139

Branch: refs/heads/master
Commit: c0d59139c0c88a38e12d5b2240f2df7f5314baa4
Parents: 692e2a0
Author: Siddharth Seth <sseth@apache.org>
Authored: Sun Jul 27 21:58:54 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Sun Jul 27 21:58:54 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/FrameworkClient.java  | 53 +++++++++++++
 .../java/org/apache/tez/client/TezClient.java   | 29 ++++----
 .../org/apache/tez/client/TezClientUtils.java   |  3 +-
 .../org/apache/tez/client/TezYarnClient.java    | 78 ++++++++++++++++++++
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 19 +++--
 .../org/apache/tez/client/TestTezClient.java    |  6 +-
 6 files changed, 158 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
new file mode 100644
index 0000000..2f97399
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public abstract class FrameworkClient {
+
+  public static FrameworkClient createFrameworkClient() {
+    return new TezYarnClient(YarnClient.createYarnClient());
+  }
+
+  public abstract void init(Configuration conf);
+
+  public abstract void start();
+
+  public abstract void stop();
+
+  public abstract void close() throws IOException;
+
+  public abstract YarnClientApplication createApplication() throws YarnException, IOException;
+
+  public abstract ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext) throws YarnException, IOException;
+
+  public abstract void killApplication(ApplicationId appId) throws YarnException, IOException;
+
+  public abstract ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index d984877..e36866c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.common.TezYARNUtils;
@@ -89,7 +88,7 @@ public class TezClient {
   private ApplicationId sessionAppId;
   private ApplicationId lastSubmittedAppId;
   private AMConfiguration amConfig;
-  private YarnClient yarnClient;
+  private FrameworkClient frameworkClient;
   private boolean isSession;
   private boolean sessionStarted = false;
   private boolean sessionStopped = false;
@@ -250,9 +249,9 @@ public class TezClient {
   public synchronized void start() throws TezException, IOException {
     amConfig.setYarnConfiguration(new YarnConfiguration(amConfig.getTezConfiguration()));
 
-    yarnClient = createYarnClient();
-    yarnClient.init(amConfig.getYarnConfiguration());
-    yarnClient.start();    
+    frameworkClient = createFrameworkClient();
+    frameworkClient.init(amConfig.getYarnConfiguration());
+    frameworkClient.start();    
 
     if (isSession) {
       LOG.info("Session mode. Starting session.");
@@ -286,7 +285,7 @@ public class TezClient {
             TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
           appContext.setMaxAppAttempts(1);
         }  
-        yarnClient.submitApplication(appContext);
+        frameworkClient.submitApplication(appContext);
         sessionStarted = true;
       } catch (YarnException e) {
         throw new TezException(e);
@@ -422,15 +421,15 @@ public class TezClient {
               + ", sessionName=" + clientName
               + ", applicationId=" + sessionAppId);
           try {
-            yarnClient.killApplication(sessionAppId);
+            frameworkClient.killApplication(sessionAppId);
           } catch (YarnException e) {
             throw new TezException(e);
           }
         }
       }
     } finally {
-      if (yarnClient != null) {
-        yarnClient.close();
+      if (frameworkClient != null) {
+        frameworkClient.close();
       }
     }
   }
@@ -476,7 +475,7 @@ public class TezClient {
     }
     Preconditions.checkState(appId != null, "Cannot get status without starting an application");
     try {
-      ApplicationReport appReport = yarnClient.getApplicationReport(
+      ApplicationReport appReport = frameworkClient.getApplicationReport(
           appId);
       switch (appReport.getYarnApplicationState()) {
       case NEW:
@@ -600,15 +599,15 @@ public class TezClient {
   }
   
   // for testing
-  protected YarnClient createYarnClient() {
-    return YarnClient.createYarnClient();
+  protected FrameworkClient createFrameworkClient() {
+    return FrameworkClient.createFrameworkClient();
   }
   
   // for testing
   protected DAGClientAMProtocolBlockingPB getSessionAMProxy(ApplicationId appId) 
       throws TezException, IOException {
     return TezClientUtils.getSessionAMProxy(
-        yarnClient, amConfig.getYarnConfiguration(), appId);
+        frameworkClient, amConfig.getYarnConfiguration(), appId);
   }
 
   private DAGClientAMProtocolBlockingPB waitForProxy()
@@ -672,7 +671,7 @@ public class TezClient {
           + ", applicationId=" + appId
           + ", dagName=" + dag.getName());
       
-      yarnClient.submitApplication(appContext);
+      frameworkClient.submitApplication(appContext);
       lastSubmittedAppId = appId;
     } catch (YarnException e) {
       throw new TezException(e);
@@ -682,7 +681,7 @@ public class TezClient {
 
   private ApplicationId createApplication() throws TezException, IOException {
     try {
-      return yarnClient.createApplication().
+      return frameworkClient.createApplication().
           getNewApplicationResponse().getApplicationId();
     } catch (YarnException e) {
       throw new TezException(e);

http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 93c5f34..d99d35d 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
@@ -730,7 +729,7 @@ public class TezClientUtils {
     return textPath;
   }
 
-  static DAGClientAMProtocolBlockingPB getSessionAMProxy(YarnClient yarnClient,
+  static DAGClientAMProtocolBlockingPB getSessionAMProxy(FrameworkClient yarnClient,
       Configuration conf,
       ApplicationId applicationId) throws TezException, IOException {
     ApplicationReport appReport;

http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
new file mode 100644
index 0000000..3d8d24a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class TezYarnClient extends FrameworkClient {
+
+  private final YarnClient yarnClient;
+
+  protected TezYarnClient(YarnClient yarnClient) {
+    this.yarnClient = yarnClient;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    yarnClient.init(conf);
+  }
+
+  @Override
+  public void start() {
+    yarnClient.start();
+  }
+
+  @Override
+  public void stop() {
+    yarnClient.stop();
+  }
+
+  @Override
+  public final void close() throws IOException {
+    yarnClient.close();
+  }
+
+  @Override
+  public YarnClientApplication createApplication() throws YarnException, IOException {
+    return yarnClient.createApplication();
+  }
+
+  @Override
+  public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext) throws YarnException, IOException {
+    return yarnClient.submitApplication(appSubmissionContext);
+  }
+
+  @Override
+  public void killApplication(ApplicationId appId) throws YarnException, IOException {
+    yarnClient.killApplication(appId);
+  }
+
+  @Override
+  public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
+    return yarnClient.getApplicationReport(appId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 812dac9..5b63364 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -33,10 +33,9 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.client.FrameworkClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -68,7 +67,7 @@ public class DAGClientRPCImpl implements DAGClient {
   private final TezConfiguration conf;
   @VisibleForTesting
   ApplicationReport appReport;
-  private YarnClient yarnClient;
+  private FrameworkClient frameworkClient;
   @VisibleForTesting
   DAGClientAMProtocolBlockingPB proxy = null;
 
@@ -77,9 +76,9 @@ public class DAGClientRPCImpl implements DAGClient {
     this.appId = appId;
     this.dagId = dagId;
     this.conf = conf;
-    yarnClient = new YarnClientImpl();
-    yarnClient.init(new YarnConfiguration(conf));
-    yarnClient.start();
+    frameworkClient = FrameworkClient.createFrameworkClient();
+    frameworkClient.init(new YarnConfiguration(conf));
+    frameworkClient.start();
     appReport = null;
   }
 
@@ -142,8 +141,8 @@ public class DAGClientRPCImpl implements DAGClient {
     if (this.proxy != null) {
       RPC.stopProxy(this.proxy);
     }
-    if(yarnClient != null) {
-      yarnClient.stop();
+    if(frameworkClient != null) {
+      frameworkClient.stop();
     }
   }
 
@@ -190,7 +189,7 @@ public class DAGClientRPCImpl implements DAGClient {
     }
     ApplicationReport appReport;
     try {
-      appReport = yarnClient.getApplicationReport(appId);
+      appReport = frameworkClient.getApplicationReport(appId);
     } catch (YarnException e) {
       throw new TezException(e);
     }
@@ -279,7 +278,7 @@ public class DAGClientRPCImpl implements DAGClient {
 
   ApplicationReport getAppReport() throws IOException, TezException {
     try {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      ApplicationReport appReport = frameworkClient.getApplicationReport(appId);
       if (LOG.isDebugEnabled()) {
         LOG.debug("App: " + appId + " in state: "
             + appReport.getYarnApplicationState());

http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 9039bf4..0454fda 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -61,7 +61,7 @@ import com.google.protobuf.RpcController;
 public class TestTezClient {
 
   class TezClientForTest extends TezClient {
-    YarnClient mockYarnClient;
+    TezYarnClient mockYarnClient;
     DAGClientAMProtocolBlockingPB sessionAmProxy;
 
     public TezClientForTest(String name, TezConfiguration tezConf,
@@ -71,7 +71,7 @@ public class TestTezClient {
     }
     
     @Override
-    protected YarnClient createYarnClient() {
+    protected FrameworkClient createFrameworkClient() {
       return mockYarnClient;
     }
     
@@ -112,7 +112,7 @@ public class TestTezClient {
     DAGClientAMProtocolBlockingPB sessionAmProxy = mock(DAGClientAMProtocolBlockingPB.class, RETURNS_DEEP_STUBS);
     
     client.sessionAmProxy = sessionAmProxy;
-    client.mockYarnClient = yarnClient;
+    client.mockYarnClient = new TezYarnClient(yarnClient);
     
     client.start();
     verify(yarnClient, times(1)).init((Configuration)any());


Mime
View raw message