accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1424036 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/impl/ fate/src/main/java/org/apache/accumulo/fate/zookeeper/ server/src/main/java/org/apache/accumulo/server/client/ server/src/main/java/org/apache/accumulo/...
Date Wed, 19 Dec 2012 19:11:58 GMT
Author: ecn
Date: Wed Dec 19 19:11:58 2012
New Revision: 1424036

URL: http://svn.apache.org/viewvc?rev=1424036&view=rev
Log:
ACCUMULO-408 use sync to make sure we do not read old data from zookeeper

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1424036&r1=1424035&r2=1424036&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
Wed Dec 19 19:11:58 2012
@@ -125,7 +125,7 @@ public class ServerClient {
   
   public static Pair<String,ClientService.Client> getConnection(Instance instance,
boolean preferCachedConnections) throws TTransportException {
     AccumuloConfiguration conf = instance.getConfiguration();
-    return getConnection(instance, false, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+    return getConnection(instance, preferCachedConnections, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
   }
   
   public static Pair<String,ClientService.Client> getConnection(Instance instance,
boolean preferCachedConnections, long rpcTimeout) throws TTransportException {

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java?rev=1424036&r1=1424035&r2=1424036&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java Wed
Dec 19 19:11:58 2012
@@ -38,4 +38,6 @@ public interface IZooReader {
   
   public abstract boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
   
+  public abstract void sync(final String path) throws KeeperException, InterruptedException;
+  
 }

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java?rev=1424036&r1=1424035&r2=1424036&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java Wed
Dec 19 19:11:58 2012
@@ -17,10 +17,12 @@
 package org.apache.accumulo.fate.zookeeper;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.data.Stat;
 
 public class ZooReader implements IZooReader {
@@ -71,6 +73,24 @@ public class ZooReader implements IZooRe
     return getZooKeeper().exists(zPath, watcher) != null;
   }
   
+  @Override
+  public void sync(final String path) throws KeeperException, InterruptedException {
+    final AtomicBoolean waiter = new AtomicBoolean(false);
+    getZooKeeper().sync(path, new VoidCallback() {
+      @Override
+      public void processResult(int arg0, String arg1, Object arg2) {
+        synchronized (waiter) {
+          waiter.set(true);
+          waiter.notifyAll();
+        }
+      }}, null);
+    synchronized (waiter) {
+      while (!waiter.get())
+        waiter.wait();
+    }
+  }
+  
+  
   public ZooReader(String keepers, int timeout) {
     this.keepers = keepers;
     this.timeout = timeout;

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java?rev=1424036&r1=1424035&r2=1424036&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
(original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
Wed Dec 19 19:11:58 2012
@@ -22,13 +22,11 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.security.SecurityPermission;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.BadVersionException;
@@ -205,21 +203,5 @@ public class ZooReaderWriter extends Zoo
     putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP);
   }
 
-  @Override
-  public void sync(final String path) throws KeeperException, InterruptedException {
-    final AtomicBoolean waiter = new AtomicBoolean(false);
-    getZooKeeper().sync(path, new VoidCallback() {
-      @Override
-      public void processResult(int arg0, String arg1, Object arg2) {
-        synchronized (waiter) {
-          waiter.set(true);
-          waiter.notifyAll();
-        }
-      }}, null);
-    synchronized (waiter) {
-      if (!waiter.get())
-        waiter.wait();
-    }
-  }
-  
+
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1424036&r1=1424035&r2=1424036&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
Wed Dec 19 19:11:58 2012
@@ -578,7 +578,7 @@ public class BulkImporter {
   private List<KeyExtent> assignMapFiles(AuthInfo credentials, String location, Map<KeyExtent,List<PathSize>>
assignmentsPerTablet) throws AccumuloException,
       AccumuloSecurityException {
     try {
-      long timeInMillis = instance.getConfiguration().getMemoryInBytes(Property.TSERV_BULK_TIMEOUT);
+      long timeInMillis = instance.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
       TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration(),
timeInMillis);
       try {
         HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>
files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1424036&r1=1424035&r2=1424036&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
Wed Dec 19 19:11:58 2012
@@ -49,7 +49,6 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -869,40 +868,36 @@ public class TabletServer extends Abstra
     }
     
     @Override
-    public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, final long
tid, final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
-        throws TException {
+    public List<TKeyExtent> bulkImport(TInfo tinfo, AuthInfo credentials, long tid,
Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
+        throws ThriftSecurityException {
       
       try {
         if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
           throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
-        return transactionWatcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<TKeyExtent>>()
{
-          public List<TKeyExtent> call() throws Exception {
-            List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
-            for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet())
{
-              TKeyExtent tke = entry.getKey();
-              Map<String,MapFileInfo> fileMap = entry.getValue();
-              
-              Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
-              
-              if (importTablet == null) {
-                failures.add(tke);
-              } else {
-                try {
-                  importTablet.importMapFiles(tid, fileMap, setTime);
-                } catch (IOException ioe) {
-                  log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke)
+ ": " + ioe.getMessage());
-                  failures.add(tke);
-                }
-              }
-            }
-            return failures;
-          }
-        });
       } catch (AccumuloSecurityException e) {
         throw e.asThriftException();
-      } catch (Exception ex) {
-        throw new TException(ex);
       }
+      
+      List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
+      
+      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet())
{
+        TKeyExtent tke = entry.getKey();
+        Map<String,MapFileInfo> fileMap = entry.getValue();
+        
+        Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
+        
+        if (importTablet == null) {
+          failures.add(tke);
+        } else {
+          try {
+            importTablet.importMapFiles(tid, fileMap, setTime);
+          } catch (IOException ioe) {
+            log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke)
+ ": " + ioe.getMessage());
+            failures.add(tke);
+          }
+        }
+      }
+      return failures;
     }
     
     private class NextBatchTask extends ScanTask<ScanBatch> {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java?rev=1424036&r1=1424035&r2=1424036&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
Wed Dec 19 19:11:58 2012
@@ -33,7 +33,9 @@ public class TransactionWatcher extends 
     
     @Override
     public boolean transactionAlive(String type, long tid) throws Exception {
-      return rdr.exists(ZooUtil.getRoot(instance) + "/" + type + "/" + Long.toString(tid));
+      String path = ZooUtil.getRoot(instance) + "/" + type + "/" + Long.toString(tid);
+      rdr.sync(path);
+      return rdr.exists(path);
     }
     
     public static void start(String type, long tid) throws KeeperException, InterruptedException
{



Mime
View raw message