accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject git commit: ACCUMULO-1889 found a few more ZooKeeperInstances that are not closed
Date Thu, 19 Dec 2013 21:27:15 GMT
Updated Branches:
  refs/heads/1.6.0-SNAPSHOT 6d083e44d -> 674fa95ca


ACCUMULO-1889 found a few more ZooKeeperInstances that are not closed


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/674fa95c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/674fa95c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/674fa95c

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 674fa95cacaa9353142071a66006e0ffb65cae94
Parents: 6d083e4
Author: Eric Newton <eric.newton@gmail.com>
Authored: Thu Dec 19 16:27:16 2013 -0500
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Thu Dec 19 16:27:16 2013 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AbstractInputFormat.java   | 254 ++++++++++---------
 .../client/mapreduce/AccumuloOutputFormat.java  |   5 +-
 .../mapreduce/lib/util/InputConfigurator.java   |  12 +-
 3 files changed, 143 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/674fa95c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 2b7e958..9f30563 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -525,144 +525,148 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
     Level logLevel = getLogLevel(context);
     log.setLevel(logLevel);
     validateOptions(context);
-
+    
     LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
     Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context);
-    for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet())
{
-
-      String tableName = tableConfigEntry.getKey();
-      InputTableConfig tableConfig = tableConfigEntry.getValue();
-      
-      Instance instance = getInstance(context);
-      boolean mockInstance;
-      String tableId;
-      // resolve table name to id once, and use id from this point forward
-      if (instance instanceof MockInstance) {
-        tableId = "";
-        mockInstance = true;
-      } else {
-        try {
-          tableId = Tables.getTableId(instance, tableName);
-        } catch (TableNotFoundException e) {
-          throw new IOException(e);
+    Instance instance = getInstance(context);
+    try {
+      for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet())
{
+        
+        String tableName = tableConfigEntry.getKey();
+        InputTableConfig tableConfig = tableConfigEntry.getValue();
+        
+        boolean mockInstance;
+        String tableId;
+        // resolve table name to id once, and use id from this point forward
+        if (instance instanceof MockInstance) {
+          tableId = "";
+          mockInstance = true;
+        } else {
+          try {
+            tableId = Tables.getTableId(instance, tableName);
+          } catch (TableNotFoundException e) {
+            throw new IOException(e);
+          }
+          mockInstance = false;
         }
-        mockInstance = false;
-      }
-      
-      Authorizations auths = getScanAuthorizations(context);
-      String principal = getPrincipal(context);
-      AuthenticationToken token = getAuthenticationToken(context);
-
-      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
-      List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges())
: tableConfig.getRanges();
-      if (ranges.isEmpty()) {
-        ranges = new ArrayList<Range>(1);
-        ranges.add(new Range());
-      }
-
-      // get the metadata information for these ranges
-      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-      TabletLocator tl;
-      try {
-        if (tableConfig.isOfflineScan()) {
-          binnedRanges = binOfflineTable(context, tableId, ranges);
-          while (binnedRanges == null) {
-            // Some tablets were still online, try again
-            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between
100 and 200 ms
+        
+        Authorizations auths = getScanAuthorizations(context);
+        String principal = getPrincipal(context);
+        AuthenticationToken token = getAuthenticationToken(context);
+        
+        boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
+        List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges())
: tableConfig.getRanges();
+        if (ranges.isEmpty()) {
+          ranges = new ArrayList<Range>(1);
+          ranges.add(new Range());
+        }
+        
+        // get the metadata information for these ranges
+        Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+        TabletLocator tl;
+        try {
+          if (tableConfig.isOfflineScan()) {
             binnedRanges = binOfflineTable(context, tableId, ranges);
-
-          }
-        } else {
-          tl = getTabletLocator(context, tableId);
-          // its possible that the cache could contain complete, but old information about
a tables tablets... so clear it
-          tl.invalidateCache();
-          Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context));
-
-          while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
-            if (!(instance instanceof MockInstance)) {
-              if (!Tables.exists(instance, tableId))
-                throw new TableDeletedException(tableId);
-              if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-                throw new TableOfflineException(instance, tableId);
+            while (binnedRanges == null) {
+              // Some tablets were still online, try again
+              UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly
between 100 and 200 ms
+              binnedRanges = binOfflineTable(context, tableId, ranges);
+              
             }
-            binnedRanges.clear();
-            log.warn("Unable to locate bins for specified ranges. Retrying.");
-            UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between
100 and 200 ms
+          } else {
+            tl = getTabletLocator(context, tableId);
+            // its possible that the cache could contain complete, but old information about
a tables tablets... so clear it
             tl.invalidateCache();
+            Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context));
+            
+            while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) {
+              if (!(instance instanceof MockInstance)) {
+                if (!Tables.exists(instance, tableId))
+                  throw new TableDeletedException(tableId);
+                if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+                  throw new TableOfflineException(instance, tableId);
+              }
+              binnedRanges.clear();
+              log.warn("Unable to locate bins for specified ranges. Retrying.");
+              UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly
between 100 and 200 ms
+              tl.invalidateCache();
+            }
           }
+        } catch (Exception e) {
+          throw new IOException(e);
         }
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-
-      HashMap<Range,ArrayList<String>> splitsToAdd = null;
-
-      if (!autoAdjust)
-        splitsToAdd = new HashMap<Range,ArrayList<String>>();
-
-      HashMap<String,String> hostNameCache = new HashMap<String,String>();
-      for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet())
{
-        String ip = tserverBin.getKey().split(":", 2)[0];
-        String location = hostNameCache.get(ip);
-        if (location == null) {
-          InetAddress inetAddress = InetAddress.getByName(ip);
-          location = inetAddress.getHostName();
-          hostNameCache.put(ip, location);
-        }
-        for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet())
{
-          Range ke = extentRanges.getKey().toDataRange();
-          for (Range r : extentRanges.getValue()) {
-            if (autoAdjust) {
-              // divide ranges into smaller ranges, based on the tablets
-              RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r),
new String[] {location});
-              
-              split.setOffline(tableConfig.isOfflineScan());
-              split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
-              split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
-              split.setMockInstance(mockInstance);
-              split.setFetchedColumns(tableConfig.getFetchedColumns());
-              split.setPrincipal(principal);
-              split.setToken(token);
-              split.setInstanceName(instance.getInstanceName());
-              split.setZooKeepers(instance.getZooKeepers());
-              split.setAuths(auths);
-              split.setIterators(tableConfig.getIterators());
-              split.setLogLevel(logLevel);
-              
-              splits.add(split);
-            } else {
-              // don't divide ranges
-              ArrayList<String> locations = splitsToAdd.get(r);
-              if (locations == null)
-                locations = new ArrayList<String>(1);
-              locations.add(location);
-              splitsToAdd.put(r, locations);
+        
+        HashMap<Range,ArrayList<String>> splitsToAdd = null;
+        
+        if (!autoAdjust)
+          splitsToAdd = new HashMap<Range,ArrayList<String>>();
+        
+        HashMap<String,String> hostNameCache = new HashMap<String,String>();
+        for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin :
binnedRanges.entrySet()) {
+          String ip = tserverBin.getKey().split(":", 2)[0];
+          String location = hostNameCache.get(ip);
+          if (location == null) {
+            InetAddress inetAddress = InetAddress.getByName(ip);
+            location = inetAddress.getHostName();
+            hostNameCache.put(ip, location);
+          }
+          for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet())
{
+            Range ke = extentRanges.getKey().toDataRange();
+            for (Range r : extentRanges.getValue()) {
+              if (autoAdjust) {
+                // divide ranges into smaller ranges, based on the tablets
+                RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r),
new String[] {location});
+                
+                split.setOffline(tableConfig.isOfflineScan());
+                split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+                split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+                split.setMockInstance(mockInstance);
+                split.setFetchedColumns(tableConfig.getFetchedColumns());
+                split.setPrincipal(principal);
+                split.setToken(token);
+                split.setInstanceName(instance.getInstanceName());
+                split.setZooKeepers(instance.getZooKeepers());
+                split.setAuths(auths);
+                split.setIterators(tableConfig.getIterators());
+                split.setLogLevel(logLevel);
+                
+                splits.add(split);
+              } else {
+                // don't divide ranges
+                ArrayList<String> locations = splitsToAdd.get(r);
+                if (locations == null)
+                  locations = new ArrayList<String>(1);
+                locations.add(location);
+                splitsToAdd.put(r, locations);
+              }
             }
           }
         }
+        
+        if (!autoAdjust)
+          for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
{
+            RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(),
entry.getValue().toArray(new String[0]));
+            
+            split.setOffline(tableConfig.isOfflineScan());
+            split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
+            split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
+            split.setMockInstance(mockInstance);
+            split.setFetchedColumns(tableConfig.getFetchedColumns());
+            split.setPrincipal(principal);
+            split.setToken(token);
+            split.setInstanceName(instance.getInstanceName());
+            split.setZooKeepers(instance.getZooKeepers());
+            split.setAuths(auths);
+            split.setIterators(tableConfig.getIterators());
+            split.setLogLevel(logLevel);
+            
+            splits.add(split);
+          }
       }
-
-      if (!autoAdjust)
-        for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
{
-          RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(),
entry.getValue().toArray(new String[0]));
-
-          split.setOffline(tableConfig.isOfflineScan());
-          split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
-          split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
-          split.setMockInstance(mockInstance);
-          split.setFetchedColumns(tableConfig.getFetchedColumns());
-          split.setPrincipal(principal);
-          split.setToken(token);
-          split.setInstanceName(instance.getInstanceName());
-          split.setZooKeepers(instance.getZooKeepers());
-          split.setAuths(auths);
-          split.setIterators(tableConfig.getIterators());
-          split.setLogLevel(logLevel);
-          
-          splits.add(split);
-        }
+      return splits;
+    } finally {
+      instance.close();
     }
-    return splits;
   }
 
   // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop
2 compatibility

http://git-wip-us.apache.org/repos/asf/accumulo/blob/674fa95c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 0c924b1..b816d43 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -533,17 +533,20 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   public void checkOutputSpecs(JobContext job) throws IOException {
     if (!isConnectorInfoSet(job))
       throw new IOException("Connector info has not been set.");
+    Instance instance = getInstance(job);
     try {
       // if the instance isn't configured, it will complain here
       String principal = getPrincipal(job);
       AuthenticationToken token = getAuthenticationToken(job);
-      Connector c = getInstance(job).getConnector(principal, token);
+      Connector c = instance.getConnector(principal, token);
       if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
     } catch (AccumuloException e) {
       throw new IOException(e);
     } catch (AccumuloSecurityException e) {
       throw new IOException(e);
+    } finally {
+      instance.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/674fa95c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index 7b17d11..7419d9b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -601,7 +601,11 @@ public class InputConfigurator extends ConfiguratorBase {
     if ("MockInstance".equals(instanceType))
       return new MockTabletLocator();
     Instance instance = getInstance(implementingClass, conf);
-    return TabletLocator.getLocator(instance, new Text(tableId));
+    try {
+      return TabletLocator.getLocator(instance, new Text(tableId));
+    } finally {
+      instance.close();
+    }
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext
job)
@@ -625,10 +629,12 @@ public class InputConfigurator extends ConfiguratorBase {
     if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey))
       throw new IOException("Instance info has not been set.");
     // validate that we can connect as configured
+    Instance inst = getInstance(implementingClass, conf);
     try {
       String principal = getPrincipal(implementingClass, conf);
       AuthenticationToken token = getAuthenticationToken(implementingClass, conf);
-      Connector c = getInstance(implementingClass, conf).getConnector(principal, token);
+
+      Connector c = inst.getConnector(principal, token);
       if (!c.securityOperations().authenticateUser(principal, token))
         throw new IOException("Unable to authenticate user");
 
@@ -656,6 +662,8 @@ public class InputConfigurator extends ConfiguratorBase {
       throw new IOException(e);
     } catch (TableNotFoundException e) {
       throw new IOException(e);
+    } finally {
+      inst.close();
     }
   }
 


Mime
View raw message