accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1341087 - in /accumulo/branches/ACCUMULO-578: ./ conf/ core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/conf/ server/src/main/java/org/apache/accumulo/server/logger/ server/src/main/java/org/apache/a...
Date Mon, 21 May 2012 16:03:57 GMT
Author: ecn
Date: Mon May 21 16:03:56 2012
New Revision: 1341087

URL: http://svn.apache.org/viewvc?rev=1341087&view=rev
Log:
ACCUMULO-578 checking in prototype wal-on-hdfs

Added:
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java   (with props)
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/IRemoteLogger.java   (with props)
Modified:
    accumulo/branches/ACCUMULO-578/conf/   (props changed)
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/Constants.java
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/branches/ACCUMULO-578/pom.xml
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
    accumulo/branches/ACCUMULO-578/server/src/test/java/org/apache/accumulo/server/logger/TestLogWriter.java
    accumulo/branches/ACCUMULO-578/test/system/continuous/   (props changed)
    accumulo/branches/ACCUMULO-578/test/system/continuous/logs/   (props changed)

Propchange: accumulo/branches/ACCUMULO-578/conf/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon May 21 16:03:56 2012
@@ -8,3 +8,9 @@ monitor
 accumulo-env.sh
 accumulo-site.xml
 accumulo_user_manual.pdf
+
+log4j.properties
+
+monitor_logger.xml
+
+generic_logger.xml

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/Constants.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/Constants.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/Constants.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/Constants.java Mon May 21 16:03:56 2012
@@ -184,4 +184,12 @@ public class Constants {
     return getMetadataTableDir(conf) + ZROOT_TABLET;
   }
   
+  /**
+   * @param conf configuration passed to {@code getBaseDir} to resolve the base directory
+   * @return the write-ahead log directory: the base directory followed by {@code "/wal"}
+   */
+  public static String getWalDirectory(AccumuloConfiguration conf) {
+    return getBaseDir(conf) + "/wal";
+  }
+  
 }

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java Mon May 21 16:03:56 2012
@@ -148,6 +148,7 @@ public enum Property {
   TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
       "The maximum time for a tablet server to be in the \"memory full\" state.  If the tablet server cannot write out memory"
           + " in this much time, it will assume there is some failure local to its node, and quit.  A value of zero is equivalent to forever."),
+  TSERV_USE_DFS_WAL("tserver.use.dfs.wal", "false", PropertyType.BOOLEAN, "Use the distributed file system for write-ahead loggging."),
   
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),

Modified: accumulo/branches/ACCUMULO-578/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/pom.xml?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/pom.xml (original)
+++ accumulo/branches/ACCUMULO-578/pom.xml Mon May 21 16:03:56 2012
@@ -463,7 +463,7 @@
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-core</artifactId>
-        <version>0.20.203.0</version>
+        <version>0.20.205.0</version>
         <scope>provided</scope>
       </dependency>
       <dependency>

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Mon May 21 16:03:56 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.accumulo.server.logger;
 
-import java.io.FileNotFoundException;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
@@ -36,9 +36,9 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 
 public class LogReader {
@@ -108,28 +108,33 @@ public class LogReader {
       
       if (fs.isFile(path)) {
         // read log entries from a simple hdfs file
-        org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(file), conf);
-        while (reader.next(key, value)) {
+        FSDataInputStream f = fs.open(path);
+        while (true) {
+          try {
+            key.readFields(f);
+            value.readFields(f);
+          } catch (EOFException ex) {
+            break;
+          }
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       } else if (local.isFile(path)) {
         // read log entries from a simple file
-        org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(local, new Path(file), conf);
-        while (reader.next(key, value)) {
+        FSDataInputStream f = fs.open(path);
+        while (true) {
+          try {
+            key.readFields(f);
+            value.readFields(f);
+          } catch (EOFException ex) {
+            break;
+          }
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       } else {
-        try {
-          // read the log entries sorted in a map file
-          MultiReader input = new MultiReader(fs, conf, file);
-          while (input.next(key, value)) {
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
-          }
-        } catch (FileNotFoundException ex) {
-          SequenceFile.Reader input = new SequenceFile.Reader(local, new Path(file), conf);
-          while (input.next(key, value)) {
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
-          }
+        // read the log entries sorted in a map file
+        MultiReader input = new MultiReader(fs, conf, file);
+        while (input.next(key, value)) {
+          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       }
     }

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogService.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogService.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogService.java Mon May 21 16:03:56 2012
@@ -65,7 +65,6 @@ import org.apache.accumulo.server.logger
 import org.apache.accumulo.server.security.Authenticator;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.server.security.ZKAuthenticator;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.FileSystemMonitor;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.TServerUtils;
@@ -122,6 +121,7 @@ public class LogService implements Mutat
 
     LogService logService;
     Instance instance = HdfsZooInstance.getInstance();
+    CachedConfiguration.getInstance().set("dfs.support.append", "true");
     ServerConfiguration conf = new ServerConfiguration(instance);
     FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf.getConfiguration());
     Accumulo.init(fs, conf, "logger");
@@ -145,7 +145,8 @@ public class LogService implements Mutat
     AccumuloConfiguration acuConf = config.getConfiguration();
     FileSystemMonitor.start(acuConf, Property.LOGGER_MONITOR_FS);
     
-    fs = TraceFileSystem.wrap(fs);
+    // Log recovery requires a reference to the real file system
+    // fs = TraceFileSystem.wrap(fs);
     final Set<String> rootDirs = new HashSet<String>();
     for (String root : acuConf.get(Property.LOGGER_DIR).split(",")) {
       if (!root.startsWith("/"))

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java Mon May 21 16:03:56 2012
@@ -62,17 +62,15 @@ import org.apache.accumulo.core.util.Uti
 import org.apache.accumulo.server.logger.metrics.LogWriterMetrics;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Metadata;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.WritableName;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.thrift.TException;
 
 
@@ -234,6 +232,81 @@ class LogWriter implements MutationLogge
     throw new FileNotFoundException("No file " + localLog + " found in " + roots);
   }
   
+  interface LogEntryReader {
+    // Reads the next key/value entry; copySortLog stops reading when this throws EOFException.
+    Pair<LogFileKey,LogFileValue> next() throws IOException;
+    // Closes the reader; copySortLog calls this in its finally block.
+    void close() throws IOException;
+    // Current position in the input; copySortLog uses position differences to bound its sort buffer.
+    long getPosition() throws IOException;
+  }
+  
+  static abstract class BaseFileReader implements LogEntryReader {
+    // Allocates a fresh key and value on every call, has fetchOne fill them, and returns the pair.
+    public Pair<LogFileKey,LogFileValue> next() throws IOException {
+      LogFileKey key = new LogFileKey();
+      LogFileValue value = new LogFileValue();
+      fetchOne(key, value);
+      return new Pair<LogFileKey,LogFileValue>(key, value);
+    }
+    // Subclass hook: fill key and value with the next entry from the underlying input.
+    abstract void fetchOne(LogFileKey key, LogFileValue value) throws IOException;
+    // close() and getPosition() from LogEntryReader are left to the concrete subclasses.
+  }
+  
+  static class SequenceFileReader extends BaseFileReader {
+    // Adapts a SequenceFile.Reader to the LogEntryReader interface.
+    SequenceFile.Reader file;
+    
+    SequenceFileReader(SequenceFile.Reader file) {
+      this.file = file;
+    }
+    
+    @Override
+    public long getPosition() throws IOException {
+      return file.getPosition();
+    }
+    
+    @Override
+    public void close() throws IOException {
+      file.close();
+    }
+    // SequenceFile.Reader.next returns false at end of file; turn that into the EOFException copySortLog stops on.
+    @Override
+    void fetchOne(LogFileKey key, LogFileValue value) throws IOException {
+      if (!file.next(key, value))
+        throw new EOFException("EOF");
+    }
+  }
+  
+  static class FileReader extends BaseFileReader {
+    // Reads log entries straight from an FSDataInputStream using the key's and value's readFields.
+    FSDataInputStream file;
+    
+    FileReader(FSDataInputStream file) {
+      this.file = file;
+    }
+    
+    public void close() throws IOException {
+      file.close();
+    }
+    // Declared without a checked exception: an IOException from getPos() is rethrown as RuntimeException.
+    @Override
+    public long getPosition() {
+      try {
+        return file.getPos();
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    // NOTE(review): no explicit end-of-input check; presumably readFields throws EOFException at end of stream - confirm.
+    @Override
+    void fetchOne(LogFileKey key, LogFileValue value) throws IOException {
+      key.readFields(file);
+      value.readFields(file);
+    }
+  }
+
   @Override
   public LogCopyInfo startCopy(TInfo info, AuthInfo credentials, final String localLog, final String fullyQualifiedFileName, final boolean sort) {
     log.info("Copying " + localLog + " to " + fullyQualifiedFileName);
@@ -246,14 +319,49 @@ class LogWriter implements MutationLogge
       log.error("Unexpected error thrown", e);
       throw new RuntimeException(e);
     }
+    LogEntryReader reader;
     File file;
+    long result;
     try {
-      file = new File(findLocalFilename(localLog));
-      log.info(file.getAbsoluteFile().toString());
-    } catch (FileNotFoundException ex) {
+      try {
+        file = new File(findLocalFilename(localLog));
+        log.info(file.getAbsoluteFile().toString());
+        result = file.length();
+        FileSystem local = FileSystem.getLocal(fs.getConf()).getRaw();
+        log.info("opened " + file + " of length " + result);
+        reader = new SequenceFileReader(new SequenceFile.Reader(local, new Path(file.getAbsolutePath()), local.getConf()));
+      } catch (FileNotFoundException ex) {
+        Path p = new Path(Constants.getWalDirectory(acuConf), localLog);
+        log.info("Looking for " + p);
+        if (!fs.exists(p))
+          throw new RuntimeException(ex);
+        while (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem dfs = (DistributedFileSystem) fs;
+          try {
+            dfs.recoverLease(p);
+            log.debug("recovered lease on " + p);
+            break;
+          } catch (IOException e) {
+            try {
+              log.debug("error recovering lease on " + p, e);
+              dfs.append(p).close();
+              log.debug("lease recovered using append on " + p);
+              break;
+            } catch (IOException ie) {
+              UtilWaitThread.sleep(1000);
+              log.debug("retrying lease recovery on " + p);
+              continue;
+            }
+          }
+        }
+        reader = new FileReader(fs.open(p));
+        log.info("opened " + p + " of length " + fs.getFileStatus(p).getLen());
+        result = fs.getFileStatus(p).getLen();
+      }
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
-    long result = file.length();
+    final LogEntryReader finalReader = reader;
     
     copyThreadPool.execute(new Runnable() {
       @Override
@@ -261,11 +369,7 @@ class LogWriter implements MutationLogge
         Thread.currentThread().setName("Copying " + localLog + " to shared file system");
         for (int i = 0; i < 3; i++) {
           try {
-            if (sort) {
-              copySortLog(localLog, fullyQualifiedFileName);
-            } else {
-              copyLog(localLog, fullyQualifiedFileName);
-            }
+            copySortLog(finalReader, fullyQualifiedFileName);
             return;
           } catch (IOException e) {
             log.error("error during copy", e);
@@ -283,41 +387,34 @@ class LogWriter implements MutationLogge
           metrics.add(LogWriterMetrics.copy, (t2 - t1));
       }
       
-      private void copySortLog(String localLog, String fullyQualifiedFileName) throws IOException {
+      private void copySortLog(LogEntryReader reader, String fullyQualifiedFileName) throws IOException {
         final long SORT_BUFFER_SIZE = acuConf.getMemoryInBytes(Property.LOGGER_SORT_BUFFER_SIZE);
         
-        FileSystem local = TraceFileSystem.wrap(FileSystem.getLocal(fs.getConf()).getRaw());
         Path dest = new Path(fullyQualifiedFileName + ".recovered");
         log.debug("Sorting log file to DSF " + dest);
         fs.mkdirs(dest);
         int part = 0;
         
-        Reader reader = new SequenceFile.Reader(local, new Path(findLocalFilename(localLog)), fs.getConf());
         try {
           final ArrayList<Pair<LogFileKey,LogFileValue>> kv = new ArrayList<Pair<LogFileKey,LogFileValue>>();
-          long memorySize = 0;
+          long start = reader.getPosition();
           while (true) {
-            final long position = reader.getPosition();
-            final LogFileKey key = new LogFileKey();
-            final LogFileValue value = new LogFileValue();
             try {
-              if (!reader.next(key, value))
-                break;
+              kv.add(reader.next());
             } catch (EOFException e) {
-              log.warn("Unexpected end of file reading write ahead log " + localLog);
               break;
             }
-            kv.add(new Pair<LogFileKey,LogFileValue>(key, value));
-            memorySize += reader.getPosition() - position;
+            long memorySize = reader.getPosition() - start;
             if (memorySize > SORT_BUFFER_SIZE) {
               writeSortedEntries(dest, part++, kv);
               kv.clear();
-              memorySize = 0;
+              start = reader.getPosition();
             }
           }
 
           if (!kv.isEmpty())
             writeSortedEntries(dest, part++, kv);
+          log.debug("Input file for " + dest + " was " + reader.getPosition() + " bytes long");
           fs.create(new Path(dest, "finished")).close();
         } finally {
           reader.close();
@@ -352,35 +449,6 @@ class LogWriter implements MutationLogge
         }
       }
       
-      private void copyLog(final String localLog, final String fullyQualifiedFileName) throws IOException {
-        Path dest = new Path(fullyQualifiedFileName + ".copy");
-        log.debug("Copying log file to DSF " + dest);
-        fs.delete(dest, true);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-        Writer writer = null;
-        Reader reader = null;
-        try {
-          short replication = (short) acuConf.getCount(Property.LOGGER_RECOVERY_FILE_REPLICATION);
-          writer = SequenceFile.createWriter(fs, fs.getConf(), dest, LogFileKey.class, LogFileValue.class, fs.getConf().getInt("io.file.buffer.size", 4096),
-              replication, fs.getDefaultBlockSize(), SequenceFile.CompressionType.BLOCK, new DefaultCodec(), null, new Metadata());
-          FileSystem local = TraceFileSystem.wrap(FileSystem.getLocal(fs.getConf()).getRaw());
-          reader = new SequenceFile.Reader(local, new Path(findLocalFilename(localLog)), fs.getConf());
-          while (reader.next(key, value)) {
-            writer.append(key, value);
-          }
-        } catch (IOException ex) {
-          log.warn("May have a partial copy of a recovery file: " + localLog, ex);
-        } finally {
-          if (reader != null)
-            reader.close();
-          if (writer != null)
-            writer.close();
-        }
-        // Make file appear in the shared file system as the target name only after it is completely copied
-        fs.rename(dest, new Path(fullyQualifiedFileName));
-        log.info("Copying " + localLog + " complete");
-      }
     });
     return new LogCopyInfo(result, null);
   }

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java Mon May 21 16:03:56 2012
@@ -28,16 +28,13 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
-import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.IRemoteLogger;
 import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -122,11 +119,15 @@ public class CoordinateRecoveryTask impl
       config = conf;
     }
     
-    private void startCopy() throws Exception {
+    private void startCopy(DfsLogger.ServerConfig conf) throws Exception {
       log.debug("Starting log recovery: " + logFile);
       try {
         // Ask the logging server to put the file in HDFS
-        RemoteLogger logger = new RemoteLogger(logFile.server, config);
+        IRemoteLogger logger;
+        if (logFile.server.length() > 0)
+          logger = new RemoteLogger(logFile.server, config);
+        else
+          logger = new DfsLogger(conf);
         String base = logFile.unsortedFileName();
         log.debug("Starting to copy " + logFile.file + " from " + logFile.server);
         LogCopyInfo lci = logger.startCopy(logFile.file, base);
@@ -147,7 +148,7 @@ public class CoordinateRecoveryTask impl
         return true;
       }
       
-      if (zcache.get(loggerZNode) == null) {
+      if (loggerZNode != null && loggerZNode.length() > 0 && zcache.get(loggerZNode) == null) {
         log.debug("zknode " + loggerZNode + " is gone, copy " + logFile.file + " from " + logFile.server + " assumed dead");
         return true;
       }
@@ -215,7 +216,7 @@ public class CoordinateRecoveryTask impl
     zcache = new ZooCache();
   }
   
-  public boolean recover(AuthInfo credentials, KeyExtent extent, Collection<Collection<String>> entries, JobComplete notify) {
+  public boolean recover(DfsLogger.ServerConfig server, KeyExtent extent, Collection<Collection<String>> entries, JobComplete notify) {
     boolean finished = true;
     log.debug("Log entries: " + entries);
     for (Collection<String> set : entries) {
@@ -250,7 +251,7 @@ public class CoordinateRecoveryTask impl
             }
           }
           if (job != null) {
-            job.startCopy();
+            job.startCopy(server);
           }
         } catch (Exception ex) {
           log.warn("exception starting recovery " + ex);

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java Mon May 21 16:03:56 2012
@@ -145,6 +145,8 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.server.security.ZKAuthenticator;
 import org.apache.accumulo.server.tabletserver.TabletTime;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger.ServerConfig;
+import org.apache.accumulo.server.tabletserver.log.IRemoteLogger;
 import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.AddressUtil;
@@ -1370,7 +1372,7 @@ public class Master implements LiveTServ
             
             if (goal == TabletGoalState.HOSTED) {
               if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
-                if (!recovery.recover(SecurityConstants.getSystemCredentials(), tls.extent, tls.walogs, Master.this)) {
+                if (!recovery.recover(getServerConfig(), tls.extent, tls.walogs, Master.this)) {
                   continue;
                 }
               }
@@ -1460,6 +1462,28 @@ public class Master implements LiveTServ
       }
     }
     
+    /**
+     * @return a ServerConfig backed by this master: its system configuration, its file system, and a copy of its current loggers
+     */
+    private ServerConfig getServerConfig() {
+      return new ServerConfig() {
+        @Override
+        public AccumuloConfiguration getConfiguration() {
+          return getSystemConfiguration();
+        }
+
+        @Override
+        public FileSystem getFileSystem() {
+          return fs;
+        }
+
+        @Override
+        public List<String> getCurrentLoggers() {
+          return new ArrayList<String>(Master.this.getLoggers().values());
+        }
+      };
+    }
+
     private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
       // Already split?
       if (!info.getState().equals(MergeState.SPLITTING))
@@ -2198,7 +2222,7 @@ public class Master implements LiveTServ
   @Override
   public void newLogger(String address) {
     try {
-      RemoteLogger remote = new RemoteLogger(address, getSystemConfiguration());
+      IRemoteLogger remote = new RemoteLogger(address, getSystemConfiguration());
       for (String onDisk : remote.getClosedLogs()) {
         Path path = new Path(ServerConstants.getRecoveryDir(), onDisk + ".failed");
         if (fs.exists(path)) {

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Mon May 21 16:03:56 2012
@@ -110,6 +110,7 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.TabletServer.TservConstraintEnv;
 import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.server.tabletserver.log.IRemoteLogger;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
 import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
@@ -206,7 +207,7 @@ public class Tablet {
       return Tablet.this;
     }
     
-    public boolean beginUpdatingLogsUsed(ArrayList<RemoteLogger> copy, boolean mincFinish) {
+    public boolean beginUpdatingLogsUsed(ArrayList<IRemoteLogger> copy, boolean mincFinish) {
       return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
     }
     
@@ -1223,12 +1224,12 @@ public class Tablet {
     return datafiles;
   }
   
-  private static Set<RemoteLogger> getCurrentLoggers(List<LogEntry> entries) {
-    Set<RemoteLogger> result = new HashSet<RemoteLogger>();
+  private static Set<IRemoteLogger> getCurrentLoggers(List<LogEntry> entries) {
+    Set<IRemoteLogger> result = new HashSet<IRemoteLogger>();
     for (LogEntry logEntry : entries) {
       for (String log : logEntry.logSet) {
         String[] parts = log.split("/", 2);
-        result.add(new RemoteLogger(parts[0], parts[1], null));
+        result.add(new RemoteLogger(parts[0], parts[1]));
       }
     }
     return result;
@@ -2232,7 +2233,7 @@ public class Tablet {
   private synchronized MinorCompactionTask prepareForMinC(long flushId) {
     CommitSession oldCommitSession = tabletMemory.prepareForMinC();
     otherLogs = currentLogs;
-    currentLogs = new HashSet<RemoteLogger>();
+    currentLogs = new HashSet<IRemoteLogger>();
     
     String mergeFile = datafileManager.reserveMergingMinorCompactionFile();
     
@@ -3616,7 +3617,7 @@ public class Tablet {
     }
   }
   
-  private Set<RemoteLogger> currentLogs = new HashSet<RemoteLogger>();
+  private Set<IRemoteLogger> currentLogs = new HashSet<IRemoteLogger>();
   
   private Set<String> beginClearingUnusedLogs() {
     Set<String> doomed = new HashSet<String>();
@@ -3631,12 +3632,12 @@ public class Tablet {
       if (removingLogs)
         throw new IllegalStateException("Attempted to clear logs when removal of logs in progress");
       
-      for (RemoteLogger logger : otherLogs) {
+      for (IRemoteLogger logger : otherLogs) {
         otherLogsCopy.add(logger.toString());
         doomed.add(logger.toString());
       }
       
-      for (RemoteLogger logger : currentLogs) {
+      for (IRemoteLogger logger : currentLogs) {
         currentLogsCopy.add(logger.toString());
         doomed.remove(logger.toString());
       }
@@ -3664,7 +3665,7 @@ public class Tablet {
     logLock.unlock();
   }
   
-  private Set<RemoteLogger> otherLogs = Collections.emptySet();
+  private Set<IRemoteLogger> otherLogs = Collections.emptySet();
   private boolean removingLogs = false;
   
   // this lock is basically used to synchronize writing of log info to !METADATA
@@ -3674,7 +3675,7 @@ public class Tablet {
     return currentLogs.size();
   }
   
-  private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<RemoteLogger> more, boolean mincFinish) {
+  private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<IRemoteLogger> more, boolean mincFinish) {
     
     boolean releaseLock = true;
     
@@ -3711,7 +3712,7 @@ public class Tablet {
         
         int numAdded = 0;
         int numContained = 0;
-        for (RemoteLogger logger : more) {
+        for (IRemoteLogger logger : more) {
           if (addToOther) {
             if (otherLogs.add(logger))
               numAdded++;

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon May 21 16:03:56 2012
@@ -159,9 +159,10 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
 import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.IRemoteLogger;
 import org.apache.accumulo.server.tabletserver.log.LoggerStrategy;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.tabletserver.log.RoundRobinLoggerStrategy;
 import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
 import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
@@ -897,7 +898,7 @@ public class TabletServer extends Abstra
         
         ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
         String oldThreadName = Thread.currentThread().getName();
-
+        
         try {
           runState.set(ScanRunState.RUNNING);
           Thread.currentThread().setName(
@@ -962,7 +963,7 @@ public class TabletServer extends Abstra
           Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
           if (isCancelled() || session == null)
             return;
-
+          
           long maxResultsSize = acuConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
           long bytesAdded = 0;
           long maxScanTime = 4000;
@@ -2543,25 +2544,21 @@ public class TabletServer extends Abstra
     return result;
   }
   
-  public void addLoggersToMetadata(List<RemoteLogger> logs, KeyExtent extent, int id) {
+  public void addLoggersToMetadata(List<IRemoteLogger> logs, KeyExtent extent, int id) {
     log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
     
-    List<MetadataTable.LogEntry> entries = new ArrayList<MetadataTable.LogEntry>();
     long now = RelativeTime.currentTimeMillis();
     List<String> logSet = new ArrayList<String>();
-    for (RemoteLogger log : logs)
+    for (IRemoteLogger log : logs)
       logSet.add(log.toString());
-    for (RemoteLogger log : logs) {
-      MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
-      entry.extent = extent;
-      entry.tabletId = id;
-      entry.timestamp = now;
-      entry.server = log.getLogger();
-      entry.filename = log.getFileName();
-      entry.logSet = logSet;
-      entries.add(entry);
-    }
-    MetadataTable.addLogEntries(SecurityConstants.getSystemCredentials(), entries, getLock());
+    MetadataTable.LogEntry entry = new MetadataTable.LogEntry();
+    entry.extent = extent;
+    entry.tabletId = id;
+    entry.timestamp = now;
+    entry.server = logs.get(0).getLogger();
+    entry.filename = logs.get(0).getFileName();
+    entry.logSet = logSet;
+    MetadataTable.addLogEntry(SecurityConstants.getSystemCredentials(), entry, getLock());
   }
   
   private int startServer(AccumuloConfiguration conf, Property portHint, TProcessor processor, String threadName) throws UnknownHostException {
@@ -3332,12 +3329,33 @@ public class TabletServer extends Abstra
     return METRICS_PREFIX;
   }
   
-  // public AccumuloConfiguration getTableConfiguration(String tableId) {
-  // return ServerConfiguration.getTableConfiguration(instance, tableId);
-  // }
-
   public TableConfiguration getTableConfiguration(KeyExtent extent) {
     return ServerConfiguration.getTableConfiguration(instance, extent.getTableId().toString());
   }
 
+  public DfsLogger.ServerConfig getServerConfig() {
+    return new DfsLogger.ServerConfig() {
+      
+      @Override
+      public FileSystem getFileSystem() {
+        return fs;
+      }
+      
+      @Override
+      public List<String> getCurrentLoggers() {
+        try {
+          return new ArrayList<String>(getLoggers());
+        } catch (Exception ex) {
+          log.warn(ex, ex);
+          return Collections.emptyList();
+        }
+      }
+      
+      @Override
+      public AccumuloConfiguration getConfiguration() {
+        return getSystemConfiguration();
+      }
+    };
+  }
+
 }

Added: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1341087&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (added)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon May 21 16:03:56 2012
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_FINISH;
+import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_START;
+import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.server.logger.LogEvents.OPEN;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
+import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
+import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LogWork;
+import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+/**
+ * Write-ahead logger that appends log entries directly to a file created through the configured Hadoop FileSystem.
+ * 
+ */
+public class DfsLogger implements IRemoteLogger {
+  private static Logger log = Logger.getLogger(DfsLogger.class);
+  
+  public interface ServerConfig {
+    AccumuloConfiguration getConfiguration();
+    
+    FileSystem getFileSystem();
+    
+    List<String> getCurrentLoggers();
+  }
+
+  private LinkedBlockingQueue<LogWork> workQueue = new LinkedBlockingQueue<LogWork>();
+  
+  private String closeLock = new String("foo");
+  
+  private static final LogWork CLOSED_MARKER = new LogWork(null, null);
+  
+  private static final LogFileValue EMPTY = new LogFileValue();
+  
+  private boolean closed = false;
+
+  private class LogWriterTask implements Runnable {
+
+    @Override
+    public void run() {
+      ArrayList<LogWork> work = new ArrayList<LogWork>();
+      ArrayList<TabletMutations> mutations = new ArrayList<TabletMutations>();
+      while (true) {
+        try {
+          
+          work.clear();
+          mutations.clear();
+          
+          work.add(workQueue.take());
+          workQueue.drainTo(work);
+          
+          for (LogWork logWork : work)
+            if (logWork != CLOSED_MARKER)
+              mutations.addAll(logWork.mutations);
+          
+          synchronized (DfsLogger.this) {
+            try {
+              for (TabletMutations mutation : mutations) {
+                LogFileKey key = new LogFileKey();
+                key.event = MANY_MUTATIONS;
+                key.seq = mutation.seq;
+                key.tid = mutation.tabletID;
+                LogFileValue value = new LogFileValue();
+                Mutation[] m = new Mutation[mutation.mutations.size()];
+                for (int i = 0; i < m.length; i++)
+                  m[i] = new Mutation(mutation.mutations.get(i));
+                value.mutations = m;
+                write(key, value);
+              }
+            } catch (Exception e) {
+              log.error(e, e);
+              for (LogWork logWork : work)
+                if (logWork != CLOSED_MARKER)
+                  logWork.exception = e;
+            }
+          }
+          synchronized (closeLock) {
+            if (!closed) {
+              logFile.flush();
+              logFile.sync();
+            }
+          }
+          
+          boolean sawClosedMarker = false;
+          for (LogWork logWork : work)
+            if (logWork == CLOSED_MARKER)
+              sawClosedMarker = true;
+            else
+              logWork.latch.countDown();
+          
+          if (sawClosedMarker) {
+            synchronized (closeLock) {
+              closeLock.notifyAll();
+            }
+            break;
+          }
+
+        } catch (Exception e) {
+          log.error(e.getMessage(), e);
+        }
+      }
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#equals(java.lang.Object)
+   */
+  @Override
+  public boolean equals(Object obj) {
+    // filename is unique
+    if (obj == null)
+      return false;
+    if (obj instanceof IRemoteLogger)
+      return getFileName().equals(((IRemoteLogger) obj).getFileName());
+    return false;
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#hashCode()
+   */
+  @Override
+  public int hashCode() {
+    // filename is unique
+    return getFileName().hashCode();
+  }
+  
+  private ServerConfig conf;
+  private FSDataOutputStream logFile;
+  private Path logPath;
+  
+  public DfsLogger(ServerConfig conf) throws IOException {
+    this.conf = conf;
+  }
+  
+  public DfsLogger(ServerConfig conf, String filename) throws IOException {
+    this.conf = conf;
+    this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
+  }
+
+  public synchronized void open() throws IOException {
+    String filename = UUID.randomUUID().toString();
+    logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
+    try {
+      short replication = (short) conf.getConfiguration().getCount(Property.LOGGER_RECOVERY_FILE_REPLICATION);
+      if (replication == 0)
+        replication = (short) conf.getFileSystem().getDefaultReplication();
+      logFile = conf.getFileSystem().create(logPath, replication);
+      LogFileKey key = new LogFileKey();
+      key.event = OPEN;
+      key.tserverSession = filename;
+      key.filename = filename;
+      write(key, EMPTY);
+      log.debug("Got new write-ahead log: " + this);
+    } catch (IOException ex) {
+      if (logFile != null)
+        logFile.close();
+      logFile = null;
+      throw ex;
+    }
+    
+    Thread t = new Daemon(new LogWriterTask());
+    t.setName("Accumulo WALog thread " + toString());
+    t.start();
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#toString()
+   */
+  @Override
+  public String toString() {
+    return getLogger() + "/" + getFileName();
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#getLogger()
+   */
+  @Override
+  public String getLogger() {
+    return "";
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#getFileName()
+   */
+  @Override
+  public String getFileName() {
+    return logPath.getName();
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#close()
+   */
+  @Override
+  public void close() throws NoSuchLogIDException, LoggerClosedException, TException {
+    
+    synchronized (closeLock) {
+      if (closed)
+        return;
+      // after closed is set to true, nothing else should be added to the queue
+      // CLOSED_MARKER should be the last thing on the queue, therefore when the
+      // background thread sees the marker and exits there should be nothing else
+      // to process... so nothing should be left waiting for the background
+      // thread to do work
+      closed = true;
+      workQueue.add(CLOSED_MARKER);
+      while (!workQueue.isEmpty())
+        try {
+          closeLock.wait();
+        } catch (InterruptedException e) {
+          log.info("Interrupted");
+        }
+    }
+
+    if (logFile != null)
+      try {
+        logFile.close();
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#defineTablet(int, int, org.apache.accumulo.core.data.KeyExtent)
+   */
+  @Override
+  public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws NoSuchLogIDException, LoggerClosedException, TException {
+    // record the tablet definition in the log file, then flush and sync it
+    final LogFileKey key = new LogFileKey();
+    key.event = DEFINE_TABLET;
+    key.seq = seq;
+    key.tid = tid;
+    key.tablet = tablet;
+    write(key, EMPTY);
+    try {
+      logFile.flush();
+      logFile.sync();
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  
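+  /**
+   * Writes the key and then the value to logFile; an IOException is rethrown wrapped in a RuntimeException.
+   * @param key the log entry key, written first
+   * @param value the log entry value, written after the key
+   */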
+  private synchronized void write(LogFileKey key, LogFileValue value) {
+    try {
+      key.write(logFile);
+      value.write(logFile);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#log(int, int, org.apache.accumulo.core.data.Mutation)
+   */
+  @Override
+  public LoggerOperation log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException {
+    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation.toThrift()))));
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#logManyTablets(java.util.List)
+   */
+  @Override
+  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws NoSuchLogIDException, LoggerClosedException, TException {
+    LogWork work = new LogWork(mutations, new CountDownLatch(1));
+    
+    synchronized (closeLock) {
+      // use a different lock for close check so that adding to work queue does not need
+      // to wait on walog I/O operations
+
+      if (closed)
+        throw new NoSuchLogIDException();
+      workQueue.add(work);
+    }
+
+    return new LoggerOperation(work);
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#minorCompactionFinished(int, int, java.lang.String)
+   */
+  @Override
+  public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException {
+    LogFileKey key = new LogFileKey();
+    key.event = COMPACTION_FINISH;
+    key.seq = seq;
+    key.tid = tid;
+    write(key, EMPTY);
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#minorCompactionStarted(int, int, java.lang.String)
+   */
+  @Override
+  public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException {
+    LogFileKey key = new LogFileKey();
+    key.event = COMPACTION_START;
+    key.seq = seq;
+    key.tid = tid;
+    key.filename = fqfn;
+    write(key, EMPTY);
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#startCopy(java.lang.String, java.lang.String)
+   */
+  @Override
+  public synchronized LogCopyInfo startCopy(String name, String fullyQualifiedFileName) throws ThriftSecurityException, TException {
+    MutationLogger.Iface client = null;
+    try {
+      List<String> currentLoggers = conf.getCurrentLoggers();
+      if (currentLoggers.isEmpty())
+        throw new RuntimeException("No loggers for recovery");
+      Random random = new Random();
+      int choice = random.nextInt(currentLoggers.size());
+      String address = currentLoggers.get(choice);
+      client = ThriftUtil.getClient(new MutationLogger.Client.Factory(), address, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf.getConfiguration());
+      return client.startCopy(null, SecurityConstants.getSystemCredentials(), name, fullyQualifiedFileName, true);
+    } finally {
+      if (client != null)
+        ThriftUtil.returnClient(client);
+    }
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#getClosedLogs()
+   */
+  @Override
+  public synchronized List<String> getClosedLogs() throws ThriftSecurityException, TException {
+    return Collections.emptyList();
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#removeFile(java.util.List)
+   */
+  @Override
+  public synchronized void removeFile(List<String> files) throws ThriftSecurityException, TException {
+  }
+  
+}

Propchange: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/IRemoteLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/IRemoteLogger.java?rev=1341087&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/IRemoteLogger.java (added)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/IRemoteLogger.java Mon May 21 16:03:56 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
+import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
+import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
+import org.apache.thrift.TException;
+
+/**
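+ * Common interface for the tablet server's write-ahead loggers; implemented by RemoteLogger and DfsLogger.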
+ */
+public interface IRemoteLogger {
+  
+  public abstract boolean equals(Object obj);
+  
+  public abstract int hashCode();
+  
+  public abstract String toString();
+  
+  public abstract String getLogger();
+  
+  public abstract String getFileName();
+  
+  public abstract void close() throws NoSuchLogIDException, LoggerClosedException, TException;
+  
+  public abstract void defineTablet(int seq, int tid, KeyExtent tablet) throws NoSuchLogIDException, LoggerClosedException, TException;
+  
+  public abstract LoggerOperation log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException;
+  
+  public abstract LoggerOperation logManyTablets(List<TabletMutations> mutations) throws NoSuchLogIDException, LoggerClosedException, TException;
+  
+  public abstract void minorCompactionFinished(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException;
+  
+  public abstract void minorCompactionStarted(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException;
+  
+  public abstract LogCopyInfo startCopy(String name, String fullyQualifiedFileName) throws ThriftSecurityException, TException;
+  
+  public abstract List<String> getClosedLogs() throws ThriftSecurityException, TException;
+  
+  public abstract void removeFile(List<String> files) throws ThriftSecurityException, TException;
+  
+  public abstract void open() throws IOException;
+
+}
\ No newline at end of file

Propchange: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/IRemoteLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java Mon May 21 16:03:56 2012
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -40,13 +39,12 @@ import org.apache.accumulo.core.util.Thr
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
 
 /**
  * Wrap a connection to a logger.
  * 
  */
-public class RemoteLogger {
+public class RemoteLogger implements IRemoteLogger {
   private static Logger log = Logger.getLogger(RemoteLogger.class);
   
   private LinkedBlockingQueue<LogWork> workQueue = new LinkedBlockingQueue<LogWork>();
@@ -86,7 +84,7 @@ public class RemoteLogger {
     }
   }
 
-  private static class LogWork {
+  static class LogWork {
     List<TabletMutations> mutations;
     CountDownLatch latch;
     volatile Exception exception;
@@ -142,16 +140,22 @@ public class RemoteLogger {
     }
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#equals(java.lang.Object)
+   */
   @Override
   public boolean equals(Object obj) {
     // filename is unique
     if (obj == null)
       return false;
-    if (obj instanceof RemoteLogger)
-      return getFileName().equals(((RemoteLogger) obj).getFileName());
+    if (obj instanceof IRemoteLogger)
+      return getFileName().equals(((IRemoteLogger) obj).getFileName());
     return false;
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#hashCode()
+   */
   @Override
   public int hashCode() {
     // filename is unique
@@ -159,70 +163,69 @@ public class RemoteLogger {
   }
   
   private final String logger;
-  private final LogFile logFile;
-  private final UUID tserverSession;
+  private LogFile logFile = null;
   private MutationLogger.Iface client = null;
   
-  public RemoteLogger(String address, UUID tserverUUID, AccumuloConfiguration conf) throws ThriftSecurityException, LoggerClosedException, TException,
+  public RemoteLogger(String address, AccumuloConfiguration conf) throws ThriftSecurityException, LoggerClosedException, TException,
       IOException {
     
     logger = address;
-    tserverSession = tserverUUID;
     try {
       client = ThriftUtil.getClient(new MutationLogger.Client.Factory(), address, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf);
-      logFile = client.create(null, SecurityConstants.getSystemCredentials(), tserverSession.toString());
-      log.debug("Got new write-ahead log: " + this);
-    } catch (ThriftSecurityException tse) {
-      ThriftUtil.returnClient(client);
-      client = null;
-      throw tse;
-    } catch (LoggerClosedException lce) {
-      ThriftUtil.returnClient(client);
-      client = null;
-      throw lce;
     } catch (TException te) {
       ThriftUtil.returnClient(client);
       client = null;
       throw te;
     }
-    
-    Thread t = new Daemon(new LogWriterTask());
-    t.setName("Accumulo WALog thread " + toString());
-    t.start();
   }
   
-  public RemoteLogger(String address, AccumuloConfiguration conf) throws IOException {
-    logger = address;
-    tserverSession = null;
-    logFile = null;
+  public void open() throws IOException {
     try {
-      client = ThriftUtil.getClient(new MutationLogger.Client.Factory(), address, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf);
-    } catch (TTransportException e) {
+      logFile = client.create(null, SecurityConstants.getSystemCredentials(), "");
+    } catch (Exception e) {
       throw new IOException(e);
     }
+    log.debug("Got new write-ahead log: " + this);
+    Thread t = new Daemon(new LogWriterTask());
+    t.setName("Accumulo WALog thread " + toString());
+    t.start();
   }
-  
+
   // Fake placeholder for logs used during recovery
-  public RemoteLogger(String logger, String filename, UUID tserverUUID) {
+  public RemoteLogger(String logger, String filename) {
     this.client = null;
     this.logger = logger;
     this.logFile = new LogFile(filename, -1);
-    this.tserverSession = null;
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#toString()
+   */
   @Override
   public String toString() {
     return getLogger() + "/" + getFileName();
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#getLogger()
+   */
+  @Override
   public String getLogger() {
     return logger;
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#getFileName()
+   */
+  @Override
   public String getFileName() {
     return logFile.name;
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#close()
+   */
+  @Override
   public synchronized void close() throws NoSuchLogIDException, LoggerClosedException, TException {
     
     synchronized (closeLock) {
@@ -247,14 +250,26 @@ public class RemoteLogger {
     }
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#defineTablet(int, int, org.apache.accumulo.core.data.KeyExtent)
+   */
+  @Override
   public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws NoSuchLogIDException, LoggerClosedException, TException {
     client.defineTablet(null, logFile.id, seq, tid, tablet.toThrift());
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#log(int, int, org.apache.accumulo.core.data.Mutation)
+   */
+  @Override
   public LoggerOperation log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException {
     return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation.toThrift()))));
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#logManyTablets(java.util.List)
+   */
+  @Override
   public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws NoSuchLogIDException, LoggerClosedException, TException {
     LogWork work = new LogWork(mutations, new CountDownLatch(1));
     
@@ -270,22 +285,42 @@ public class RemoteLogger {
     return new LoggerOperation(work);
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#minorCompactionFinished(int, int, java.lang.String)
+   */
+  @Override
   public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException {
     client.minorCompactionFinished(null, logFile.id, seq, tid, fqfn);
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#minorCompactionStarted(int, int, java.lang.String)
+   */
+  @Override
   public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException {
     client.minorCompactionStarted(null, logFile.id, seq, tid, fqfn);
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#startCopy(java.lang.String, java.lang.String)
+   */
+  @Override
   public synchronized LogCopyInfo startCopy(String name, String fullyQualifiedFileName) throws ThriftSecurityException, TException {
     return client.startCopy(null, SecurityConstants.getSystemCredentials(), name, fullyQualifiedFileName, true);
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#getClosedLogs()
+   */
+  @Override
   public synchronized List<String> getClosedLogs() throws ThriftSecurityException, TException {
     return client.getClosedLogs(null, SecurityConstants.getSystemCredentials());
   }
   
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#removeFile(java.util.List)
+   */
+  @Override
   public synchronized void removeFile(List<String> files) throws ThriftSecurityException, TException {
     client.remove(null, SecurityConstants.getSystemCredentials(), files);
   }

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Mon May 21 16:03:56 2012
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -61,7 +60,7 @@ public class TabletServerLogger {
   private final TabletServer tserver;
   
   // The current log set: always updated to a new set with every change of loggers
-  private final List<RemoteLogger> loggers = new ArrayList<RemoteLogger>();
+  private final List<IRemoteLogger> loggers = new ArrayList<IRemoteLogger>();
   
   // The current generation of logSet.
   // Because multiple threads can be using a log set at one time, a log
@@ -134,7 +133,7 @@ public class TabletServerLogger {
     this.maxSize = maxSize;
   }
   
-  private int initializeLoggers(final List<RemoteLogger> copy) throws IOException {
+  private int initializeLoggers(final List<IRemoteLogger> copy) throws IOException {
     final int[] result = {-1};
     testLockAndRun(logSetLock, new TestCallWithWriteLock() {
       boolean test() {
@@ -165,7 +164,7 @@ public class TabletServerLogger {
   public void getLoggers(Set<String> loggersOut) {
     logSetLock.readLock().lock();
     try {
-      for (RemoteLogger logger : loggers) {
+      for (IRemoteLogger logger : loggers) {
         loggersOut.add(logger.getLogger());
       }
     } finally {
@@ -183,35 +182,39 @@ public class TabletServerLogger {
     }
     
     try {
-      while (true) {
-        Set<String> loggerAddresses = tserver.getLoggers();
-        if (!loggerAddresses.isEmpty()) {
-          for (String logger : loggerAddresses) {
-            try {
-              loggers.add(new RemoteLogger(logger, UUID.randomUUID(), tserver.getSystemConfiguration()));
-            } catch (LoggerClosedException t) {
-              close();
-              break;
-            } catch (Exception t) {
-              close();
-              log.warn("Unable to connect to " + logger + ": " + t);
-              break;
+      if (tserver.getSystemConfiguration().getBoolean(Property.TSERV_USE_DFS_WAL)) {
+        DfsLogger alog = new DfsLogger(tserver.getServerConfig());
+        alog.open();
+        loggers.add(alog);
+      } else {
+        while (true) {
+          Set<String> loggerAddresses = tserver.getLoggers();
+          if (!loggerAddresses.isEmpty()) {
+            for (String logger : loggerAddresses) {
+              try {
+                RemoteLogger alog = new RemoteLogger(logger, tserver.getSystemConfiguration());
+                alog.open();
+                loggers.add(alog);
+              } catch (LoggerClosedException t) {
+                close();
+                break;
+              } catch (Exception t) {
+                close();
+                log.warn("Unable to connect to " + logger + ": " + t);
+                break;
+              }
             }
-          }
-          
-          if (loggers.size() == loggerAddresses.size())
-            break;
-          if (loggers.size() > 0) {
-            // something is screwy, loggers.size() should be 0 or loggerAddresses.size()..
-            throw new RuntimeException("Unexpected number of loggers " + loggers.size() + " " + loggerAddresses.size());
+            
+            if (loggers.size() == loggerAddresses.size())
+              break;
+            UtilWaitThread.sleep(1000);
           }
         }
-        UtilWaitThread.sleep(1000);
       }
       logSetId.incrementAndGet();
       return;
     } catch (Exception t) {
-      throw new IOException(t);
+      throw new RuntimeException(t);
     }
   }
   
@@ -229,7 +232,7 @@ public class TabletServerLogger {
       throw new IllegalStateException("close should be called with write lock held!");
     }
     try {
-      for (RemoteLogger logger : loggers) {
+      for (IRemoteLogger logger : loggers) {
         try {
           logger.close();
         } catch (LoggerClosedException ex) {
@@ -246,7 +249,7 @@ public class TabletServerLogger {
   }
   
   interface Writer {
-    LoggerOperation write(RemoteLogger logger, int seq) throws Exception;
+    LoggerOperation write(IRemoteLogger logger, int seq) throws Exception;
   }
   
   private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
@@ -265,7 +268,7 @@ public class TabletServerLogger {
     while (!success) {
       try {
         // get a reference to the loggers that no other thread can touch
-        ArrayList<RemoteLogger> copy = new ArrayList<RemoteLogger>();
+        ArrayList<IRemoteLogger> copy = new ArrayList<IRemoteLogger>();
         currentLogSet = initializeLoggers(copy);
         
         // add the logger to the log set for the memory in the tablet,
@@ -294,7 +297,7 @@ public class TabletServerLogger {
           if (seq < 0)
             throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
           ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
-          for (RemoteLogger wal : copy) {
+          for (IRemoteLogger wal : copy) {
             LoggerOperation lop = writer.write(wal, seq);
             if (lop != null)
               queuedOperations.add(lop);
@@ -309,7 +312,7 @@ public class TabletServerLogger {
         }
       } catch (Exception t) {
         if (attempt == 0) {
-          log.info("Log write failed: another thread probably closed the log");
+          log.info("Log write failed: another thread probably closed the log", t);
         } else {
           log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
           UtilWaitThread.sleep(100);
@@ -356,7 +359,7 @@ public class TabletServerLogger {
       return -1;
     return write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception {
         logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
         return null;
       }
@@ -368,7 +371,7 @@ public class TabletServerLogger {
       return -1;
     int seq = write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception {
         return logger.log(tabletSeq, commitSession.getLogId(), m);
       }
     });
@@ -388,7 +391,7 @@ public class TabletServerLogger {
     
     int seq = write(loggables.keySet(), false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception {
         List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
         for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
           CommitSession cs = entry.getKey();
@@ -419,7 +422,7 @@ public class TabletServerLogger {
     
     int seq = write(commitSession, true, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception {
         logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
         return null;
       }
@@ -435,7 +438,7 @@ public class TabletServerLogger {
       return -1;
     write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception {
         logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
         return null;
       }

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Mon May 21 16:03:56 2012
@@ -35,7 +35,6 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -67,7 +66,6 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.StringUtil;
-import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -769,11 +767,7 @@ public class MetadataTable extends org.a
     return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZROOT_TABLET_WALOGS;
   }
   
-  public static void addLogEntries(AuthInfo credentials, List<LogEntry> entries, ZooLock zooLock) {
-    if (entries.size() == 0)
-      return;
-    // entries should be a complete log set, so we should only need to write the first entry
-    LogEntry entry = entries.get(0);
+  public static void addLogEntry(AuthInfo credentials, LogEntry entry, ZooLock zooLock) {
     if (entry.extent.isRootTablet()) {
       String root = getZookeeperLogLocation();
       while (true) {

Modified: accumulo/branches/ACCUMULO-578/server/src/test/java/org/apache/accumulo/server/logger/TestLogWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/test/java/org/apache/accumulo/server/logger/TestLogWriter.java?rev=1341087&r1=1341086&r2=1341087&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/test/java/org/apache/accumulo/server/logger/TestLogWriter.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/test/java/org/apache/accumulo/server/logger/TestLogWriter.java Mon May 21 16:03:56 2012
@@ -43,10 +43,6 @@ import org.apache.accumulo.core.util.Uti
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.ZooConfiguration;
-import org.apache.accumulo.server.logger.LogEvents;
-import org.apache.accumulo.server.logger.LogFileKey;
-import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.logger.LogWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -113,7 +109,7 @@ public class TestLogWriter {
       if (fs.exists(mylog))
         break;
     }
-    assertTrue(fs.exists(mylog));
+    assertTrue(fs.exists(new Path(mylog + ".recovered")));
     Mutation m = new Mutation(new Text("row1"));
     m.put(new Text("cf"), new Text("cq"), new Value("value".getBytes()));
     try {

Propchange: accumulo/branches/ACCUMULO-578/test/system/continuous/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon May 21 16:03:56 2012
@@ -1,5 +1,4 @@
 continuous-env.sh
-
 walkers.txt
-
 ingesters.txt
+report_*

Propchange: accumulo/branches/ACCUMULO-578/test/system/continuous/logs/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon May 21 16:03:56 2012
@@ -0,0 +1 @@
+*



Mime
View raw message