phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [5/6] phoenix git commit: PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection
Date Sun, 08 Jan 2017 08:05:09 GMT
PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded
in the Phoenix connection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/87cf1143
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/87cf1143
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/87cf1143

Branch: refs/heads/4.9-HBase-1.1
Commit: 87cf11434823c4fe325f340f6d19f7604bbf27a6
Parents: 397fff9
Author: Andrew Purtell <apurtell@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Sat Jan 7 18:53:38 2017 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 37 ++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/87cf1143/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index eb26230..30d3a1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -254,6 +254,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private ZKClientService txZKClientService;
     private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
@@ -370,15 +371,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
         int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         // Create instance of the tephra zookeeper client
-        ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create());
-
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                ZKClients.reWatchOnExpire(
-                        ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                        )
+        txZKClientService = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
                 );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
         PooledClientProvider pooledClientProvider = new PooledClientProvider(
                 config, zkDiscoveryService);
         this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
@@ -389,11 +391,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             boolean transactionsEnabled = props.getBoolean(
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
-            // only initialize the tx service client if needed
+            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            // only initialize the tx service client if needed and if we succeeded in getting a connection
+            // to HBase
             if (transactionsEnabled) {
                 initTxServiceClient();
             }
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
@@ -463,14 +466,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
-                    if (renewLeaseExecutor != null) {
-                        renewLeaseExecutor.shutdownNow();
-                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
                     }
-                    if (connection != null) connection.close();
+                    try {
+                        // close the HBase connection
+                        if (connection != null) connection.close();
+                    } finally {
+                        if (renewLeaseExecutor != null) {
+                            renewLeaseExecutor.shutdownNow();
+                        }
+                        // shut down the tx client service if we created one to support transactions
+                        if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+                    }
                 } catch (IOException e) {
                     if (sqlE == null) {
                         sqlE = ServerUtil.parseServerException(e);


Mime
View raw message