accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [05/10] git commit: ACCUMULO-1000 made conditional writer use thread pool and work properly when multiple threads call it
Date Wed, 17 Jul 2013 12:58:19 GMT
ACCUMULO-1000 made conditional writer use thread pool and work properly when multiple threads
call it


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/04457eb4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/04457eb4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/04457eb4

Branch: refs/heads/ACCUMULO-1000
Commit: 04457eb4e9cbc34acdfb37da4f19b2b35ecfe537
Parents: 49a7626
Author: Keith Turner <kturner@apache.org>
Authored: Tue Jul 16 08:14:41 2013 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Tue Jul 16 08:20:39 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/ConditionalWriterImpl.java | 309 +++++++++++++------
 .../accumulo/core/data/ConditionalMutation.java |   8 +-
 2 files changed, 223 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/04457eb4/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 73aa480..04a2753 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -26,6 +26,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -56,7 +62,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.BadArgumentException;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.commons.collections.map.LRUMap;
@@ -76,47 +81,103 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private Map cache;
   private Instance instance;
   private TCredentials credentials;
+  private TabletLocator locator;
+
+
+  private Map<String,BlockingQueue<TabletServerMutations>> serverQueues;
+  private DelayQueue<QCMutation> failedMutations = new DelayQueue<QCMutation>();
+  private ScheduledExecutorService threadPool;
   
-  ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, Authorizations
authorizations) {
-    cache = Collections.synchronizedMap(new LRUMap(1000));
-    this.instance = instance;
-    this.credentials = credentials;
-    this.tableId = new Text(tableId);
-    this.auths = authorizations;
-    this.ve = new VisibilityEvaluator(authorizations);
+  private static class RQIterator implements Iterator<Result> {
+    
+    private BlockingQueue<Result> rq;
+    private int count;
+    
+    public RQIterator(BlockingQueue<Result> resultQueue, int count) {
+      this.rq = resultQueue;
+      this.count = count;
+    }
+    
+    @Override
+    public boolean hasNext() {
+      return count > 0;
+    }
+    
+    @Override
+    public Result next() {
+      try {
+        // TODO maybe call drainTo after take to get a batch efficiently
+        Result result = rq.take();
+        count--;
+        return result;
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+    
   }
 
-  public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+  private static class QCMutation extends ConditionalMutation implements Delayed {
+    private BlockingQueue<Result> resultQueue;
+    private long resetTime;
+    private long delay = 50;
     
-
-    TabletLocator locator = TabletLocator.getLocator(instance, tableId);
+    QCMutation(ConditionalMutation cm, BlockingQueue<Result> resultQueue) {
+      super(cm);
+      this.resultQueue = resultQueue;
+    }
     
-    List<Mutation> mutationList = new ArrayList<Mutation>();
-
-    ArrayList<Result> results = new ArrayList<Result>();
+    @Override
+    public int compareTo(Delayed o) {
+      QCMutation oqcm = (QCMutation) o;
+      return Long.valueOf(resetTime).compareTo(Long.valueOf(oqcm.resetTime));
+    }
     
-    mloop: while (mutations.hasNext()) {
-      ConditionalMutation mut = mutations.next();
-
-      for (Condition cond : mut.getConditions()) {
-        if (!isVisible(cond.getVisibility())) {
-          results.add(new Result(Status.INVISIBLE_VISIBILITY, mut));
-          continue mloop;
-        }
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(delay - (System.currentTimeMillis() - resetTime), TimeUnit.MILLISECONDS);
+    }
+    
+    void resetDelay() {
+      // TODO eventually timeout a mutation
+      delay = Math.min(delay * 2, 5000);
+      resetTime = System.currentTimeMillis();
+    }
+  }
+  
+  private BlockingQueue<TabletServerMutations> getServerQueue(String location) {
+    BlockingQueue<TabletServerMutations> queue;
+    synchronized (serverQueues) {
+      queue = serverQueues.get(location);
+      if (queue == null) {
+        queue = new LinkedBlockingQueue<TabletServerMutations>();
+        serverQueues.put(location, queue);
       }
-
-      mutationList.add(mut);
     }
+    return queue;
+  }
+  
+  private void queueFailed(List<QCMutation> mutations) {
+    for (QCMutation qcm : mutations) {
+      qcm.resetDelay();
+    }
+    
+    failedMutations.addAll(mutations);
+  }
 
+  private void queue(List<QCMutation> mutations) {
+    List<Mutation> failures = new ArrayList<Mutation>();
+    Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletLocator.TabletServerMutations>();
+    
+    List<Mutation> ml = (List<Mutation>) (List<? extends Mutation>) mutations;
+    
     try {
-      List<Mutation> ignored = (List<Mutation>) (ArrayList<? extends Mutation>)
sendToServers(locator, mutationList, results);
-      
-      while (ignored.size() > 0) {
-        // TODO requeue ignored and return whats done for iteration
-        ignored = (List<Mutation>) (ArrayList<? extends Mutation>) sendToServers(locator,
ignored, results);
-      }
-
-      return results.iterator();
+      locator.binMutations(ml, binnedMutations, failures, credentials);
     } catch (AccumuloException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
@@ -127,92 +188,148 @@ class ConditionalWriterImpl implements ConditionalWriter {
       // TODO Auto-generated catch block
       e.printStackTrace();
     }
+    
+    if (failures.size() > 0)
+      queueFailed((List<QCMutation>) (List<? extends Mutation>) failures);
+
+    for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
+      queue(entry.getKey(), entry.getValue());
+    }
 
-    return null;
   }
 
-  private class SendTask implements Runnable {
+  private void queue(String location, TabletServerMutations mutations) {
     
-    private TabletServerMutations mutations;
-    private String location;
-    private ArrayList<Result> results;
-    private List<ConditionalMutation> ignored;
-    private TabletLocator locator;
+    BlockingQueue<TabletServerMutations> queue = getServerQueue(location);
     
-    public SendTask(String location, TabletServerMutations mutations, ArrayList<Result>
results, ArrayList<ConditionalMutation> ignored, TabletLocator locator) {
-      this.location = location;
-      this.mutations = mutations;
-      this.results = results;
-      this.ignored = ignored;
-      this.locator = locator;
-    }
+    queue.add(mutations);
+    threadPool.execute(new SendTask(location));
+  }
+
+  private TabletServerMutations dequeue(String location) {
+    BlockingQueue<TabletServerMutations> queue = getServerQueue(location);
     
-    @Override
-    public void run() {
-      ArrayList<Result> tmpResults = new ArrayList<ConditionalWriter.Result>();
-      List<ConditionalMutation> tmpIgnored = new ArrayList<ConditionalMutation>();
-      
-      sendToServer(location, mutations, tmpResults, tmpIgnored, locator);
+    ArrayList<TabletServerMutations> mutations = new ArrayList<TabletLocator.TabletServerMutations>();
+    queue.drainTo(mutations);
+    
+    if (mutations.size() == 0)
+      return null;
+    
+    if (mutations.size() == 1) {
+      return mutations.get(0);
+    } else {
+      // merge multiple request to a single tablet server
+      TabletServerMutations tsm = mutations.get(0);
       
-      synchronized (results) {
-        results.addAll(tmpResults);
-        ignored.addAll(tmpIgnored);
+      for (int i = 1; i < mutations.size(); i++) {
+        for (Entry<KeyExtent,List<Mutation>> entry : mutations.get(i).getMutations().entrySet())
{
+          List<Mutation> list = tsm.getMutations().get(entry.getKey());
+          if (list == null) {
+            list = new ArrayList<Mutation>();
+            tsm.getMutations().put(entry.getKey(), list);
+          }
+          
+          list.addAll(entry.getValue());
+        }
       }
+      
+      return tsm;
     }
   }
-  protected ArrayList<ConditionalMutation> sendToServers(TabletLocator locator, List<Mutation>
mutationList, ArrayList<Result> results) throws AccumuloException,
-      AccumuloSecurityException, TableNotFoundException {
 
-    List<Mutation> failures = new ArrayList<Mutation>();
-    Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletLocator.TabletServerMutations>();
+  ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, Authorizations
authorizations) {
+    cache = Collections.synchronizedMap(new LRUMap(1000));
+    this.instance = instance;
+    this.credentials = credentials;
+    this.tableId = new Text(tableId);
+    this.auths = authorizations;
+    this.ve = new VisibilityEvaluator(authorizations);
+    // TODO make configurable
+    this.threadPool = Executors.newScheduledThreadPool(3);
+    this.locator = TabletLocator.getLocator(instance, new Text(tableId));
+    this.serverQueues = new HashMap<String,BlockingQueue<TabletServerMutations>>();
+    
+    Runnable failureHandler = new Runnable() {
+      
+      @Override
+      public void run() {
+        try {
+          List<QCMutation> mutations = new ArrayList<QCMutation>();
+          failedMutations.drainTo(mutations);
+          queue(mutations);
+        } catch (Exception e) {
+          // TODO log
+          e.printStackTrace();
+        }
+        
+      }
+    };
+    
+    threadPool.scheduleAtFixedRate(failureHandler, 100, 100, TimeUnit.MILLISECONDS);
+    
+    // TODO need to shutdown thread pool
+  }
 
-    do {
-      binnedMutations.clear();
-      failures.clear();
+  public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
 
-      locator.binMutations(mutationList, binnedMutations, failures, credentials);
-      
-      // TODO queue failed mutations to be retried in a bit and write what can be written
-      if (failures.size() > 0)
-        UtilWaitThread.sleep(100);
+    BlockingQueue resultQueue = new LinkedBlockingQueue<Result>();
 
-    } while (failures.size() > 0);
+    List<QCMutation> mutationList = new ArrayList<QCMutation>();
+
+    int count = 0;
+
+    mloop: while (mutations.hasNext()) {
+      // TODO stop reading from iterator if too much memory
+      ConditionalMutation mut = mutations.next();
+      count++;
+
+      for (Condition cond : mut.getConditions()) {
+        if (!isVisible(cond.getVisibility())) {
+          resultQueue.add(new Result(Status.INVISIBLE_VISIBILITY, mut));
+          continue mloop;
+        }
+      }
+
+      // copy the mutations so that even if caller changes it, it will not matter
+      mutationList.add(new QCMutation(mut, resultQueue));
+    }
+
+    queue(mutationList);
+
+    return new RQIterator(resultQueue, count);
+
+  }
+
+  private class SendTask implements Runnable {
     
-    ArrayList<ConditionalMutation> ignored = new ArrayList<ConditionalMutation>();
 
-    ArrayList<Thread> threads = new ArrayList<Thread>();
+    private String location;
+    
+    public SendTask(String location) {
+      this.location = location;
 
-    for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
-      Thread t = new Thread(new SendTask(entry.getKey(), entry.getValue(), results, ignored,
locator));
-      threads.add(t);
-      t.start();
     }
     
-    for (Thread thread : threads) {
-      try {
-        thread.join();
-      } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+    @Override
+    public void run() {
+      TabletServerMutations mutations = dequeue(location);
+      if (mutations != null)
+        sendToServer(location, mutations);
     }
-
-    return ignored;
   }
   
   private static class CMK {
 
-    ConditionalMutation cm;
+    QCMutation cm;
     KeyExtent ke;
     
-    public CMK(KeyExtent ke, ConditionalMutation cm) {
+    public CMK(KeyExtent ke, QCMutation cm) {
       this.ke = ke;
       this.cm = cm;
     }
   }
 
-  private void sendToServer(String location, TabletServerMutations mutations, ArrayList<Result>
results, List<ConditionalMutation> ignored,
-      TabletLocator locator) {
+  private void sendToServer(String location, TabletServerMutations mutations) {
     TabletClientService.Iface client = null;
     
     TInfo tinfo = Tracer.traceInfo();
@@ -231,35 +348,41 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
       HashSet<KeyExtent> extentsToInvalidate = new HashSet<KeyExtent>();
 
+      ArrayList<QCMutation> ignored = new ArrayList<QCMutation>();
+
       for (TCMResult tcmResult : tresults) {
         if (tcmResult.status == TCMStatus.IGNORED) {
           CMK cmk = cmidToCm.get(tcmResult.cmid);
           ignored.add(cmk.cm);
           extentsToInvalidate.add(cmk.ke);
         } else {
-          results.add(new Result(fromThrift(tcmResult.status), cmidToCm.get(tcmResult.cmid).cm));
+          QCMutation qcm = cmidToCm.get(tcmResult.cmid).cm;
+          qcm.resultQueue.add(new Result(fromThrift(tcmResult.status), qcm));
         }
       }
 
+
       // TODO maybe have thrift call return bad extents
 
       for (KeyExtent ke : extentsToInvalidate) {
         locator.invalidateCache(ke);
       }
 
+      queueFailed(ignored);
+
     } catch (TTransportException e) {
       locator.invalidateCache(location);
       for (CMK cmk : cmidToCm.values())
-        results.add(new Result(Status.UNKNOWN, cmk.cm));
+        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm));
     } catch (TApplicationException tae) {
       for (CMK cmk : cmidToCm.values())
-        results.add(new Result(Status.UNKNOWN, cmk.cm));
+        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm));
       // TODO should another status be used?
       // TODO need to get server where error occurred back to client
     } catch (TException e) {
       locator.invalidateCache(location);
       for (CMK cmk : cmidToCm.values())
-        results.add(new Result(Status.UNKNOWN, cmk.cm));
+        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm));
     } finally {
       ThriftUtil.returnClient((TServiceClient) client);
     }
@@ -288,9 +411,9 @@ class ConditionalWriterImpl implements ConditionalWriter {
       TKeyExtent tke = entry.getKey().toThrift();
       ArrayList<TConditionalMutation> tcondMutaions = new ArrayList<TConditionalMutation>();
       
-      List<ConditionalMutation> condMutations = (List<ConditionalMutation>) (List<?
extends Mutation>) entry.getValue();
+      List<QCMutation> condMutations = (List<QCMutation>) (List<? extends
Mutation>) entry.getValue();
       
-      for (ConditionalMutation cm : condMutations) {
+      for (QCMutation cm : condMutations) {
         TMutation tm = cm.toThrift();
         
         

http://git-wip-us.apache.org/repos/asf/accumulo/blob/04457eb4/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
index 5b38559..23bf7d0 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.data;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.accumulo.core.util.ArgumentChecker;
@@ -57,6 +58,11 @@ public class ConditionalMutation extends Mutation {
     init(condition, conditions);
   }
   
+  public ConditionalMutation(ConditionalMutation cm) {
+    super(cm);
+    this.conditions = new ArrayList<Condition>(cm.conditions);
+  }
+
   private void init(Condition condition, Condition... conditions) {
     ArgumentChecker.notNull(condition);
     this.conditions.add(condition);
@@ -71,7 +77,7 @@ public class ConditionalMutation extends Mutation {
   }
   
   public List<Condition> getConditions() {
-    return conditions;
+    return Collections.unmodifiableList(conditions);
   }
 
 }


Mime
View raw message