From: jcamacho@apache.org
To: commits@hive.apache.org
Subject: hive git commit: HIVE-16588: Resource leak by druid http client (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Date: Mon, 8 May 2017 15:12:04 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/master 301e7c5ea -> 57beac4ef


HIVE-16588: Resource leak by druid http client (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57beac4e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57beac4e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57beac4e

Branch: refs/heads/master
Commit: 57beac4efe69796e4f7a5ea8e5ff67819f55a6a3
Parents: 301e7c5
Author: Slim Bouguerra
Authored: Mon May 8 16:08:26 2017 +0100
Committer: Jesus Camacho Rodriguez
Committed: Mon May 8 16:08:53 2017 +0100

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  | 43 ++++++++------
 .../druid/io/DruidQueryBasedInputFormat.java    | 60 ++------------------
 .../druid/serde/DruidQueryRecordReader.java     | 20 +------
 .../hadoop/hive/druid/serde/DruidSerDe.java     | 18 +----
 .../hive/druid/TestDruidStorageHandler.java     | 19 ++----
 5 files changed, 41 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
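Why this fixes the leak: before this commit, DruidSerDe, DruidQueryRecordReader, and the coordinator checks in DruidStorageHandler each created their own Lifecycle and HttpClient per request, and some paths never called lifecycle.stop(), leaking connections and threads. The commit switches to a single static client for the whole process, torn down once by a shutdown hook. A minimal self-contained sketch of that pattern, with a hypothetical Client class standing in for Druid's HttpClient and Lifecycle (names here are illustrative, not from the patch):

    // SharedClientHolder.java -- illustrative only.
    public final class SharedClientHolder {

      // One process-wide client, created once and shared by all callers.
      private static final Client CLIENT;

      static {
        CLIENT = new Client();
        CLIENT.start();
        // Tear the client down exactly once, at JVM exit, rather than per
        // request (the old per-request code sometimes skipped stop() entirely).
        Runtime.getRuntime().addShutdownHook(new Thread(CLIENT::stop));
      }

      private SharedClientHolder() { }

      public static Client getClient() {
        return CLIENT;
      }

      // Hypothetical stand-in for the real HTTP client.
      static final class Client {
        void start() { System.out.println("client started"); }
        void stop() { System.out.println("client stopped"); }
      }

      public static void main(String[] args) {
        getClient(); // every caller sees the same instance
      }
    }

The trade-off is that configuration is read once per process, which is why makeHttpClient() in the first file below now reads SessionState.getSessionConf() instead of the handler instance's own conf.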
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index daee2fe..4510db3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
@@ -91,13 +92,23 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
 
   protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
 
   public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
 
+  private static final HttpClient HTTP_CLIENT;
+
+  static {
+    final Lifecycle lifecycle = new Lifecycle();
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Issues with lifecycle start", e);
+    }
+    HTTP_CLIENT = makeHttpClient(lifecycle);
+    ShutdownHookManager.addShutdownHook(() -> lifecycle.stop());
+  }
+
   private final SQLMetadataConnector connector;
 
   private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
 
-  private HttpClient httpClient;
-
   private String uniqueId = null;
 
   private String rootWorkingDir = null;
@@ -151,12 +162,10 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
 
   @VisibleForTesting
   public DruidStorageHandler(SQLMetadataConnector connector,
-          MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
-          HttpClient httpClient
+          MetadataStorageTablesConfig druidMetadataStorageTablesConfig
   ) {
     this.connector = connector;
     this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
-    this.httpClient = httpClient;
   }
 
   @Override
@@ -280,19 +289,12 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
     int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
     LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress));
-    // check if the coordinator is up
-    httpClient = makeHttpClient(lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      Throwables.propagate(e);
-    }
     String coordinatorResponse = null;
     try {
       coordinatorResponse = RetryUtils.retry(new Callable<String>() {
         @Override
         public String call() throws Exception {
-          return DruidStorageHandlerUtils.getURL(httpClient,
+          return DruidStorageHandlerUtils.getURL(getHttpClient(),
                   new URL(String.format("http://%s/status", coordinatorAddress))
           );
         }
@@ -347,7 +349,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
             @Override
             public boolean apply(URL input) {
               try {
-                String result = DruidStorageHandlerUtils.getURL(httpClient, input);
+                String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
                 LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result));
                 return Strings.isNullOrEmpty(result);
               } catch (IOException e) {
@@ -586,15 +588,18 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
     return rootWorkingDir;
   }
 
-  private HttpClient makeHttpClient(Lifecycle lifecycle) {
+  private static HttpClient makeHttpClient(Lifecycle lifecycle) {
     final int numConnection = HiveConf
-            .getIntVar(getConf(),
+            .getIntVar(SessionState.getSessionConf(),
                     HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION
             );
     final Period readTimeout = new Period(
-            HiveConf.getVar(getConf(),
+            HiveConf.getVar(SessionState.getSessionConf(),
                     HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT
             ));
+    LOG.info("Creating Druid HTTP client with {} max parallel connections and {}ms read timeout",
+            numConnection, readTimeout.toStandardDuration().getMillis()
+    );
     return HttpClientInit.createClient(
             HttpClientConfig.builder().withNumConnections(numConnection)
@@ -602,4 +607,8 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
             lifecycle
     );
   }
+
+  public static HttpClient getHttpClient() {
+    return HTTP_CLIENT;
+  }
 }
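makeHttpClient() is now static so the class initializer above can call it, and it pulls its two tunables from SessionState.getSessionConf(). For reference, a hedged sketch of the same construction with the tunables passed in explicitly; the builder and createClient calls mirror the diff, while the import package names are an assumption (the com.metamx http client that Druid 0.9/0.10-era builds shipped with):

    import org.joda.time.Period;

    import com.metamx.common.lifecycle.Lifecycle;   // package names assumed
    import com.metamx.http.client.HttpClient;       // (Druid 0.9/0.10-era client)
    import com.metamx.http.client.HttpClientConfig;
    import com.metamx.http.client.HttpClientInit;

    // Illustrative factory mirroring the patched makeHttpClient(), minus the
    // SessionState lookup: callers supply the two knobs directly.
    public final class DruidHttpClientFactory {

      public static HttpClient create(int numConnections, String readTimeoutPeriod,
              Lifecycle lifecycle) {
        // Same ISO-8601 period format as HIVE_DRUID_HTTP_READ_TIMEOUT, e.g. "PT1M".
        final Period readTimeout = new Period(readTimeoutPeriod);
        return HttpClientInit.createClient(
                HttpClientConfig.builder()
                        .withNumConnections(numConnections)
                        .withReadTimeout(readTimeout.toStandardDuration())
                        .build(),
                lifecycle);
      }

      private DruidHttpClientFactory() { }
    }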
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 53624e1..2f53616 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
@@ -193,23 +194,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>

http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends Comparable<R>>
       LOG.info("Retrieving from druid using query:\n " + query);
     }
 
-    final Lifecycle lifecycle = new Lifecycle();
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
-                    .withNumConnections(numConnection).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Issues with lifecycle start", e);
-    }
     InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
               DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query));
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
 
@@ -111,8 +97,6 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends Comparable<R>>
     } catch (IOException e) {
       response.close();
       throw e;
-    } finally {
-      lifecycle.stop();
     }
     if (resultsList == null || resultsList.isEmpty()) {
       return;
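After this change, the record reader borrows the shared client and never starts or stops a Lifecycle; the only per-request resource left is the response InputStream, which the caller still owns (note the retained response.close() above). An illustrative helper showing that calling convention; the method shape and the BaseQuery parameter type are assumptions, and only the getHttpClient()/submitRequest()/createRequest() usage comes from the diff:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.hive.druid.DruidStorageHandler;
    import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
    import org.apache.hadoop.util.StringUtils;

    // Illustrative helper for the post-patch calling convention. Callers own
    // the returned stream and must close it when done reading.
    final class DruidFetchExample {

      static InputStream fetch(String address, io.druid.query.BaseQuery<?> query)
              throws IOException {
        try {
          // Borrow the process-wide client; no Lifecycle start/stop here.
          return DruidStorageHandlerUtils.submitRequest(
                  DruidStorageHandler.getHttpClient(),
                  DruidStorageHandlerUtils.createRequest(address, query));
        } catch (Exception e) {
          throw new IOException(StringUtils.stringifyException(e));
        }
      }
    }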
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index bbe29b6..656c0f1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -100,20 +101,12 @@ public class DruidSerDe extends AbstractSerDe {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
 
-  private int numConnection;
-
-  private Period readTimeout;
-
   private String[] columns;
   private PrimitiveTypeInfo[] types;
   private ObjectInspector inspector;
 
   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
-    // Init connection properties
-    numConnection = HiveConf
-            .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    readTimeout = new Period(
-            HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
     final List<String> columnNames = new ArrayList<>();
     final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
@@ -256,20 +249,13 @@ public class DruidSerDe extends AbstractSerDe {
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
           throws SerDeException, IOException {
-    final Lifecycle lifecycle = new Lifecycle();
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
     InputStream response;
     try {
-      lifecycle.start();
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
               DruidStorageHandlerUtils.createRequest(address, query)
       );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
 
     // Retrieve results
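DruidSerDe no longer caches numConnection and readTimeout; the two HiveConf knobs still exist but are consumed once, when the shared client is built. A small snippet reading them with the same accessors the patch uses, in case you need to inspect the effective values (the class and method names here are illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.joda.time.Period;

    // Reads the two Druid HTTP knobs the way the patched makeHttpClient() does.
    final class DruidHttpSettings {

      static void dump(HiveConf conf) {
        final int numConnection = HiveConf.getIntVar(conf,
                HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
        final Period readTimeout = new Period(HiveConf.getVar(conf,
                HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
        System.out.printf("druid http: %d connections, %d ms read timeout%n",
                numConnection, readTimeout.toStandardDuration().getMillis());
      }
    }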
http://git-wip-us.apache.org/repos/asf/hive/blob/57beac4e/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 05e3ec5..1fe155a 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 import io.druid.indexer.JobHelper;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageTablesConfig;
-import io.druid.metadata.SQLMetadataSegmentManager;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -94,8 +93,7 @@ public class TestDruidStorageHandler {
   public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
 
     try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
@@ -122,8 +120,7 @@ public class TestDruidStorageHandler {
     );
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
   }
@@ -133,8 +130,7 @@ public class TestDruidStorageHandler {
           throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -164,8 +160,7 @@ public class TestDruidStorageHandler {
   public void testCommitInsertTable() throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -189,8 +184,7 @@ public class TestDruidStorageHandler {
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
 
     String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
@@ -234,8 +228,7 @@ public class TestDruidStorageHandler {
 
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             connector,
-            metadataStorageTablesConfig,
-            null
+            metadataStorageTablesConfig
    );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
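With the injectable HttpClient gone from the constructor, the tests above simply pass the connector and tables config. Client teardown is delegated to ShutdownHookManager, which is essentially a priority-ordered wrapper over the JVM's own hook mechanism; a standalone demonstration of that underlying mechanism, using plain Runtime hooks:

    // Standalone demo of the shutdown-hook teardown used for the shared client.
    public final class ShutdownDemo {
      public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.out.println("stopping shared http client")));
        System.out.println("work happens here; the hook runs once at JVM exit");
      }
    }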