Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 92F409EA4 for ; Wed, 30 May 2012 18:35:08 +0000 (UTC) Received: (qmail 40895 invoked by uid 500); 30 May 2012 18:35:08 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 40875 invoked by uid 500); 30 May 2012 18:35:08 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 40866 invoked by uid 99); 30 May 2012 18:35:08 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 May 2012 18:35:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 May 2012 18:35:06 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0E96423889BB for ; Wed, 30 May 2012 18:34:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1344396 [3/3] - in /accumulo/branches/ACCUMULO-578: core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/master/thrift/ core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ core/src/main/java... Date: Wed, 30 May 2012 18:34:44 -0000 To: commits@accumulo.apache.org From: ecn@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120530183446.0E96423889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1344396&r1=1344395&r2=1344396&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Wed May 30 18:34:43 2012 @@ -21,23 +21,39 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.Map.Entry; +import java.util.Random; +import java.util.TimerTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.master.thrift.RecoveryStatus; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.logger.LogFileKey; import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason; +import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.MapFile; import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; /** * @@ -48,30 +64,37 @@ public class LogSorter { FileSystem fs; AccumuloConfiguration conf; + private Map currentWork = new HashMap(); + class Work implements Runnable { final String name; FSDataInputStream input; final String destPath; long bytesCopied = -1; + long sortStart = 0; + long sortStop = -1; + private final LogSortNotifier cback; synchronized long getBytesCopied() throws IOException { return input == null ? bytesCopied : input.getPos(); } - Work(String name, FSDataInputStream input, String destPath) { + Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) { this.name = name; this.input = input; this.destPath = destPath; + this.cback = cback; } synchronized boolean finished() { return input == null; } public void run() { + sortStart = System.currentTimeMillis(); String formerThreadName = Thread.currentThread().getName(); + int part = 0; try { final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE); Thread.currentThread().setName("Sorting " + name + " for recovery"); - int part = 0; while (true) { final ArrayList> buffer = new ArrayList>(); try { @@ -99,6 +122,11 @@ public class LogSorter { log.error("Error creating failed flag file " + name, e); } log.error(t, t); + try { + cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString()); + } catch (Exception ex) { + log.error("Strange error notifying the master of a logSort problem for file " + name); + } } finally { Thread.currentThread().setName(formerThreadName); try { @@ -106,6 +134,15 @@ public class LogSorter { } catch (IOException e) { log.error("Error during cleanup sort/copy " + name, e); } + sortStop = System.currentTimeMillis(); + synchronized (currentWork) { + currentWork.remove(name); + } + try { + cback.notice(name, getBytesCopied(), part, getSortTime(), ""); + } catch (Exception ex) { + log.error("Strange error reporting successful log sort " + name, ex); + } } } @@ -132,56 +169,151 @@ public class LogSorter { input.close(); input = null; } + + public synchronized long getSortTime() { + if (sortStart > 0) { + if (sortStop > 0) + return sortStop - sortStart; + return System.currentTimeMillis() - sortStart; + } + return 0; + } }; - final ExecutorService threadPool; - Map sorts = new ConcurrentHashMap(); + final ThreadPoolExecutor threadPool; + private Instance instance; - public LogSorter(FileSystem fs, AccumuloConfiguration conf) { + public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) { + this.instance = instance; this.fs = fs; this.conf = conf; int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT); - this.threadPool = Executors.newFixedThreadPool(threadPoolSize); + this.threadPool = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); } - public double sort(String src, String dest) throws IOException { - synchronized (this) { - Work work = sorts.get(src); - if (work == null) { - work = startSort(src, dest); - sorts.put(src, work); - } else { - if (work.finished()) - sorts.remove(src); + public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException { + final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY; + final ZooReaderWriter zoo = ZooReaderWriter.getInstance(); + zoo.mkdirs(path); + List children = zoo.getChildren(path, new Watcher() { + @Override + public void process(WatchedEvent event) { + switch (event.getType()) { + case NodeChildrenChanged: + if (event.getPath().equals(path)) + try { + attemptRecoveries(zoo, serverName, path, zoo.getChildren(path)); + } catch (KeeperException e) { + log.error("Unable to get recovery information", e); + } catch (InterruptedException e) { + log.info("Interrupted getting recovery information", e); + } + else + log.info("Unexpected path for NodeChildrenChanged event " + event.getPath()); + break; + case NodeCreated: + case NodeDataChanged: + case NodeDeleted: + case None: + log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path); + break; + + } } - long bytesCopied = work.getBytesCopied(); - long estimate = conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE); - return bytesCopied / ((double) estimate); - } + }); + attemptRecoveries(zoo, serverName, path, children); + Random r = new Random(); + // Add a little jitter to avoid all the tservers slamming zookeeper at once + SimpleTimer.getInstance().schedule(new TimerTask() { + @Override + public void run() { + try { + attemptRecoveries(zoo, serverName, path, zoo.getChildren(path)); + } catch (KeeperException e) { + log.error("Unable to get recovery information", e); + } catch (InterruptedException e) { + log.info("Interrupted getting recovery information", e); + } + } + }, r.nextInt(1000), 60 * 1000); } - private Work startSort(String src, String dest) throws IOException { + private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, String path, List children) { + if (children.size() == 0) + return; + log.info("Zookeeper references " + children.size() + " recoveries, attempting locks"); + Random random = new Random(); + Collections.shuffle(children, random); + try { + for (String child : children) { + final String childPath = path + "/" + child; + log.debug("Attempting to lock " + child); + ZooLock lock = new ZooLock(childPath); + if (lock.tryLock(new LockWatcher() { + @Override + public void lostLock(LockLossReason reason) { + log.info("Ignoring lost lock event, reason " + reason); + } + }, serverName.getBytes())) { + // Great... we got the lock, but maybe we're too busy + if (threadPool.getQueue().size() > 1) { + lock.unlock(); + continue; + } + byte[] contents = zoo.getData(childPath, null); + String destination = Constants.getRecoveryDir(conf) + "/" + child; + startSort(new String(contents), destination, new LogSortNotifier() { + @Override + public void notice(String name, long bytes, int parts, long milliseconds, String error) { + log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms"); + try { + zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP); + } catch (Exception e) { + log.error("Error received when trying to delete recovery entry in zookeeper " + childPath); + } + } + }); + } + } + } catch (Throwable t) { + log.error("Unexpected error", t); + } + } + + public interface LogSortNotifier { + public void notice(String name, long bytes, int parts, long milliseconds, String error); + } + + private void startSort(String src, String dest, LogSortNotifier cback) throws IOException { log.info("Copying " + src + " to " + dest); + fs.delete(new Path(dest), true); Path srcPath = new Path(src); - while (true) { - try { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem) fs; - dfs.recoverLease(srcPath); - log.debug("recovered lease on " + srcPath); - } else { - fs.append(srcPath).close(); - log.debug("successfully appended to " + srcPath); - } - break; - } catch (IOException e) { - log.debug("error recovering lease on " + srcPath, e); - UtilWaitThread.sleep(1000); - log.debug("retrying lease recovery on " + srcPath); - } - } - Work work = new Work(srcPath.getName(), fs.open(srcPath), dest); - threadPool.execute(work); - return work; + synchronized (currentWork) { + Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback); + if (!currentWork.containsKey(srcPath.getName())) { + threadPool.execute(work); + currentWork.put(srcPath.getName(), work); + } + } + } + + public List getLogSorts() { + List result = new ArrayList(); + synchronized (currentWork) { + for (Entry entries : currentWork.entrySet()) { + RecoveryStatus status = new RecoveryStatus(); + status.name = entries.getKey(); + try { + status.progress = entries.getValue().getBytesCopied() / (0.0 + conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE)); + } catch (IOException ex) { + log.warn("Error getting bytes read"); + } + status.runtime = (int) entries.getValue().getSortTime(); + result.add(status); + } + return result; + } } } Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1344396&r1=1344395&r2=1344396&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Wed May 30 18:34:43 2012 @@ -59,7 +59,7 @@ public class TabletServerLogger { private final TabletServer tserver; // The current log set: always updated to a new set with every change of loggers - private final List loggers = new ArrayList(); + private final List loggers = new ArrayList(); // The current generation of logSet. // Because multiple threads can be using a log set at one time, a log @@ -132,7 +132,7 @@ public class TabletServerLogger { this.maxSize = maxSize; } - private int initializeLoggers(final List copy) throws IOException { + private int initializeLoggers(final List copy) throws IOException { final int[] result = {-1}; testLockAndRun(logSetLock, new TestCallWithWriteLock() { boolean test() { @@ -163,8 +163,8 @@ public class TabletServerLogger { public void getLoggers(Set loggersOut) { logSetLock.readLock().lock(); try { - for (IRemoteLogger logger : loggers) { - loggersOut.add(logger.getLogger()); + for (DfsLogger logger : loggers) { + loggersOut.add(logger.toString()); } } finally { logSetLock.readLock().unlock(); @@ -205,7 +205,7 @@ public class TabletServerLogger { throw new IllegalStateException("close should be called with write lock held!"); } try { - for (IRemoteLogger logger : loggers) { + for (DfsLogger logger : loggers) { try { logger.close(); } catch (Throwable ex) { @@ -220,7 +220,7 @@ public class TabletServerLogger { } interface Writer { - LoggerOperation write(IRemoteLogger logger, int seq) throws Exception; + LoggerOperation write(DfsLogger logger, int seq) throws Exception; } private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException { @@ -239,7 +239,7 @@ public class TabletServerLogger { while (!success) { try { // get a reference to the loggers that no other thread can touch - ArrayList copy = new ArrayList(); + ArrayList copy = new ArrayList(); currentLogSet = initializeLoggers(copy); // add the logger to the log set for the memory in the tablet, @@ -268,7 +268,7 @@ public class TabletServerLogger { if (seq < 0) throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven"); ArrayList queuedOperations = new ArrayList(copy.size()); - for (IRemoteLogger wal : copy) { + for (DfsLogger wal : copy) { LoggerOperation lop = writer.write(wal, seq); if (lop != null) queuedOperations.add(lop); @@ -330,7 +330,7 @@ public class TabletServerLogger { return -1; return write(commitSession, false, new Writer() { @Override - public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent()); return null; } @@ -342,7 +342,7 @@ public class TabletServerLogger { return -1; int seq = write(commitSession, false, new Writer() { @Override - public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { return logger.log(tabletSeq, commitSession.getLogId(), m); } }); @@ -362,7 +362,7 @@ public class TabletServerLogger { int seq = write(loggables.keySet(), false, new Writer() { @Override - public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { List copy = new ArrayList(loggables.size()); for (Entry> entry : loggables.entrySet()) { CommitSession cs = entry.getKey(); @@ -393,7 +393,7 @@ public class TabletServerLogger { int seq = write(commitSession, true, new Writer() { @Override - public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName); return null; } @@ -409,7 +409,7 @@ public class TabletServerLogger { return -1; write(commitSession, false, new Writer() { @Override - public LoggerOperation write(IRemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName); return null; } Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java?rev=1344396&r1=1344395&r2=1344396&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java Wed May 30 18:34:43 2012 @@ -108,16 +108,12 @@ public class GetMasterStats { out(4, "Queued for Minor Compaction %d", info.minor == null ? 0 : info.minor.queued); } } - } - } - if (stats.recovery != null && stats.recovery.size() > 0) { - out(0, "Recovery"); - for (RecoveryStatus r : stats.recovery) { - out(1, "Log Server %s", r.host); - out(1, "Log Name %s", r.name); - out(1, "Map Progress: %.2f%%", r.mapProgress * 100); - out(1, "Reduce Progress: %.2f%%", r.reduceProgress * 100); - out(1, "Time running: %s", r.runtime / 1000.); + out(2, "Recoveries %d", server.logSorts.size()); + for (RecoveryStatus sort : server.logSorts) { + out(3, "File %s", sort.name); + out(3, "Progress %.2f%%", sort.progress * 100); + out(3, "Time running %s", sort.runtime / 1000.); + } } } } Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1344396&r1=1344395&r2=1344396&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Wed May 30 18:34:43 2012 @@ -188,17 +188,6 @@ public class NullTserver { /* * (non-Javadoc) * - * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#sortLog(org.apache.accumulo.cloudtrace.thrift.TInfo, - * org.apache.accumulo.core.security.thrift.AuthInfo, java.lang.String) - */ - @Override - public double sortLog(TInfo tinfo, AuthInfo credentials, String lock, String path) throws ThriftSecurityException, TException { - return 0; - } - - /* - * (non-Javadoc) - * * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo, * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List) */ Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java?rev=1344396&r1=1344395&r2=1344396&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java (original) +++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java Wed May 30 18:34:43 2012 @@ -39,6 +39,7 @@ import org.apache.hadoop.util.Progressab // If FileSystem was an interface, we could use a Proxy, but it's not, so we have to override everything manually public class TraceFileSystem extends FileSystem { + @Override public void setConf(Configuration conf) { Span span = Trace.start("setConf"); @@ -667,6 +668,10 @@ public class TraceFileSystem extends Fil this.impl = impl; } + public FileSystem getImplementation() { + return impl; + } + @Override public URI getUri() { Span span = Trace.start("getUri");