accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [34/50] git commit: ACCUMULO-1000 modified conditional map to not process tservers concurrently and reuse sessions
Date Tue, 23 Jul 2013 16:54:59 GMT
ACCUMULO-1000 modified conditional map to not process tservers concurrently and reuse sessions


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fdb95b40
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fdb95b40
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fdb95b40

Branch: refs/heads/ACCUMULO-1000
Commit: fdb95b40513094786b646dd682bb9d58ae06365d
Parents: ec53713
Author: keith@deenlo.com <keith@deenlo.com>
Authored: Sat Jul 20 12:07:23 2013 -0400
Committer: keith@deenlo.com <keith@deenlo.com>
Committed: Sat Jul 20 12:07:23 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/ConditionalWriterImpl.java | 128 ++++++++++++++++---
 .../server/tabletserver/TabletServer.java       |   2 +-
 2 files changed, 108 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/fdb95b40/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 31403fb..0e86ec7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -90,8 +90,12 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private TabletLocator locator;
   private String tableId;
 
-
-  private Map<String,BlockingQueue<TabletServerMutations<QCMutation>>>
serverQueues;
+  private static class ServerQueue {
+    BlockingQueue<TabletServerMutations<QCMutation>> queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>();
+    boolean taskQueued = false;
+  }
+  
+  private Map<String,ServerQueue> serverQueues;
   private DelayQueue<QCMutation> failedMutations = new DelayQueue<QCMutation>();
   private ScheduledThreadPoolExecutor threadPool;
   
@@ -168,16 +172,17 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
   }
   
-  private BlockingQueue<TabletServerMutations<QCMutation>> getServerQueue(String
location) {
-    BlockingQueue<TabletServerMutations<QCMutation>> queue;
+  private ServerQueue getServerQueue(String location) {
+    ServerQueue serverQueue;
     synchronized (serverQueues) {
-      queue = serverQueues.get(location);
-      if (queue == null) {
-        queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>();
-        serverQueues.put(location, queue);
+       serverQueue = serverQueues.get(location);
+      if (serverQueue == null) {
+        
+        serverQueue = new ServerQueue();
+        serverQueues.put(location, serverQueue);
       }
     }
-    return queue;
+    return serverQueue;
   }
   
   private void queueRetry(List<QCMutation> mutations) {
@@ -222,14 +227,38 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
   private void queue(String location, TabletServerMutations<QCMutation> mutations)
{
     
-    BlockingQueue<TabletServerMutations<QCMutation>> queue = getServerQueue(location);
+    ServerQueue serverQueue = getServerQueue(location);
     
-    queue.add(mutations);
-    threadPool.execute(new LoggingRunnable(log, new SendTask(location)));
+    synchronized (serverQueue) {
+      serverQueue.queue.add(mutations);
+      //never execute more that one task per server
+      if(!serverQueue.taskQueued){
+        threadPool.execute(new LoggingRunnable(log, new SendTask(location)));
+        serverQueue.taskQueued = true;
+      }
+    }
+   
   }
 
+  private void reschedule(SendTask task){
+    ServerQueue serverQueue = getServerQueue(task.location);
+    // just finished processing work for this server, could reschedule if it has more work
or immediately process the work
+    // this code reschedules the the server for processing later... there may be other queues
with
+    // more data that need to be processed... also it will give the current server time to
build
+    // up more data... the thinking is that rescheduling instead or processing immediately
will result
+    // in bigger batches and less RPC overhead
+    
+    synchronized (serverQueue) {
+      if(serverQueue.queue.size() > 0)
+        threadPool.execute(new LoggingRunnable(log, task));
+      else
+        serverQueue.taskQueued = false;
+    }
+    
+  }
+  
   private TabletServerMutations<QCMutation> dequeue(String location) {
-    BlockingQueue<TabletServerMutations<QCMutation>> queue = getServerQueue(location);
+    BlockingQueue<TabletServerMutations<QCMutation>> queue = getServerQueue(location).queue;
     
     ArrayList<TabletServerMutations<QCMutation>> mutations = new ArrayList<TabletLocator.TabletServerMutations<QCMutation>>();
     queue.drainTo(mutations);
@@ -268,7 +297,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     this.threadPool = new ScheduledThreadPoolExecutor(3);
     this.threadPool.setMaximumPoolSize(3);
     this.locator = TabletLocator.getLocator(instance, new Text(tableId));
-    this.serverQueues = new HashMap<String,BlockingQueue<TabletServerMutations<QCMutation>>>();
+    this.serverQueues = new HashMap<String,ServerQueue>();
     this.tableId = tableId;
 
     Runnable failureHandler = new Runnable() {
@@ -319,7 +348,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private class SendTask implements Runnable {
     
 
-    private String location;
+    String location;
     
     public SendTask(String location) {
       this.location = location;
@@ -331,6 +360,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
       TabletServerMutations<QCMutation> mutations = dequeue(location);
       if (mutations != null)
         sendToServer(location, mutations);
+      
+      reschedule(this);
     }
   }
   
@@ -345,6 +376,55 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
   }
 
+  private static class SessionID {
+    long sessionID;
+    boolean reserved;
+  }
+  
+  private HashMap<String, SessionID> cachedSessionIDs = new HashMap<String, SessionID>();
+  
+  private Long reserveSessionID(String location, TabletClientService.Iface client, TInfo
tinfo) throws ThriftSecurityException, TException {
+    //avoid cost of repeatedly making RPC to create sessions, reuse sessions
+    synchronized (cachedSessionIDs) {
+      SessionID sid = cachedSessionIDs.get(location);
+      if (sid != null) {
+        if (sid.reserved)
+          throw new IllegalStateException();
+        
+        sid.reserved = true;
+        return sid.sessionID;
+      }
+    }
+    
+    Long sessionId = client.startConditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()),
tableId);
+    
+    synchronized (cachedSessionIDs) {
+      SessionID sid = new SessionID();
+      sid.reserved = true;
+      sid.sessionID = sessionId;
+      if(cachedSessionIDs.put(location, sid) != null)
+        throw new IllegalStateException();
+    }
+    
+    return sessionId;
+  }
+  
+  private void invalidateSessionID(String location) {
+    synchronized (cachedSessionIDs) {
+      cachedSessionIDs.remove(location);
+    }
+    
+  }
+  
+  private void unreserveSessionID(String location){
+    synchronized (cachedSessionIDs) {
+      SessionID sid = cachedSessionIDs.get(location);
+      if(!sid.reserved)
+        throw new IllegalStateException();
+      sid.reserved = false;
+    }
+  }
+  
   private void sendToServer(String location, TabletServerMutations<QCMutation> mutations)
{
     TabletClientService.Iface client = null;
     
@@ -363,11 +443,17 @@ class ConditionalWriterImpl implements ConditionalWriter {
       CompressedIterators compressedIters = new CompressedIterators();
       convertMutations(mutations, cmidToCm, cmid, tmutations, compressedIters);
       
-      //TODO create a session per tserver and keep reusing it
-      sessionId = client.startConditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()),
tableId);
+      List<TCMResult> tresults = null;
+      while (tresults == null) {
+        try {
+          sessionId = reserveSessionID(location, client, tinfo);
+          tresults = client.conditionalUpdate(tinfo, sessionId, tmutations, compressedIters.getSymbolTable());
+        } catch (NoSuchScanIDException nssie) {
+          sessionId = null;
+          invalidateSessionID(location);
+        }
+      }
       
-      List<TCMResult> tresults = client.conditionalUpdate(tinfo, sessionId, tmutations,
compressedIters.getSymbolTable());
-
       HashSet<KeyExtent> extentsToInvalidate = new HashSet<KeyExtent>();
 
       ArrayList<QCMutation> ignored = new ArrayList<QCMutation>();
@@ -392,8 +478,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
       queueRetry(ignored);
 
-    } catch (NoSuchScanIDException nssie){
-    	queueRetry(cmidToCm);
     } catch (ThriftSecurityException tse) {
       AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(),
tse.getCode(), Tables.getPrintableTableInfoFromId(instance,
           tableId), tse);
@@ -409,6 +493,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     } catch (Exception e) {
       queueException(location, cmidToCm, e);
     } finally {
+      unreserveSessionID(location);
       ThriftUtil.returnClient((TServiceClient) client);
     }
   }
@@ -591,6 +676,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
   
   @Override
   public void close() {
+    //TODO could possible close cached sessions using async method to clean up sessions on
server side
     threadPool.shutdownNow();
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fdb95b40/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index ee1d1b6..013639e 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -1962,7 +1962,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   
         return results;
       }finally{
-        sessionManager.removeSession(sessID, true);
+        sessionManager.unreserveSession(sessID);
       }
     }
 


Mime
View raw message