tephra-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (TEPHRA-215) Share PruneUpperBoundWriter across all TransactionProcessors on the same region server
Date Wed, 08 Feb 2017 11:51:42 GMT

    [ https://issues.apache.org/jira/browse/TEPHRA-215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857880#comment-15857880
] 

ASF GitHub Bot commented on TEPHRA-215:
---------------------------------------

Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100039577
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
---
    @@ -18,56 +18,56 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    +
    +  private volatile boolean stopped;
    +  private volatile Thread flushThread;
     
    -  private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable,
String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval)
{
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    -    startFlushThread();
    +    this.pruneEntries = new ConcurrentSkipListMap<>();
    +  }
    +
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    // The number of entries in this map is bounded by the number of regions in this region
server and thus it will not
    +    // grow indefinitely
    +    pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
       }
     
       public boolean isAlive() {
    -    return flushThread.isAlive();
    +    return flushThread != null && flushThread.isAlive();
       }
     
    -  public void persistPruneEntry(long pruneUpperBound) {
    -    this.pruneUpperBound.set(pruneUpperBound);
    -    this.shouldFlush.set(true);
    +  @Override
    +  protected void startUp() throws Exception {
    +    startFlushThread();
       }
     
    -  public void stop() {
    +  @Override
    +  protected void shutDown() throws Exception {
         if (flushThread != null) {
    +      stopped = true;
           flushThread.interrupt();
         }
    --- End diff --
    
    Also add `flushThread.join()` after the interrupt to wait for the flush thread to stop.


> Share PruneUpperBoundWriter across all TransactionProcessors on the same region server
> --------------------------------------------------------------------------------------
>
>                 Key: TEPHRA-215
>                 URL: https://issues.apache.org/jira/browse/TEPHRA-215
>             Project: Tephra
>          Issue Type: Improvement
>    Affects Versions: 0.11.0-incubating
>            Reporter: Gokul Gunasekaran
>            Assignee: Gokul Gunasekaran
>             Fix For: 0.11.0-incubating
>
>
> Currently we start one prune upper bound writer thread per TransactionProcessor coprocessor.
Instead, we should share a single thread that periodically flushes writes to the prune state
table.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message