accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [18/34] accumulo git commit: ACCUMULO-3423 updates from reviews
Date Fri, 24 Apr 2015 23:21:04 GMT
ACCUMULO-3423 updates from reviews


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9c2ca7a5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9c2ca7a5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9c2ca7a5

Branch: refs/heads/master
Commit: 9c2ca7a5cce8f7d0bacb100fed6405c86bde2a2d
Parents: affff42
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Tue Apr 14 14:52:07 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Tue Apr 14 14:52:07 2015 -0400

----------------------------------------------------------------------
 .../core/tabletserver/log/LogEntry.java         |  4 +++
 .../server/master/state/MetaDataStateStore.java |  3 ++
 .../accumulo/server/replication/StatusUtil.java | 12 +++++---
 .../gc/GarbageCollectWriteAheadLogs.java        | 30 ++++++++++++++------
 .../apache/accumulo/tserver/TabletServer.java   |  4 +--
 .../apache/accumulo/tserver/log/DfsLogger.java  |  1 +
 .../tserver/log/TabletServerLogger.java         | 10 +++++--
 7 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index 964e3b3..90ce692 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@ -81,6 +81,10 @@ public class LogEntry {
   static private final Text EMPTY_TEXT = new Text();
 
   public static LogEntry fromKeyValue(Key key, Value value) {
+    String qualifier = key.getColumnQualifier().toString();
+    if (qualifier.indexOf('/') < 1) {
+      throw new IllegalArgumentException("Bad key for log entry: " + key);
+    }
     KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
     String[] parts = key.getColumnQualifier().toString().split("/", 2);
     String server = parts[0];

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index adcf04d..c154bd0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -168,6 +168,9 @@ public class MetaDataStateStore extends TabletStateStore {
     BatchWriter writer = createBatchWriter();
     try {
       for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
+        if (entry.getValue().isEmpty()) {
+          continue;
+        }
         Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
         for (Path log : entry.getValue()) {
           m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
index e973ebc..d72eea2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@ -32,7 +32,6 @@ public class StatusUtil {
   private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE;
 
   private static final Status.Builder CREATED_STATUS_BUILDER;
-  private static final Status.Builder INF_END_REPLICATION_STATUS_BUILDER;
 
   static {
     CREATED_STATUS_BUILDER = Status.newBuilder();
@@ -46,7 +45,6 @@ public class StatusUtil {
     builder.setEnd(0);
     builder.setInfiniteEnd(true);
     builder.setClosed(false);
-    INF_END_REPLICATION_STATUS_BUILDER = builder;
     INF_END_REPLICATION_STATUS = builder.build();
     INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS);
 
@@ -155,8 +153,14 @@ public class StatusUtil {
   /**
    * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
    */
-  public static synchronized Status openWithUnknownLength(long timeCreated) {
-    return INF_END_REPLICATION_STATUS_BUILDER.setCreatedTime(timeCreated).build();
+  public static Status openWithUnknownLength(long timeCreated) {
+    Builder builder = Status.newBuilder();
+    builder.setBegin(0);
+    builder.setEnd(0);
+    builder.setInfiniteEnd(true);
+    builder.setClosed(false);
+    builder.setCreatedTime(timeCreated);
+    return builder.build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 59612ab..9f537af 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -130,9 +131,9 @@ public class GarbageCollectWriteAheadLogs {
       status.currentLog.candidates = count;
       span.stop();
 
-      span = Trace.start("removeMetadataEntries");
+      span = Trace.start("removeEntriesInUse");
       try {
-        count = removeMetadataEntries(candidates, status, currentServers);
+        count = removeEntriesInUse(candidates, status, currentServers);
       } catch (Exception ex) {
         log.error("Unable to scan metadata table", ex);
         return;
@@ -165,7 +166,7 @@ public class GarbageCollectWriteAheadLogs {
       span.stop();
 
       span = Trace.start("removeMarkers");
-      count = removeMarkers(candidates);
+      count = removeTabletServerMarkers(candidates);
       long removeMarkersStop = System.currentTimeMillis();
       log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
       span.stop();
@@ -182,7 +183,7 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
-  private long removeMarkers(Map<TServerInstance,Set<Path>> candidates) {
+  private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) {
     long result = 0;
     try {
       BatchWriter root = null;
@@ -231,15 +232,19 @@ public class GarbageCollectWriteAheadLogs {
     return status.currentLog.deleted;
   }
 
-  private long removeMetadataEntries(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException,
+  private UUID path2uuid(Path path) {
+    return UUID.fromString(path.getName());
+  }
+
+  private long removeEntriesInUse(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException,
       InterruptedException {
 
     // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
 
-    Map<Path, TServerInstance> walToDeadServer = new HashMap<>();
+    Map<UUID, TServerInstance> walToDeadServer = new HashMap<>();
     for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
       for (Path file : entry.getValue()) {
-        walToDeadServer.put(file, entry.getKey());
+        walToDeadServer.put(path2uuid(file), entry.getKey());
       }
     }
     long count = 0;
@@ -254,9 +259,16 @@ public class GarbageCollectWriteAheadLogs {
       }
       for (Collection<String> wals : state.walogs) {
         for (String wal : wals) {
-          TServerInstance dead = walToDeadServer.get(new Path(wal));
+          UUID walUUID = path2uuid(new Path(wal));
+          TServerInstance dead = walToDeadServer.get(walUUID);
           if (dead != null) {
-            candidates.get(dead).remove(wal);
+            Iterator<Path> iter = candidates.get(dead).iterator();
+            while (iter.hasNext()) {
+              if (path2uuid(iter.next()).equals(walUUID)) {
+                iter.remove();
+                break;
+              }
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e28d472..af45c14 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2893,7 +2893,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
     totalMinorCompactions.incrementAndGet();
     logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
-    removeUnusedWALs();
+    markUnusedWALs();
   }
 
   public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException {
@@ -3000,7 +3000,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   // remove any meta entries after a rolled log is no longer referenced
   Set<DfsLogger> closedLogs = new HashSet<>();
 
-  private void removeUnusedWALs() {
+  private void markUnusedWALs() {
     Set<DfsLogger> candidates;
     synchronized (closedLogs) {
       candidates = new HashSet<>(closedLogs);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index e256604..6e9cdf9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -383,6 +383,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
 
   public synchronized void open(String address) throws IOException {
     String filename = UUID.randomUUID().toString();
+    log.debug("Address is " + address);
     String logger = Joiner.on("+").join(address.split(":"));
 
     log.debug("DfsLogger.open() begin");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 04e7a83..dd54798 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -79,7 +79,7 @@ public class TabletServerLogger {
   // The current logger
   private DfsLogger currentLog = null;
   private final SynchronousQueue<DfsLogger> nextLog = new SynchronousQueue<>();
-  private final ThreadPoolExecutor nextLogMaker = new SimpleThreadPool(1, "WALog creator");
+  private ThreadPoolExecutor nextLogMaker;
 
   // The current generation of logs.
   // Because multiple threads can be using a log at one time, a log
@@ -150,7 +150,6 @@ public class TabletServerLogger {
     this.maxSize = maxSize;
     this.syncCounter = syncCounter;
     this.flushCounter = flushCounter;
-    startLogMaker();
   }
 
   private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
@@ -200,6 +199,7 @@ public class TabletServerLogger {
     }
 
     try {
+      startLogMaker();
       DfsLogger next = nextLog.take();
       log.info("Using next log " + next.getFileName());
       currentLog = next;
@@ -214,7 +214,11 @@ public class TabletServerLogger {
     }
   }
 
-  private void startLogMaker() {
+  private synchronized void startLogMaker() {
+    if (nextLogMaker != null) {
+      return;
+    }
+    nextLogMaker = new SimpleThreadPool(1, "WALog creator");
     nextLogMaker.submit(new Runnable() {
       @Override
       public void run() {


Mime
View raw message