accumulo-commits mailing list archives

From: e..@apache.org
Subject: svn commit: r1342229 [2/2] - in /accumulo/branches/ACCUMULO-578: ./ bin/ core/ core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/sr...
Date: Thu, 24 May 2012 12:35:35 GMT
Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1342229&r1=1342228&r2=1342229&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
(original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
Thu May 24 12:35:34 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.server.table
 
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.GarbageCollectorMXBean;
@@ -45,6 +46,7 @@ import java.util.SortedSet;
 import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
@@ -136,6 +138,8 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -161,6 +165,7 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 import org.apache.accumulo.server.tabletserver.log.IRemoteLogger;
+import org.apache.accumulo.server.tabletserver.log.LogSorter;
 import org.apache.accumulo.server.tabletserver.log.LoggerStrategy;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
 import org.apache.accumulo.server.tabletserver.log.RoundRobinLoggerStrategy;
@@ -191,8 +196,12 @@ import org.apache.accumulo.server.zookee
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.start.Platform;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -222,12 +231,14 @@ public class TabletServer extends Abstra
   protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
   
   private ServerConfiguration serverConfig;
+  private final LogSorter logSorter;
   
   public TabletServer(ServerConfiguration conf, FileSystem fs) {
     super();
     this.serverConfig = conf;
     this.instance = conf.getInstance();
     this.fs = TraceFileSystem.wrap(fs);
+    this.logSorter = new LogSorter(fs, conf.getConfiguration());
     SimpleTimer.getInstance().schedule(new TimerTask() {
       @Override
       public void run() {
@@ -2042,6 +2053,27 @@ public class TabletServer extends Abstra
       
     }
     
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#sortLog(org.apache.accumulo.cloudtrace.thrift.TInfo,
+     * org.apache.accumulo.core.security.thrift.AuthInfo, java.lang.String, java.lang.String)
+     */
+    @Override
+    public double sortLog(TInfo tinfo, AuthInfo credentials, String source, String dest) throws ThriftSecurityException, TException {
+      try {
+        if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
+          throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      try {
+        return logSorter.sort(source, dest);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    
   }
   
   private class SplitRunner implements Runnable {
@@ -3105,6 +3137,7 @@ public class TabletServer extends Abstra
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
       Accumulo.init(fs, conf, "tserver");
+      recoverLocalWriteAheadLogs(fs, conf);
       TabletServer server = new TabletServer(conf, fs);
       server.config(hostname);
       Accumulo.enableTracing(hostname, "tserver");
@@ -3113,7 +3146,45 @@ public class TabletServer extends Abstra
       log.error("Uncaught exception in TabletServer.main, exiting", ex);
     }
   }
-  
+
+  /**
+   * Copy local walogs into HDFS on an upgrade
+   * 
+   */
+  public static void recoverLocalWriteAheadLogs(FileSystem fs, ServerConfiguration serverConf) throws IOException {
+    FileSystem localfs = FileSystem.getLocal(fs.getConf()).getRawFileSystem();
+    AccumuloConfiguration conf = serverConf.getConfiguration();
+    String localWalDirectory = conf.get(Property.LOGGER_DIR);
+    for (FileStatus file : localfs.listStatus(new Path(localWalDirectory))) {
+      String name = file.getPath().getName();
+      try {
+        UUID.fromString(name);
+      } catch (IllegalArgumentException ex) {
+        log.info("Ignoring non-log file " + name + " in " + localWalDirectory);
+        continue;
+      }
+      LogFileKey key = new LogFileKey();
+      LogFileValue value = new LogFileValue();
+      log.info("Openning local log " + file.getPath());
+      Reader reader = new SequenceFile.Reader(localfs, file.getPath(), localfs.getConf());
+      Path tmp = new Path(Constants.getWalDirectory(conf) + "/" + name + ".copy");
+      FSDataOutputStream writer = fs.create(tmp);
+      while (reader.next(key, value)) {
+        try {
+          key.write(writer);
+          value.write(writer);
+        } catch (EOFException ex) {
+          break;
+        }
+      }
+      writer.close();
+      reader.close();
+      fs.rename(tmp, new Path(tmp.getParent(), name));
+      log.info("Copied local log " + name);
+      localfs.delete(new Path(localWalDirectory, name), true);
+    }
+  }
+
  public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
     totalMinorCompactions++;
     logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3136,7 +3207,7 @@ public class TabletServer extends Abstra
       String recovery = null;
       for (String log : entry.logSet) {
         String[] parts = log.split("/"); // "host:port/filename"
-        log = ServerConstants.getRecoveryDir() + "/" + parts[1] + ".recovered";
+        log = ServerConstants.getRecoveryDir() + "/" + parts[1];
         Path finished = new Path(log + "/finished");
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
@@ -3342,13 +3413,8 @@ public class TabletServer extends Abstra
       }
       
       @Override
-      public List<String> getCurrentLoggers() {
-        try {
-          return new ArrayList<String>(getLoggers());
-        } catch (Exception ex) {
-          log.warn(ex, ex);
-          return Collections.emptyList();
-        }
+      public Set<TServerInstance> getCurrentTServers() {
+        return null;
       }
       
       @Override
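
A note on the upgrade path added above: recoverLocalWriteAheadLogs streams each local log into HDFS under a temporary ".copy" name and renames it into place only after the copy completes, so a crash mid-copy never leaves a truncated file under a name recovery would trust. A minimal sketch of that write-then-rename idiom, using a hypothetical helper and payload rather than the committed method:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CopyThenRenameSketch {
      // Write the payload to "<dest>.copy", then rename into place on success,
      // so readers never observe a partially written file under the final name.
      static void copyThenRename(FileSystem fs, Path dest, byte[] payload) throws IOException {
        Path tmp = new Path(dest.getParent(), dest.getName() + ".copy");
        FSDataOutputStream out = fs.create(tmp);
        try {
          out.write(payload); // in the commit, LogFileKey/LogFileValue pairs are streamed here
        } finally {
          out.close();
        }
        fs.rename(tmp, dest); // rename is atomic within a directory on HDFS
      }
    }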

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1342229&r1=1342228&r2=1342229&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
(original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
Thu May 24 12:35:34 2012
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -39,13 +40,14 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
 import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
-import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LogWork;
 import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
@@ -67,7 +69,7 @@ public class DfsLogger implements IRemot
     
     FileSystem getFileSystem();
     
-    List<String> getCurrentLoggers();
+    Set<TServerInstance> getCurrentTServers();
   }
 
   private LinkedBlockingQueue<LogWork> workQueue = new LinkedBlockingQueue<LogWork>();
@@ -85,64 +87,46 @@ public class DfsLogger implements IRemot
     @Override
     public void run() {
       ArrayList<LogWork> work = new ArrayList<LogWork>();
-      ArrayList<TabletMutations> mutations = new ArrayList<TabletMutations>();
       while (true) {
+        work.clear();
+        
         try {
-          
-          work.clear();
-          mutations.clear();
-          
           work.add(workQueue.take());
-          workQueue.drainTo(work);
-          
-          for (LogWork logWork : work)
-            if (logWork != CLOSED_MARKER)
-              mutations.addAll(logWork.mutations);
-          
-          synchronized (DfsLogger.this) {
+        } catch (InterruptedException ex) {
+          continue;
+        }
+        workQueue.drainTo(work);
+        
+        synchronized (closeLock) {
+          if (!closed) {
             try {
-              for (TabletMutations mutation : mutations) {
-                LogFileKey key = new LogFileKey();
-                key.event = MANY_MUTATIONS;
-                key.seq = mutation.seq;
-                key.tid = mutation.tabletID;
-                LogFileValue value = new LogFileValue();
-                Mutation[] m = new Mutation[mutation.mutations.size()];
-                for (int i = 0; i < m.length; i++)
-                  m[i] = new Mutation(mutation.mutations.get(i));
-                value.mutations = m;
-                write(key, value);
-              }
-            } catch (Exception e) {
-              log.error(e, e);
-              for (LogWork logWork : work)
-                if (logWork != CLOSED_MARKER)
-                  logWork.exception = e;
-            }
-          }
-          synchronized (closeLock) {
-            if (!closed) {
               logFile.flush();
               logFile.sync();
+            } catch (IOException ex) {
+              log.warn("Exception syncing log", ex);
+              for (LogWork logWork : work) {
+                logWork.exception = ex;
+              }
             }
-          }
-          
-          boolean sawClosedMarker = false;
-          for (LogWork logWork : work)
-            if (logWork == CLOSED_MARKER)
-              sawClosedMarker = true;
-            else
-              logWork.latch.countDown();
-          
-          if (sawClosedMarker) {
-            synchronized (closeLock) {
-              closeLock.notifyAll();
+          } else {
+            for (LogWork logWork : work) {
+              logWork.exception = new LoggerClosedException();
             }
-            break;
           }
-
-        } catch (Exception e) {
-          log.error(e.getMessage(), e);
+        }
+        
+        boolean sawClosedMarker = false;
+        for (LogWork logWork : work)
+          if (logWork == CLOSED_MARKER)
+            sawClosedMarker = true;
+          else
+            logWork.latch.countDown();
+        
+        if (sawClosedMarker) {
+          synchronized (closeLock) {
+            closeLock.notifyAll();
+          }
+          break;
         }
       }
     }
@@ -185,12 +169,20 @@ public class DfsLogger implements IRemot
 
   public synchronized void open() throws IOException {
     String filename = UUID.randomUUID().toString();
+
     logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
     try {
-      short replication = (short) conf.getConfiguration().getCount(Property.LOGGER_RECOVERY_FILE_REPLICATION);
+      FileSystem fs = conf.getFileSystem();
+      short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
-        replication = (short) conf.getFileSystem().getDefaultReplication();
-      logFile = conf.getFileSystem().create(logPath, replication);
+        replication = (short) fs.getDefaultReplication();
+      long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
+      if (blockSize == 0)
+        blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+      int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
+      blockSize -= blockSize % checkSum;
+      blockSize = Math.max(blockSize, checkSum);
+      logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
       LogFileKey key = new LogFileKey();
       key.event = OPEN;
       key.tserverSession = filename;
@@ -261,7 +253,8 @@ public class DfsLogger implements IRemot
       try {
         logFile.close();
       } catch (IOException ex) {
-        throw new RuntimeException(ex);
+        log.error(ex);
+        throw new LoggerClosedException();
       }
   }
   
@@ -276,12 +269,13 @@ public class DfsLogger implements IRemot
     key.seq = seq;
     key.tid = tid;
     key.tablet = tablet;
-    write(key, EMPTY);
     try {
+      write(key, EMPTY);
       logFile.flush();
       logFile.sync();
     } catch (IOException ex) {
-      throw new RuntimeException(ex);
+      log.error(ex);
+      throw new LoggerClosedException();
     }
   }
   
@@ -290,13 +284,9 @@ public class DfsLogger implements IRemot
    * @param empty2
    * @throws IOException
    */
-  private synchronized void write(LogFileKey key, LogFileValue value) {
-    try {
-      key.write(logFile);
-      value.write(logFile);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
+  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
+    key.write(logFile);
+    value.write(logFile);
   }
 
   /* (non-Javadoc)
@@ -314,12 +304,32 @@ public class DfsLogger implements IRemot
  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws NoSuchLogIDException, LoggerClosedException, TException {
     LogWork work = new LogWork(mutations, new CountDownLatch(1));
     
+    synchronized (DfsLogger.this) {
+      try {
+        for (TabletMutations mutation : mutations) {
+          LogFileKey key = new LogFileKey();
+          key.event = MANY_MUTATIONS;
+          key.seq = mutation.seq;
+          key.tid = mutation.tabletID;
+          LogFileValue value = new LogFileValue();
+          Mutation[] m = new Mutation[mutation.mutations.size()];
+          for (int i = 0; i < m.length; i++)
+            m[i] = new Mutation(mutation.mutations.get(i));
+          value.mutations = m;
+          write(key, value);
+        }
+      } catch (Exception e) {
+        log.error(e, e);
+        work.exception = e;
+      }
+    }
+
     synchronized (closeLock) {
       // use a different lock for close check so that adding to work queue does not need
       // to wait on walog I/O operations
 
       if (closed)
-        throw new NoSuchLogIDException();
+        throw new LoggerClosedException();
       workQueue.add(work);
     }
 
@@ -335,7 +345,12 @@ public class DfsLogger implements IRemot
     key.event = COMPACTION_FINISH;
     key.seq = seq;
     key.tid = tid;
-    write(key, EMPTY);
+    try {
+      write(key, EMPTY);
+    } catch (IOException ex) {
+      log.error(ex);
+      throw new LoggerClosedException();
+    }
   }
   
   /* (non-Javadoc)
@@ -348,24 +363,33 @@ public class DfsLogger implements IRemot
     key.seq = seq;
     key.tid = tid;
     key.filename = fqfn;
-    write(key, EMPTY);
+    try {
+      write(key, EMPTY);
+    } catch (IOException ex) {
+      log.error(ex);
+      throw new LoggerClosedException();
+    }
   }
   
   /* (non-Javadoc)
   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#startCopy(java.lang.String, java.lang.String)
    */
   @Override
-  public synchronized LogCopyInfo startCopy(String name, String fullyQualifiedFileName) throws ThriftSecurityException, TException {
-    MutationLogger.Iface client = null;
+  public synchronized LogCopyInfo startCopy(String source, String dest) throws ThriftSecurityException, TException {
+    Iface client = null;
     try {
-      List<String> currentLoggers = conf.getCurrentLoggers();
-      if (currentLoggers.isEmpty())
-        throw new RuntimeException("No loggers for recovery");
+      Set<TServerInstance> current = conf.getCurrentTServers();
+      if (current.isEmpty())
+        throw new RuntimeException("No servers for recovery");
       Random random = new Random();
-      int choice = random.nextInt(currentLoggers.size());
-      String address = currentLoggers.get(choice);
-      client = ThriftUtil.getClient(new MutationLogger.Client.Factory(), address, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf.getConfiguration());
-      return client.startCopy(null, SecurityConstants.getSystemCredentials(), name, fullyQualifiedFileName, true);
+      int choice = random.nextInt(current.size());
+      TServerInstance instance = current.toArray(new TServerInstance[] {})[choice];
+      client = ThriftUtil.getTServerClient(instance.hostPort(), conf.getConfiguration());
+      log.info("Asking " + instance.hostPort() + " to copy/sort from " + source + " to "
+ dest);
+      LogCopyInfo result = new LogCopyInfo();
+      client.sortLog(null, SecurityConstants.getSystemCredentials(), source, dest);
+      result.fileSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
+      return result;
     } finally {
       if (client != null)
         ThriftUtil.returnClient(client);
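
The block-size computation in DfsLogger.open() above deserves a note: HDFS rejects block sizes that are not a multiple of io.bytes.per.checksum, so the configured WAL size is padded by roughly 10% and then rounded down to a checksum multiple, with the checksum size itself as a floor. A minimal sketch of that arithmetic, with illustrative parameter names:

    // Sketch of the rounding performed in open(); names are hypothetical.
    static long walBlockSize(long walMaxSize, int bytesPerChecksum) {
      long blockSize = (long) (walMaxSize * 1.1);   // ~10% headroom past the max WAL size
      blockSize -= blockSize % bytesPerChecksum;    // round down to a checksum multiple
      return Math.max(blockSize, bytesPerChecksum); // never below one checksum chunk
    }

    // Example: walBlockSize(1073741824L, 512) computes 1181116006 and rounds
    // down to 1181115904, which is evenly divisible by 512.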

Added: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1342229&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
(added)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
Thu May 24 12:35:34 2012
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.log;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.MapFile;
+import org.apache.log4j.Logger;
+
+/**
+ * Copies and sorts write-ahead logs into sorted MapFile parts for tablet recovery.
+ */
+public class LogSorter {
+  
+  private static final Logger log = Logger.getLogger(LogSorter.class);
+  FileSystem fs;
+  AccumuloConfiguration conf;
+  
+  class Work implements Runnable {
+    final String name;
+    FSDataInputStream input;
+    final String destPath;
+    long bytesCopied = -1;
+    
+    synchronized long getBytesCopied() throws IOException {
+      return input == null ? bytesCopied : input.getPos();
+    }
+    
+    Work(String name, FSDataInputStream input, String destPath) {
+      this.name = name;
+      this.input = input;
+      this.destPath = destPath;
+    }
+    synchronized boolean finished() {
+      return input == null;
+    }
+    public void run() {
+      String formerThreadName = Thread.currentThread().getName();
+      try {
+        final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
+        Thread.currentThread().setName("Sorting " + name + " for recovery");
+        int part = 0;
+        while (true) {
+          final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
+          try {
+            long start = input.getPos();
+            while (input.getPos() - start < bufferSize) {
+              LogFileKey key = new LogFileKey();
+              LogFileValue value = new LogFileValue();
+              key.readFields(input);
+              value.readFields(input);
+              buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+            }
+            writeBuffer(buffer, part++);
+            buffer.clear();
+          } catch (EOFException ex) {
+            writeBuffer(buffer, part++);
+            break;
+          }
+        }
+        fs.create(new Path(destPath, "finished")).close();
+        log.info("Log copy/sort of " + name + " complete");
+      } catch (Throwable t) {
+        try {
+          fs.create(new Path(destPath, "failed")).close();
+        } catch (IOException e) {
+          log.error("Error creating failed flag file " + name, e);
+        }
+        log.error(t, t);
+      } finally {
+        Thread.currentThread().setName(formerThreadName);
+        try {
+          close();
+        } catch (IOException e) {
+          log.error("Error during cleanup sort/copy " + name, e);
+        }
+      }
+    }
+    
+    private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+      String path = destPath + String.format("/part-r-%05d", part);
+      MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
+      try {
+        Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
+          @Override
+          public int compare(Pair<LogFileKey,LogFileValue> o1, Pair<LogFileKey,LogFileValue> o2) {
+            return o1.getFirst().compareTo(o2.getFirst());
+          }
+        });
+        for (Pair<LogFileKey,LogFileValue> entry : buffer) {
+          output.append(entry.getFirst(), entry.getSecond());
+        }
+      } finally {
+        output.close();
+      }
+    }
+    
+    synchronized void close() throws IOException {
+      bytesCopied = input.getPos();
+      input.close();
+      input = null;
+    }
+  };
+  
+  final ExecutorService threadPool;
+  Map<String,Work> sorts = new ConcurrentHashMap<String,Work>();
+  
+  public LogSorter(FileSystem fs, AccumuloConfiguration conf) {
+    this.fs = fs;
+    this.conf = conf;
+    int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
+    this.threadPool = Executors.newFixedThreadPool(threadPoolSize);
+  }
+  
+  public double sort(String src, String dest) throws IOException {
+    synchronized (this) {
+      Work work = sorts.get(src);
+      if (work == null) {
+        work = startSort(src, dest);
+        sorts.put(src, work);
+      } else {
+        if (work.finished())
+          sorts.remove(src);
+      }
+      long bytesCopied = work.getBytesCopied();
+      long estimate = conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
+      return bytesCopied / ((double) estimate);
+    }
+  }
+  
+  private Work startSort(String src, String dest) throws IOException {
+    log.info("Copying " + src + " to " + dest);
+    Path srcPath = new Path(src);
+    while (true) {
+      try {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem dfs = (DistributedFileSystem) fs;
+          dfs.recoverLease(srcPath);
+          log.debug("recovered lease on " + srcPath);
+        } else {
+          fs.append(srcPath).close();
+          log.debug("successfully appended to " + srcPath);
+        }
+        break;
+      } catch (IOException e) {
+        log.debug("error recovering lease on " + srcPath, e);
+        UtilWaitThread.sleep(1000);
+        log.debug("retrying lease recovery on " + srcPath);
+      }
+    }
+    Work work = new Work(srcPath.getName(), fs.open(srcPath), dest);
+    threadPool.execute(work);
+    return work;
+  }
+}
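
LogSorter.Work.run() above implements a classic external-sort spill loop: fill an in-memory buffer until roughly TSERV_SORT_BUFFER_SIZE bytes of input have been consumed, sort the buffer, write it as one numbered part file, and repeat until EOF, flushing the final partial buffer and creating a "finished" marker. A simplified, self-contained sketch of the loop (generic keys stand in for the LogFileKey/LogFileValue pairs; this is not the committed class):

    import java.io.EOFException;
    import java.util.ArrayList;
    import java.util.Collections;

    // Simplified spill loop in the style of Work.run(); spill() stands in
    // for writeBuffer(), which writes one sorted MapFile part.
    abstract class SortSpillSketch<K extends Comparable<K>> {
      abstract K readNext() throws EOFException;           // throws EOFException at end of input
      abstract long bytesRead();                           // current position in the input
      abstract void spill(ArrayList<K> sorted, int part);  // write one sorted part file

      void sortAll(long bufferSize) {
        int part = 0;
        while (true) {
          ArrayList<K> buffer = new ArrayList<K>();
          try {
            long start = bytesRead();
            while (bytesRead() - start < bufferSize)
              buffer.add(readNext());        // fill until the memory budget is spent
            Collections.sort(buffer);
            spill(buffer, part++);           // one part file per full buffer
          } catch (EOFException eof) {
            Collections.sort(buffer);
            spill(buffer, part);             // flush the final, partial buffer
            break;                           // caller then creates the "finished" marker
          }
        }
      }
    }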

Propchange: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1342229&r1=1342228&r2=1342229&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
(original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
Thu May 24 12:35:34 2012
@@ -188,6 +188,18 @@ public class NullTserver {
    public void flush(TInfo tinfo, AuthInfo credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
       
     }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#sortLog(org.apache.accumulo.cloudtrace.thrift.TInfo,
+     * org.apache.accumulo.core.security.thrift.AuthInfo, java.lang.String, java.lang.String)
+     */
+    @Override
+    public double sortLog(TInfo tinfo, AuthInfo credentials, String lock, String path) throws ThriftSecurityException, TException {
+      // TODO Auto-generated method stub
+      return 0;
+    }
   }
   
   public static void main(String[] args) throws Exception {
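
NullTserver's sortLog stub above only satisfies the new TabletClientService.Iface method. In the real path, the double returned by sortLog is a progress estimate (bytes copied over the configured TSERV_WALOG_MAX_SIZE), while completion is signaled by the "finished" marker file that the TabletServer recovery hunk earlier checks for. A hedged sketch of how a caller might use both; the client handle and its interface here are hypothetical stand-ins, not code from this commit:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SortLogPollSketch {
      // Hypothetical minimal interface standing in for the Thrift-generated client.
      interface LogSortClient {
        double sortLog(String source, String dest) throws IOException;
      }

      static boolean checkSort(LogSortClient client, FileSystem fs, String source, String dest)
          throws IOException {
        double progress = client.sortLog(source, dest); // fraction of TSERV_WALOG_MAX_SIZE,
                                                        // so it may stop short of 1.0
        System.out.printf("sort/copy of %s roughly %.0f%% complete%n", source, progress * 100);
        return fs.exists(new Path(dest, "finished"));   // true once the sorter wrote its marker
      }
    }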

Propchange: accumulo/branches/ACCUMULO-578/src/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src:r1341040-1341571
  Merged /accumulo/branches/1.4/src:r1339324-1341134

Modified: accumulo/branches/ACCUMULO-578/test/system/auto/TestUtils.py
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/test/system/auto/TestUtils.py?rev=1342229&r1=1342228&r2=1342229&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/test/system/auto/TestUtils.py (original)
+++ accumulo/branches/ACCUMULO-578/test/system/auto/TestUtils.py Thu May 24 12:35:34 2012
@@ -24,7 +24,7 @@ import socket
 import signal
 import select
 import random
-
+import shutil
 import sleep
 
 # mapreduce sets SIGHUP to ignore, which we use to stop child processes
@@ -45,7 +45,10 @@ ACCUMULO_DIR = "/user/" + os.getlogin() 
 SITE = "test-" + ID
 
 WALOG = os.path.join(ACCUMULO_HOME, 'walogs', ID)
-General_CLASSPATH = 	"$ACCUMULO_HOME/lib/[^.].$ACCUMULO_VERSION.jar, $ACCUMULO_HOME/lib/[^.].*.jar, $ZOOKEEPER_HOME/zookeeper[^.].*.jar, $HADOOP_HOME/conf,$HADOOP_HOME/[^.].*.jar, $HADOOP_HOME/lib/[^.].*.jar"
+LOG_PROPERTIES= os.path.join(ACCUMULO_HOME, 'conf', 'log4j.properties')
+LOG_GENERIC = os.path.join(ACCUMULO_HOME, 'conf', 'generic_logger.xml')
+General_CLASSPATH = ("$ACCUMULO_HOME/lib/[^.].$ACCUMULO_VERSION.jar, $ACCUMULO_HOME/lib/[^.].*.jar, $ZOOKEEPER_HOME/zookeeper[^.].*.jar,"
+"$HADOOP_HOME/conf,$HADOOP_HOME/[^.].*.jar, $HADOOP_HOME/lib/[^.].*.jar") 
 
 log = logging.getLogger('test.auto')
 
@@ -79,8 +82,8 @@ class TestUtilsMixin:
 
     settings = {'tserver.port.search': 'true',
                 'tserver.memory.maps.max':'100M',
-		'tserver.cache.data.size':'10M',
-		'tserver.cache.index.size':'20M',
+                'tserver.cache.data.size':'10M',
+                'tserver.cache.index.size':'20M',
                 'instance.zookeeper.timeout': '10s',
                 'gc.cycle.delay': '1s',
                 'gc.cycle.start': '1s',
@@ -95,13 +98,13 @@ class TestUtilsMixin:
         log.debug('%s: %s', host, ' '.join(cmd))
         if host == 'localhost':
            os.environ['ACCUMULO_TSERVER_OPTS']='-Xmx700m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 '
-            os.environ['ACCUMULO_GENERAL_OPTS']='-Dorg.apache.accumulo.config.file=%s ' % SITE
+            os.environ['ACCUMULO_GENERAL_OPTS']=('-Dorg.apache.accumulo.config.file=%s' % (SITE))
             os.environ['ACCUMULO_LOG_DIR']= ACCUMULO_HOME + '/logs/' + ID
             return Popen(cmd, stdout=PIPE, stderr=PIPE, **opts)
         else:
             cp = 'HADOOP_CLASSPATH=%s' % os.environ.get('HADOOP_CLASSPATH','')
             jo = "ACCUMULO_TSERVER_OPTS='-Xmx700m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75
'"
-            go = "ACCUMULO_GENERAL_OPTS='-Dorg.apache.accumulo.config.file=%s'" % SITE
+            go = ("ACCUMULO_GENERAL_OPTS='-Dorg.apache.accumulo.config.file=%s'" % (SITE))
             ld = 'ACCUMULO_LOG_DIR=%s/logs/%s' % (ACCUMULO_HOME, ID)
             execcmd = ['ssh', '-q', host, cp, jo, go, ld] + quote(cmd)
             log.debug(repr(execcmd))
@@ -238,16 +241,16 @@ class TestUtilsMixin:
                   'w')
         fp.write('<configuration>\n')
         settings = self.settings.copy()
-        settings.update({'instance.zookeeper.host': ZOOKEEPERS,
-                         'instance.dfs.dir': ACCUMULO_DIR,
-                         'tserver.port.client': 39000 + FUZZ,
-                         'master.port.client':  41000 + FUZZ,
-                         'monitor.port.client': 50099,
-                         'logger.port.client':  44000 + FUZZ,
-                         'gc.port.client':      45000 + FUZZ,
-                         'logger.dir.walog': WALOG,
-			 'general.classpaths' :General_CLASSPATH,
-                         'instance.secret': 'secret',
+        settings.update({ 'instance.zookeeper.host': ZOOKEEPERS,
+                          'instance.dfs.dir': ACCUMULO_DIR,
+                          'tserver.port.client': 39000 + FUZZ,
+                          'master.port.client':  41000 + FUZZ,
+                          'monitor.port.client': 50099,
+                          'logger.port.client':  44000 + FUZZ,
+                          'gc.port.client':      45000 + FUZZ,
+                          'logger.dir.walog': WALOG,
+                          'general.classpaths' :General_CLASSPATH,
+                          'instance.secret': 'secret',
                          })
         for a, v in settings.items():
             fp.write('  <property>\n')
@@ -273,6 +276,16 @@ class TestUtilsMixin:
         out, err = handle.communicate(INSTANCE_NAME+"\n"+ROOT_PASSWORD + "\n" + ROOT_PASSWORD+"\n")
         self.processResult(out, err, handle.returncode)
 
+    def setup_logging(self):
+      if os.path.exists(LOG_PROPERTIES):
+         os.rename(LOG_PROPERTIES, '%s.bkp' % LOG_PROPERTIES)
+      if os.path.exists(LOG_GENERIC):
+         os.rename(LOG_GENERIC, '%s.bkp' % LOG_GENERIC)
+      
+      shutil.copyfile('%s/conf/examples/512MB/standalone/log4j.properties' % ACCUMULO_HOME, LOG_PROPERTIES)
+      shutil.copyfile('%s/conf/examples/512MB/standalone/generic_logger.xml' % ACCUMULO_HOME, LOG_GENERIC)
+      
+
     def start_accumulo_procs(self, safeMode=None):
         self.accumuloHandles = [
            self.start_logger(host) for host in self.hosts 
@@ -380,6 +393,16 @@ class TestUtilsMixin:
         for h in self.accumuloHandles:
             self.waitForStop(h, 60)
 
+    def clean_logging(self):
+      LOG_PROPERTIES_BACKUP='%s.bkp' % LOG_PROPERTIES 
+      LOG_GENERIC_BACKUP='%s.bkp' % LOG_GENERIC
+      os.remove(LOG_PROPERTIES)
+      os.remove(LOG_GENERIC)
+      if os.path.exists(LOG_PROPERTIES_BACKUP):
+        os.rename(LOG_PROPERTIES_BACKUP, LOG_PROPERTIES)
+      if os.path.exists(LOG_GENERIC_BACKUP):
+         os.rename(LOG_GENERIC_BACKUP, LOG_GENERIC)
+
     def sleep(self, secs):
         log.debug("Sleeping %f seconds" % secs)
         sleep.sleep(secs)
@@ -387,19 +410,21 @@ class TestUtilsMixin:
     def setUp(self):
         self.hosts = self.options.hosts
         self.clean_accumulo(self.masterHost())
+        self.setup_logging()
         self.start_accumulo()
 
     def tearDown(self):
         if self.options.clean:
-           self.stop_accumulo()
-           self.wait(self.runOn(self.masterHost(),
-                                ['hadoop', 'fs', '-rmr', ACCUMULO_DIR]))
-           self.wait(self.runClassOn(self.masterHost(),
-                                     'org.apache.accumulo.server.util.DeleteZooInstance',
-                                     [INSTANCE_NAME]))
-           self.wait(self.runOn(self.masterHost(), ['rm', '-rf', WALOG]))
-           self.wait(self.runOn(self.masterHost(), ['rm', '-rf', ACCUMULO_HOME + '/logs/' + ID]))
-           os.unlink(os.path.join(ACCUMULO_HOME, 'conf', SITE))
+          self.stop_accumulo()
+          self.wait(self.runOn(self.masterHost(),
+                               ['hadoop', 'fs', '-rmr', ACCUMULO_DIR]))
+          self.wait(self.runClassOn(self.masterHost(),
+                                    'org.apache.accumulo.server.util.DeleteZooInstance',
+                                    [INSTANCE_NAME]))
+          self.wait(self.runOn(self.masterHost(), ['rm', '-rf', WALOG]))
+          self.wait(self.runOn(self.masterHost(), ['rm', '-rf', ACCUMULO_HOME + '/logs/' + ID]))
+          self.clean_logging() 
+          os.unlink(os.path.join(ACCUMULO_HOME, 'conf', SITE))
 
     def createTable(self, table, splitFile=None):
         if splitFile :


