accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1424053 - in /accumulo/branches/1.4: ./ src/ src/core/ src/core/src/main/java/org/apache/accumulo/core/client/impl/ src/core/src/main/java/org/apache/accumulo/core/conf/ src/core/src/main/java/org/apache/accumulo/core/util/ src/core/src/ma...
Date Wed, 19 Dec 2012 19:31:21 GMT
Author: ecn
Date: Wed Dec 19 19:31:20 2012
New Revision: 1424053

URL: http://svn.apache.org/viewvc?rev=1424053&view=rev
Log:
ACCUMULO-408 use sync to make sure we do not read old data from zookeeper

Modified:
    accumulo/branches/1.4/   (props changed)
    accumulo/branches/1.4/src/   (props changed)
    accumulo/branches/1.4/src/core/   (props changed)
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/IZooReader.java
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
    accumulo/branches/1.4/src/server/   (props changed)
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java

Propchange: accumulo/branches/1.4/
------------------------------------------------------------------------------
  Merged /accumulo/trunk:r1423994,1424036,1424050

Propchange: accumulo/branches/1.4/src/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src:r1423994,1424036,1424050
  Merged /accumulo/trunk:r1423994,1424036

Propchange: accumulo/branches/1.4/src/core/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src/core:r1423994,1424036,1424050
  Merged /accumulo/trunk/core:r1423994,1424036

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
(original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
Wed Dec 19 19:31:20 2012
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
 import org.apache.accumulo.core.client.impl.thrift.ClientService.Iface;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.util.ArgumentChecker;
@@ -124,6 +125,11 @@ public class ServerClient {
   }
   
   public static Pair<String,ClientService.Iface> getConnection(Instance instance, boolean
preferCachedConnections) throws TTransportException {
+    AccumuloConfiguration conf = instance.getConfiguration();
+    return getConnection(instance, preferCachedConnections, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+  }
+  
+  public static Pair<String,ClientService.Iface> getConnection(Instance instance, boolean
preferCachedConnections, long rpcTimeout) throws TTransportException {
     ArgumentChecker.notNull(instance);
     // create list of servers
     ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
@@ -131,13 +137,15 @@ public class ServerClient {
     // add tservers
     
     ZooCache zc = getZooCache(instance);
-    
+    AccumuloConfiguration conf = instance.getConfiguration();
     for (String tserver : zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS))
{
       String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + tserver;
       byte[] data = ZooUtil.getLockData(zc, path);
       if (data != null && !new String(data).equals("master"))
-        servers.add(new ThriftTransportKey(new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT),
instance.getConfiguration().getPort(
-            Property.TSERV_CLIENTPORT), instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)));
+        servers.add(new ThriftTransportKey(
+            new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT),

+            conf.getPort(Property.TSERV_CLIENTPORT), 
+            rpcTimeout));
     }
     
     boolean opened = false;

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
(original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
Wed Dec 19 19:31:20 2012
@@ -69,7 +69,8 @@ public enum Property {
       "If true, use map/reduce to sort write-ahead logs during recovery"),
   MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts
to bulk-load a file before giving up."),
   MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The
number of threads to use when coordinating a bulk-import."),
-  MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum
number of threads to use to handle incoming requests."),
+  MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to
wait for a tablet server to process a bulk import request"),
+  MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, "The minimum
number of threads to use to handle incoming requests."),
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The
time between adjustments of the server thread pool."),
   
   // properties that are specific to tablet server behavior
@@ -148,7 +149,8 @@ public enum Property {
           + " the file to the appropriate tablets on all servers.  This property controls
the number of threads used to communicate to the other servers."),
   TSERV_BULK_RETRY("tserver.bulk.retry.max", "3", PropertyType.COUNT,
       "The number of times the tablet server will attempt to assign a file to a tablet as
it migrates and splits."),
-  TSERV_MINTHREADS("tserver.server.threads.minimum", "2", PropertyType.COUNT, "The minimum
number of threads to use to handle incoming requests."),
+  TSERV_BULK_TIMEOUT("tserver.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to
wait for a tablet server to process a bulk import request."),
+  TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT, "The minimum
number of threads to use to handle incoming requests."),
   TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The
time between adjustments of the server thread pool."),
   TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
       "The maximum time for a tablet server to be in the \"memory full\" state.  If the tablet
server cannot write out memory"

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
(original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
Wed Dec 19 19:31:20 2012
@@ -76,9 +76,14 @@ public class ThriftUtil {
   }
   
   static public TabletClientService.Iface getTServerClient(String address, AccumuloConfiguration
conf) throws TTransportException {
-    return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT,
Property.GENERAL_RPC_TIMEOUT, conf);
+    return getTServerClient(address, conf, Property.TSERV_CLIENTPORT, Property.GENERAL_RPC_TIMEOUT);
   }
   
+  static public TabletClientService.Iface getTServerClient(String address, AccumuloConfiguration
conf, Property port, Property timeout) throws TTransportException {
+    return getClient(new TabletClientService.Client.Factory(), address, port, timeout, conf);
+  }
+  
+
   public static void execute(String address, AccumuloConfiguration conf, ClientExec<TabletClientService.Iface>
exec) throws AccumuloException,
       AccumuloSecurityException {
     while (true) {

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/IZooReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/IZooReader.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/IZooReader.java
(original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/IZooReader.java
Wed Dec 19 19:31:20 2012
@@ -38,4 +38,5 @@ public interface IZooReader {
   
   public abstract boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
   
+  public void sync(final String path) throws KeeperException, InterruptedException;
 }
\ No newline at end of file

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
(original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooReader.java
Wed Dec 19 19:31:20 2012
@@ -17,11 +17,14 @@
 package org.apache.accumulo.core.zookeeper;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.Instance;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.Stat;
 
 public class ZooReader implements IZooReader {
@@ -72,6 +75,29 @@ public class ZooReader implements IZooRe
     return getZooKeeper().exists(zPath, watcher) != null;
   }
   
+  @Override
+  public void sync(final String path) throws KeeperException, InterruptedException {
+    final int[] rc = { 0 };
+    final AtomicBoolean waiter = new AtomicBoolean(false);
+    getZooKeeper().sync(path, new VoidCallback() {
+      @Override
+      public void processResult(int code, String arg1, Object arg2) {
+        rc[0] = code;
+        synchronized (waiter) {
+          waiter.set(true);
+          waiter.notifyAll();
+        }
+      }}, null);
+    synchronized (waiter) {
+      while (!waiter.get())
+        waiter.wait();
+    }
+    Code code = Code.get(rc[0]);
+    if (code != KeeperException.Code.OK) {
+      throw KeeperException.create(code);
+    }
+  }
+  
   public ZooReader(String keepers, int timeout) {
     this.keepers = keepers;
     this.timeout = timeout;

Propchange: accumulo/branches/1.4/src/server/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src/server:r1423994,1424036,1424050
  Merged /accumulo/trunk/server:r1423994,1424036

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
Wed Dec 19 19:31:20 2012
@@ -577,7 +577,7 @@ public class BulkImporter {
   private List<KeyExtent> assignMapFiles(AuthInfo credentials, String location, Map<KeyExtent,List<PathSize>>
assignmentsPerTablet) throws AccumuloException,
       AccumuloSecurityException {
     try {
-      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration(),
Property.TSERV_CLIENTPORT, Property.TSERV_BULK_TIMEOUT);
       try {
         HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>
files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
         for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet())
{

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
Wed Dec 19 19:31:20 2012
@@ -540,7 +540,8 @@ class LoadFiles extends MasterRepo {
               // get a connection to a random tablet server, do not prefer cached connections
because
               // this is running on the master and there are lots of connections to tablet
servers
               // serving the !METADATA tablets
-              Pair<String,Iface> pair = ServerClient.getConnection(master.getInstance(),
false);
+              long timeInMillis = ServerConfiguration.getSystemConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+              Pair<String,Iface> pair = ServerClient.getConnection(master.getInstance(),
false, timeInMillis);
               client = pair.getSecond();
               server = pair.getFirst();
               List<String> attempt = Collections.singletonList(file);

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
Wed Dec 19 19:31:20 2012
@@ -856,7 +856,7 @@ public class TabletServer extends Abstra
         throw e.asThriftException();
       }
       
-      ArrayList<TKeyExtent> failures = new ArrayList<TKeyExtent>();
+      List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
       
       for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet())
{
         TKeyExtent tke = entry.getKey();
@@ -874,9 +874,7 @@ public class TabletServer extends Abstra
             failures.add(tke);
           }
         }
-        
       }
-      
       return failures;
     }
     

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java?rev=1424053&r1=1424052&r2=1424053&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
Wed Dec 19 19:31:20 2012
@@ -47,7 +47,9 @@ public class TransactionWatcher {
     
     @Override
     public boolean transactionAlive(String type, long tid) throws Exception {
-      return rdr.exists(ZooUtil.getRoot(instance) + "/" + type + "/" + Long.toString(tid));
+      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + Long.toString(tid);
+      rdr.sync(path);
+      return rdr.exists(path);
     }
     
     public static void start(String type, long tid) throws KeeperException, InterruptedException
{



Mime
View raw message