phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject git commit: PHOENIX-901 Ensure ConnectionQueryServices only initialized once (JamesTaylor)
Date Fri, 28 Mar 2014 00:44:15 GMT
Repository: incubator-phoenix
Updated Branches:
  refs/heads/3.0 43bb7c478 -> efe25292f


PHOENIX-901 Ensure ConnectionQueryServices only initialized once (JamesTaylor)


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/efe25292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/efe25292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/efe25292

Branch: refs/heads/3.0
Commit: efe25292f48dd1a925b26c6e32b74b1635bc8d38
Parents: 43bb7c4
Author: James Taylor <jamestaylor@apache.org>
Authored: Thu Mar 27 17:44:07 2014 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Thu Mar 27 17:44:07 2014 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/jdbc/PhoenixDriver.java  |  34 +++-
 .../query/ConnectionQueryServicesImpl.java      | 172 +++++++++++--------
 .../query/ConnectionlessQueryServicesImpl.java  |  72 +++++---
 3 files changed, 181 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/efe25292/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 44d54fb..823f04b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -111,20 +111,46 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
         checkClosed();
 
         ConnectionInfo connInfo = ConnectionInfo.create(url);
-        ConnectionInfo normalizedConnInfo = connInfo.normalize(getQueryServices().getProps());
+        QueryServices services = getQueryServices();
+        ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps());
         ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo);
         if (connectionQueryServices == null) {
             if (normalizedConnInfo.isConnectionless()) {
-                connectionQueryServices = new ConnectionlessQueryServicesImpl(getQueryServices());
+                connectionQueryServices = new ConnectionlessQueryServicesImpl(services);
             } else {
-                connectionQueryServices = new ConnectionQueryServicesImpl(getQueryServices(),
normalizedConnInfo);
+                connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo);
             }
-            connectionQueryServices.init(url, info);
             ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo,
connectionQueryServices);
             if (prevValue != null) {
                 connectionQueryServices = prevValue;
             }
         }
+        boolean success = false;
+        SQLException sqlE = null;
+        try {
+            connectionQueryServices.init(url, info);
+            success = true;
+        } catch (SQLException e) {
+            sqlE = e;
+        }
+        finally {
+            if (!success) {
+                try {
+                    connectionQueryServices.close();
+                } catch (SQLException e) {
+                    if (sqlE == null) {
+                        sqlE = e;
+                    } else {
+                        sqlE.setNextException(e);
+                    }
+                }
+                // Remove from map, as initialization failed
+                connectionQueryServicesMap.remove(connectionQueryServices);
+                if (sqlE != null) {
+                    throw sqlE;
+                }
+            }
+        }
         return connectionQueryServices;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/efe25292/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index e630c74..74b629c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -171,9 +171,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
     // Copy of config.getProps(), but read-only to prevent synchronization that we
     // don't need.
     private final ReadOnlyProps props;
-    private final HConnection connection;
-    private final StatsManager statsManager;
     private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices>
childServices;
+    private final StatsManager statsManager;
     // Cache the latest meta data here for future connections
     private volatile PMetaData latestMetaData;
     private final Object latestMetaDataLock = new Object();
@@ -182,7 +181,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
     private boolean hasInvalidIndexConfiguration = false;
     private int connectionCount = 0;
     private WhiteList upgradeWhiteList = null;
-    
+
+    private HConnection connection;
+    private volatile boolean initialized;
+    private volatile boolean closed;
+    private volatile SQLException initializationException;
     private ConcurrentMap<SequenceKey,Sequence> sequenceMap = Maps.newConcurrentMap();
     private KeyValueBuilder kvBuilder;
 
@@ -198,7 +201,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
      * @param connectionInfo to provide connection information
      * @throws SQLException
      */
-    public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo connectionInfo)
throws SQLException {
+    public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo connectionInfo)
{
         super(services);
         Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
         for (Entry<String,String> entry : services.getProps()) {
@@ -211,15 +214,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
         // on the server side during testing.
         this.config = HBaseConfiguration.create(config);
         this.props = new ReadOnlyProps(this.config.iterator());
-        try {
-            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
-        } catch (ZooKeeperConnectionException e) {
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
-                .setRootCause(e).build().buildException();
-        }
-        if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above?
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException();
-        }
         this.latestMetaData = newEmptyMetaData();
         // TODO: should we track connection wide memory usage or just org-wide usage?
         // If connection-wide, create a MemoryManager here, otherwise just use the one from
the delegate
@@ -232,6 +226,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
         String hbaseVersion = VersionInfo.getVersion();
         this.kvBuilder = KeyValueBuilder.get(hbaseVersion);
     }
+    
+    private void openConnection() throws SQLException {
+        try {
+            this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+        } catch (ZooKeeperConnectionException e) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                .setRootCause(e).build().buildException();
+        }
+        if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above?
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException();
+        }
+    }
 
     @Override
     public StatsManager getStatsManager() {
@@ -277,45 +283,54 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
      */
     @Override
     public void close() throws SQLException {
-        SQLException sqlE = null;
-        try {
-            // Attempt to return any unused sequences.
-            returnAllSequences(this.sequenceMap);
-        } catch (SQLException e) {
-            sqlE = e;
-        } finally {
+        if (closed) {
+            return;
+        }
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            SQLException sqlE = null;
             try {
-                // Clear any client-side caches.  
-                statsManager.clearStats();
+                // Attempt to return any unused sequences.
+                if (connection != null) returnAllSequences(this.sequenceMap);
             } catch (SQLException e) {
-                if (sqlE == null) {
-                    sqlE = e;
-                } else {
-                    sqlE.setNextException(e);
-                }
+                sqlE = e;
             } finally {
                 try {
-                    childServices.clear();
-                    latestMetaData = null;
-                    connection.close();
-                } catch (IOException e) {
+                    // Clear any client-side caches.  
+                    statsManager.clearStats();
+                } catch (SQLException e) {
                     if (sqlE == null) {
-                        sqlE = ServerUtil.parseServerException(e);
+                        sqlE = e;
                     } else {
-                        sqlE.setNextException(ServerUtil.parseServerException(e));
+                        sqlE.setNextException(e);
                     }
                 } finally {
                     try {
-                        super.close();
-                    } catch (SQLException e) {
+                        childServices.clear();
+                        latestMetaData = null;
+                        if (connection != null) connection.close();
+                    } catch (IOException e) {
                         if (sqlE == null) {
-                            sqlE = e;
+                            sqlE = ServerUtil.parseServerException(e);
                         } else {
-                            sqlE.setNextException(e);
+                            sqlE.setNextException(ServerUtil.parseServerException(e));
                         }
                     } finally {
-                        if (sqlE != null) {
-                            throw sqlE;
+                        try {
+                            super.close();
+                        } catch (SQLException e) {
+                            if (sqlE == null) {
+                                sqlE = e;
+                            } else {
+                                sqlE.setNextException(e);
+                            }
+                        } finally {
+                            if (sqlE != null) {
+                                throw sqlE;
+                            }
                         }
                     }
                 }
@@ -1306,41 +1321,65 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
     
     @Override
     public void init(String url, Properties props) throws SQLException {
-        Properties scnProps = PropertiesUtil.deepCopy(props);
-        scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
-        scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
-        PhoenixConnection metaConnection = new PhoenixConnection(this, url, scnProps, newEmptyMetaData());
-        SQLException sqlE = null;
-        try {
-            try {
-                metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
-            } catch (NewerTableAlreadyExistsException ignore) {
-                // Ignore, as this will happen if the SYSTEM.TABLE already exists at this
fixed timestamp.
-                // A TableAlreadyExistsException is not thrown, since the table only exists
*after* this fixed timestamp.
+        if (initialized) {
+            if (initializationException != null) {
+                // Throw previous initialization exception, as we won't resuse this instance
+                throw initializationException;
             }
-            try {
-                metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
-            } catch (NewerTableAlreadyExistsException ignore) {
-                // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this
fixed timestamp.
-                // A TableAlreadyExistsException is not thrown, since the table only exists
*after* this fixed timestamp.
+            return;
+        }
+        synchronized (this) {
+            if (initialized) {
+                if (initializationException != null) {
+                    // Throw previous initialization exception, as we won't resuse this instance
+                    throw initializationException;
+                }
+                return;
             }
-        } catch (SQLException e) {
-            sqlE = e;
-        } finally {
+            if (closed) {
+                throw new SQLException("The connection to the cluster has been closed.");
+            }
+            initialized = true;
+                
+            SQLException sqlE = null;
+            PhoenixConnection metaConnection = null;
             try {
-                metaConnection.close();
+                openConnection();
+                Properties scnProps = PropertiesUtil.deepCopy(props);
+                scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+                scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                metaConnection = new PhoenixConnection(this, url, scnProps, newEmptyMetaData());
+                try {
+                    metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+                } catch (NewerTableAlreadyExistsException ignore) {
+                    // Ignore, as this will happen if the SYSTEM.TABLE already exists at
this fixed timestamp.
+                    // A TableAlreadyExistsException is not thrown, since the table only
exists *after* this fixed timestamp.
+                }
+                try {
+                    metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
+                } catch (NewerTableAlreadyExistsException ignore) {
+                    // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists
at this fixed timestamp.
+                    // A TableAlreadyExistsException is not thrown, since the table only
exists *after* this fixed timestamp.
+                }
+                upgradeMetaDataTo3_0(url, props);
             } catch (SQLException e) {
+                sqlE = e;
+            } finally {
+                try {
+                    if (metaConnection != null) metaConnection.close();
+                } catch (SQLException e) {
+                    if (sqlE != null) {
+                        sqlE.setNextException(e);
+                    } else {
+                        sqlE = e;
+                    }
+                }
                 if (sqlE != null) {
-                    sqlE.setNextException(e);
-                } else {
-                    sqlE = e;
+                    initializationException = sqlE;
+                    throw sqlE;
                 }
             }
-            if (sqlE != null) {
-                throw sqlE;
-            }
         }
-        upgradeMetaDataTo3_0(url, props);
     }
 
     @Override
@@ -2136,5 +2175,4 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
             }
         }
     }
-        
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/efe25292/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 457eeb6..ced4ba4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -84,6 +84,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices
imple
     private PMetaData metaData;
     private final Map<SequenceKey, Long> sequenceMap = Maps.newHashMap();
     private KeyValueBuilder kvBuilder;
+    private volatile boolean initialized;
+    private volatile SQLException initializationException;
     
     public ConnectionlessQueryServicesImpl(QueryServices queryServices) {
         super(queryServices);
@@ -200,41 +202,59 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices
imple
         return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
     }
 
+    // TODO: share this with ConnectionQueryServicesImpl
     @Override
     public void init(String url, Properties props) throws SQLException {
-        Properties scnProps = PropertiesUtil.deepCopy(props);
-        scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
-        scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
-        PhoenixConnection metaConnection = new PhoenixConnection(this, url, scnProps, newEmptyMetaData());
-        SQLException sqlE = null;
-        try {
-            try {
-                metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
-            } catch (TableAlreadyExistsException ignore) {
-                // Ignore, as this will happen if the SYSTEM.TABLE already exists at this
fixed timestamp.
-                // A TableAlreadyExistsException is not thrown, since the table only exists
*after* this fixed timestamp.
+        if (initialized) {
+            if (initializationException != null) {
+                throw initializationException;
             }
-            try {
-                metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
-            } catch (NewerTableAlreadyExistsException ignore) {
-                // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this
fixed timestamp.
-                // A TableAlreadyExistsException is not thrown, since the table only exists
*after* this fixed timestamp.
+            return;
+        }
+        synchronized (this) {
+            if (initialized) {
+                if (initializationException != null) {
+                    throw initializationException;
+                }
+                return;
             }
-        } catch (SQLException e) {
-            sqlE = e;
-        } finally {
+            initialized = true;
+            SQLException sqlE = null;
+            PhoenixConnection metaConnection = null;
             try {
-                metaConnection.close();
+                Properties scnProps = PropertiesUtil.deepCopy(props);
+                scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+                scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                metaConnection = new PhoenixConnection(this, url, scnProps, newEmptyMetaData());
+                try {
+                    metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+                } catch (TableAlreadyExistsException ignore) {
+                    // Ignore, as this will happen if the SYSTEM.TABLE already exists at
this fixed timestamp.
+                    // A TableAlreadyExistsException is not thrown, since the table only
exists *after* this fixed timestamp.
+                }
+                try {
+                    metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
+                } catch (NewerTableAlreadyExistsException ignore) {
+                    // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists
at this fixed timestamp.
+                    // A TableAlreadyExistsException is not thrown, since the table only
exists *after* this fixed timestamp.
+                }
             } catch (SQLException e) {
+                sqlE = e;
+            } finally {
+                try {
+                    if (metaConnection != null) metaConnection.close();
+                } catch (SQLException e) {
+                    if (sqlE != null) {
+                        sqlE.setNextException(e);
+                    } else {
+                        sqlE = e;
+                    }
+                }
                 if (sqlE != null) {
-                    sqlE.setNextException(e);
-                } else {
-                    sqlE = e;
+                    initializationException = sqlE;
+                    throw sqlE;
                 }
             }
-            if (sqlE != null) {
-                throw sqlE;
-            }
         }
     }
 


Mime
View raw message