hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jd...@apache.org
Subject hive git commit: HIVE-13759: LlapTaskUmbilicalExternalClient should be closed by the record reader (Jason Dere, reviewed by Siddharth Seth)
Date Wed, 15 Jun 2016 17:57:38 GMT
Repository: hive
Updated Branches:
  refs/heads/master d43938ca1 -> dc4c66f6b


HIVE-13759: LlapTaskUmbilicalExternalClient should be closed by the record reader (Jason Dere,
reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dc4c66f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dc4c66f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dc4c66f6

Branch: refs/heads/master
Commit: dc4c66f6babf0d52344d52e9af37cdfa2ed42be0
Parents: d43938c
Author: Jason Dere <jdere@hortonworks.com>
Authored: Wed Jun 15 10:57:17 2016 -0700
Committer: Jason Dere <jdere@hortonworks.com>
Committed: Wed Jun 15 10:57:17 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/LlapBaseRecordReader.java  | 26 ++++++++++++++++++--
 .../ext/LlapTaskUmbilicalExternalClient.java    |  3 ++-
 .../hadoop/hive/llap/LlapBaseInputFormat.java   | 25 +++++--------------
 .../hadoop/hive/llap/LlapRowInputFormat.java    |  4 +--
 .../hadoop/hive/llap/TestLlapOutputFormat.java  |  2 +-
 5 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 3c858a8..f2700c8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.llap;
 
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -52,14 +53,16 @@ public class LlapBaseRecordReader<V extends WritableComparable>
implements Recor
   protected Thread readerThread = null;
   protected final LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
   protected final long timeout;
+  protected final Closeable client;
 
-  public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, JobConf
job) {
+  public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, JobConf
job, Closeable client) {
     din = new DataInputStream(in);
     this.schema = schema;
     this.clazz = clazz;
     this.readerThread = Thread.currentThread();
     this.timeout = 3 * HiveConf.getTimeVar(job,
         HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    this.client = client;
   }
 
   public Schema getSchema() {
@@ -68,7 +71,26 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements
Recor
 
   @Override
   public void close() throws IOException {
-    din.close();
+    Exception caughtException = null;
+    try {
+      din.close();
+    } catch (Exception err) {
+      LOG.error("Error closing input stream:" + err.getMessage(), err);
+      caughtException = err;
+    }
+
+    if (client != null) {
+      try {
+        client.close();
+      } catch (Exception err) {
+        LOG.error("Error closing client:" + err.getMessage(), err);
+        caughtException = (caughtException == null ? err : caughtException);
+      }
+    }
+
+    if (caughtException != null) {
+      throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 85943d2..5f250b4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.hive.llap.ext;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -58,7 +59,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class LlapTaskUmbilicalExternalClient extends AbstractService {
+public class LlapTaskUmbilicalExternalClient extends AbstractService implements Closeable
{
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 4d17080..46030ec 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -99,9 +99,6 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
 
   public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
 
-  private Connection con;
-  private Statement stmt;
-
   public LlapBaseInputFormat(String url, String user, String pwd, String query) {
     this.url = url;
     this.user = user;
@@ -172,7 +169,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
 
     @SuppressWarnings("rawtypes")
     LlapBaseRecordReader recordReader = new LlapBaseRecordReader(
-        socket.getInputStream(), llapSplit.getSchema(), Text.class, job);
+        socket.getInputStream(), llapSplit.getSchema(), Text.class, job, llapClient);
     umbilicalResponder.setRecordReader(recordReader);
     return recordReader;
   }
@@ -196,11 +193,12 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
       throw new IOException(e);
     }
 
-    try {
-      con = DriverManager.getConnection(url,user,pwd);
-      stmt = con.createStatement();
-      String sql = String.format(SPLIT_QUERY, query, numSplits);
+    String sql = String.format(SPLIT_QUERY, query, numSplits);
+    try (
+      Connection con = DriverManager.getConnection(url,user,pwd);
+      Statement stmt = con.createStatement();
       ResultSet res = stmt.executeQuery(sql);
+    ) {
       while (res.next()) {
         // deserialize split
         DataInput in = new DataInputStream(res.getBinaryStream(1));
@@ -208,23 +206,12 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
         is.readFields(in);
         ins.add(is);
       }
-
-      res.close();
-      stmt.close();
     } catch (Exception e) {
       throw new IOException(e);
     }
     return ins.toArray(new InputSplit[ins.size()]);
   }
 
-  public void close() {
-    try {
-      con.close();
-    } catch (Exception e) {
-      // ignore
-    }
-  }
-
   private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws
IOException {
     LlapRegistryService registryService = LlapRegistryService.getClient(job);
     String host = llapSplit.getLocations()[0];

http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
index 7efc711..c3001e9 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
-
 public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
-  LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>();
+
+  private LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>();
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 577037c..2288cd4 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -103,7 +103,7 @@ public class TestLlapOutputFormat {
       writer.close(null);
 
       InputStream in = socket.getInputStream();
-      LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job);
+      LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job, null);
 
       LOG.debug("Have record reader");
 


Mime
View raw message