Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm
Precedence: bulk
Reply-To: dev@accumulo.apache.org
Delivered-To: mailing list commits@accumulo.apache.org
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r1346380 [4/5] - in /accumulo/trunk: ./ bin/ core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accum...
Date: Tue, 05 Jun 2012 13:18:25 -0000
To: commits@accumulo.apache.org
From: ecn@apache.org
X-Mailer: svnmailer-1.0.8-patched
Message-Id: <20120605131829.310602388BA2@eris.apache.org>

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java Tue Jun 5 13:18:22 2012
@@ -23,7 +23,9 @@ import org.apache.thrift.transport.TSock
 public class AddressUtil {
   static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException {
-    final String[] parts = address.split(":", 2);
+    String[] parts = address.split(":", 2);
+    if (address.contains("+"))
+      parts = address.split("\\+", 2);
     if (parts.length == 2) {
       if (parts[1].isEmpty())
         return new InetSocketAddress(parts[0], defaultPort);

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java Tue Jun 5 13:18:22 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.cloudtrace.instrument.TraceRunnable;
+import org.apache.log4j.Logger;
+
+public class NamingThreadFactory implements ThreadFactory {
+  private static final Logger log = Logger.getLogger(NamingThreadFactory.class);
+  
+  private AtomicInteger threadNum = new AtomicInteger(1);
+  private String name;
+  
+  public NamingThreadFactory(String name) {
+    this.name = name;
+  }
+  
+  public Thread newThread(Runnable r) {
+    return new Daemon(new LoggingRunnable(log, new TraceRunnable(r)), name + " " + threadNum.getAndIncrement());
+  }
+  
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native
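(Context, not part of the patch: NamingThreadFactory is consumed through the standard
java.util.concurrent factory methods, which is how later hunks in this commit use it in
BulkImporter and SimpleGarbageCollector. A minimal usage sketch, with an illustrative
pool name and size:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.accumulo.core.util.NamingThreadFactory;

    public class NamingThreadFactoryExample {
      public static void main(String[] args) {
        // Worker threads come out named "example 1", "example 2", ...; the factory also
        // wraps each worker in LoggingRunnable/TraceRunnable so failures are logged and traced.
        ExecutorService pool = Executors.newFixedThreadPool(4, new NamingThreadFactory("example"));
        pool.submit(new Runnable() {
          public void run() {
            System.out.println(Thread.currentThread().getName());
          }
        });
        pool.shutdown();
      }
    }
)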
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java?rev=1346380&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java Tue Jun 5 13:18:22 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * 
+ */
+public class SimpleThreadPool extends ThreadPoolExecutor {
+  
+  public SimpleThreadPool(int max, final String name) {
+    super(0, Integer.MAX_VALUE, 1l, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamingThreadFactory(name));
+  }
+  
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
------------------------------------------------------------------------------
    svn:eol-style = native
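(Context, not part of the patch: SimpleThreadPool packages up the executor construction
that LoadFiles in BulkImport.java previously did by hand; a later hunk in this commit
switches that code over to it. A minimal usage sketch, with an illustrative name and size:

    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.accumulo.core.util.SimpleThreadPool;

    public class SimpleThreadPoolExample {
      public static void main(String[] args) {
        // Unbounded work queue; worker threads are created and named by NamingThreadFactory.
        ThreadPoolExecutor pool = new SimpleThreadPool(8, "example pool");
        pool.allowCoreThreadTimeOut(true); // let idle workers exit, as the bulk-import code does
        pool.submit(new Runnable() {
          public void run() {
            // do work here
          }
        });
        pool.shutdown();
      }
    }
)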
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java Tue Jun 5 13:18:22 2012
@@ -49,6 +49,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 
+@SuppressWarnings("deprecation")
 public class SetIterCommand extends Command {
   private Option mincScopeOpt, majcScopeOpt, scanScopeOpt, nameOpt, priorityOpt;

Modified: accumulo/trunk/core/src/main/thrift/master.thrift
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/thrift/master.thrift?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/thrift/master.thrift (original)
+++ accumulo/trunk/core/src/main/thrift/master.thrift Tue Jun 5 13:18:22 2012
@@ -42,16 +42,9 @@ struct TableInfo {
 }
 
 struct RecoveryStatus {
-  1:string host
   2:string name
-  3:double mapProgress
-  4:double reduceProgress
   5:i32 runtime // in millis
-  6:double copyProgress
-}
-
-struct LoggerStatus {
-  1:string logger
+  6:double progress
 }
 
 struct TabletServerStatus {
@@ -61,11 +54,11 @@ struct TabletServerStatus {
   5:double osLoad
   7:i64 holdTime
   8:i64 lookups
-  9:set loggers
   10:i64 indexCacheHits
   11:i64 indexCacheRequest
   12:i64 dataCacheHits
-  13:i64 dataCacheRequest
+  13:i64 dataCacheRequest
+  14:list logSorts
 }
 
 enum MasterState {
@@ -94,14 +87,11 @@ struct MasterMonitorInfo {
   1:map tableMap
   2:list tServerInfo
   3:map badTServers
-  4:list recovery
-  5:list loggers
   6:MasterState state
   8:MasterGoalState goalState
   7:i32 unassignedTablets
   9:set serversShuttingDown
   10:list deadTabletServers
-  11:list deadLoggers
 }
 
 struct TabletSplit {

Modified: accumulo/trunk/core/src/main/thrift/tabletserver.thrift
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/thrift/tabletserver.thrift?rev=1346380&r1=1346379&r2=1346380&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/thrift/tabletserver.thrift (original)
+++ accumulo/trunk/core/src/main/thrift/tabletserver.thrift Tue Jun 5 13:18:22 2012
@@ -148,8 +148,6 @@ service TabletClientService extends clie
   oneway void chop(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:string lock, 4:data.TKeyExtent extent),
   oneway void compact(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:string lock, 4:string tableId, 5:binary
startRow, 6:binary endRow), - oneway void useLoggers(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:set loggers), - master.TabletServerStatus getTabletServerStatus(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec) list getTabletStats(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tableId) throws (1:security.ThriftSecurityException sec) TabletStats getHistoricalStats(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec) @@ -157,57 +155,13 @@ service TabletClientService extends clie oneway void fastHalt(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string lock); list getActiveScans(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec) + oneway void removeLogs(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:list filenames) } -// LogID should be cryptographically unguessable -typedef i64 LogID typedef i32 TabletID -exception NoSuchLogIDException { -} - -exception LoggerClosedException { -} - -struct LogFile { - 1:string name, - 2:LogID id, -} - struct TabletMutations { 1:TabletID tabletID, 2:i64 seq, 3:list mutations } - -struct LogCopyInfo { - 1:i64 fileSize, - 2:string loggerZNode -} - -service MutationLogger { - LogFile create(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tserverSession) throws (1:security.ThriftSecurityException sec, 2:LoggerClosedException lce), - - // Note that these methods correspond to org.apache.accumulo.server.tabletserver.log.TabletLog - void defineTablet(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:data.TKeyExtent tablet) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce), - void log(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:data.TMutation mutation) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce), - void logManyTablets(3:cloudtrace.TInfo tinfo, 1:LogID id, 2:list mutations) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce), - void minorCompactionStarted(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:string fqfn) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce), - void minorCompactionFinished(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:string fqfn) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce), - void close(2:cloudtrace.TInfo tinfo, 1:LogID id) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce), - - // close a log file and initiate the push to HDFS - LogCopyInfo startCopy(4:cloudtrace.TInfo tinfo, - 1:security.AuthInfo credentials, - 2:string name, - 3:string fullyQualifiedFileName, - 5:bool sort) throws (1:security.ThriftSecurityException sec), - - // Support log garbage collection - list getClosedLogs(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec), - oneway void remove(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:list files), - - // Shutdown - void beginShutdown(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec); - oneway void halt(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials); -} Modified: accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java URL: 
http://svn.apache.org/viewvc/accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java (original) +++ accumulo/trunk/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java Tue Jun 5 13:18:22 2012 @@ -39,7 +39,6 @@ import org.apache.accumulo.core.client.m import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.user.SummingCombiner; -import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args; import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner; import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner; Modified: accumulo/trunk/pom.xml URL: http://svn.apache.org/viewvc/accumulo/trunk/pom.xml?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/pom.xml (original) +++ accumulo/trunk/pom.xml Tue Jun 5 13:18:22 2012 @@ -498,7 +498,7 @@ org.apache.hadoop hadoop-core - 0.20.203.0 + 0.20.205.0 provided Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Tue Jun 5 13:18:22 2012 @@ -57,6 +57,7 @@ import org.apache.accumulo.core.security import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.StopWatch; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; @@ -138,7 +139,7 @@ public class BulkImporter { final Map> assignments = Collections.synchronizedSortedMap(new TreeMap>()); timer.start(Timers.EXAMINE_MAP_FILES); - ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping")); for (Path path : paths) { final Path mapFile = path; @@ -376,7 +377,7 @@ public class BulkImporter { final Map> ais = Collections.synchronizedMap(new TreeMap>()); - ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes")); for (final Entry> entry : assignments.entrySet()) { if (entry.getValue().size() == 1) { @@ -586,12 +587,11 @@ public class BulkImporter { apt.put(entry.getKey(), entry.getValue()); } - ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + ExecutorService threadPool = 
Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit")); for (Entry>> entry : assignmentsPerTabletServer.entrySet()) { String location = entry.getKey(); - threadPool - .submit(new TraceRunnable(new LoggingRunnable(log, new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue())))); + threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue())); } threadPool.shutdown(); Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Tue Jun 5 13:18:22 2012 @@ -17,53 +17,49 @@ package org.apache.accumulo.server.gc; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.UUID; import org.apache.accumulo.cloudtrace.instrument.Span; import org.apache.accumulo.cloudtrace.instrument.Trace; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; -import org.apache.accumulo.core.tabletserver.thrift.MutationLogger; -import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.Iface; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.security.SecurityConstants; +import org.apache.accumulo.server.util.AddressUtil; import org.apache.accumulo.server.util.MetadataTable; import org.apache.accumulo.server.util.MetadataTable.LogEntry; -import org.apache.accumulo.server.zookeeper.IZooReaderWriter; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; public class GarbageCollectWriteAheadLogs { private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class); - private final AccumuloConfiguration conf; + private final Instance instance; private final FileSystem fs; - GarbageCollectWriteAheadLogs(FileSystem fs, AccumuloConfiguration conf) { + GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs) { + this.instance = instance; this.fs = fs; - 
this.conf = conf; } public void collect(GCStatus status) { @@ -111,59 +107,51 @@ public class GarbageCollectWriteAheadLog } } + boolean holdsLock(InetSocketAddress addr) { + try { + String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + org.apache.accumulo.core.util.AddressUtil.toString(addr); + List children = ZooReaderWriter.getInstance().getChildren(zpath); + return !(children == null || children.isEmpty()); + } catch (KeeperException.NoNodeException ex) { + return false; + } catch (Exception ex) { + log.debug(ex, ex); + return true; + } + } + private int removeFiles(Map> serverToFileMap, final GCStatus status) { - final AtomicInteger count = new AtomicInteger(); - ExecutorService threadPool = java.util.concurrent.Executors.newCachedThreadPool(); - - for (final Entry> serverFiles : serverToFileMap.entrySet()) { - final String server = serverFiles.getKey(); - final List files = serverFiles.getValue(); - threadPool.submit(new Runnable() { - @Override - public void run() { + AccumuloConfiguration conf = instance.getConfiguration(); + for (Entry> entry : serverToFileMap.entrySet()) { + if (entry.getKey().length() == 0) { + // old-style log entry, just remove it + for (String filename : entry.getValue()) { + log.debug("Removing old-style WAL " + entry.getValue()); try { - Iface logger = ThriftUtil.getClient(new MutationLogger.Client.Factory(), server, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf); - try { - count.addAndGet(files.size()); - log.debug(String.format("removing %d files from %s", files.size(), server)); - if (files.size() > 0) { - log.debug("deleting files on logger " + server); - for (String file : files) { - log.debug("Deleting " + file); - } - logger.remove(null, SecurityConstants.getSystemCredentials(), files); - synchronized (status.currentLog) { - status.currentLog.deleted += files.size(); - } - } - } finally { - ThriftUtil.returnClient(logger); - } - log.info(String.format("Removed %d files from %s", files.size(), server)); - for (String file : files) { - try { - for (FileStatus match : fs.globStatus(new Path(ServerConstants.getRecoveryDir(), file + "*"))) { - fs.delete(match.getPath(), true); - } - } catch (IOException ex) { - log.warn("Error deleting recovery data: ", ex); - } - } - } catch (TTransportException err) { - log.info("Ignoring communication error talking to logger " + serverFiles.getKey() + " (probably a timeout)"); - } catch (TException err) { - log.info("Ignoring exception talking to logger " + serverFiles.getKey() + "(" + err + ")"); + fs.delete(new Path(Constants.getWalDirectory(conf), filename), true); + } catch (IOException ex) { + log.error("Unable to delete wal " + filename + ": " + ex); } } - }); - + } else { + InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT); + if (!holdsLock(address)) + continue; + Iface tserver = null; + try { + tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + tserver.removeLogs(null, SecurityConstants.getSystemCredentials(), entry.getValue()); + log.debug("deleted " + entry.getValue() + " from " + entry.getKey()); + status.currentLog.deleted += entry.getValue().size(); + } catch (TException e) { + log.warn("Error talking to " + address + ": " + e); + } finally { + if (tserver != null) + ThriftUtil.returnClient(tserver); + } + } } - threadPool.shutdown(); - while (!threadPool.isShutdown()) - try { - threadPool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) {} - return count.get(); + 
return 0; } private static Map> mapServersToFiles(Map fileToServerMap) { @@ -194,31 +182,41 @@ public class GarbageCollectWriteAheadLog } private int scanServers(Map fileToServerMap) throws Exception { - int count = 0; - IZooReaderWriter zk = ZooReaderWriter.getInstance(); - String loggersDir = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZLOGGERS; - List servers = zk.getChildren(loggersDir, null); - Collections.shuffle(servers); - for (String server : servers) { - String address = "no-data"; - count++; - try { - byte[] data = zk.getData(loggersDir + "/" + server, null); - address = new String(data); - Iface logger = ThriftUtil.getClient(new MutationLogger.Client.Factory(), address, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf); - for (String log : logger.getClosedLogs(null, SecurityConstants.getSystemCredentials())) { - fileToServerMap.put(log, address); + AccumuloConfiguration conf = instance.getConfiguration(); + Path walRoot = new Path(Constants.getWalDirectory(conf)); + for (FileStatus status : fs.listStatus(walRoot)) { + String name = status.getPath().getName(); + if (status.isDir()) { + for (FileStatus file : fs.listStatus(new Path(walRoot, name))) { + if (isUUID(file.getPath().getName())) + fileToServerMap.put(file.getPath().getName(), name); + else { + log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid"); + } } - ThriftUtil.returnClient(logger); - } catch (TException err) { - log.warn("Ignoring exception talking to logger " + address); - } - if (SimpleGarbageCollector.almostOutOfMemory()) { - log.warn("Running out of memory collecting write-ahead log file names from loggers, continuing with a partial list"); - break; + } else if (isUUID(name)) { + // old-style WAL are not under a directory + fileToServerMap.put(name, ""); + } else { + log.info("Ignoring file " + name + " because it doesn't look like a uuid"); } } + + int count = 0; return count; } + /** + * @param name + * @return + */ + static private boolean isUUID(String name) { + try { + UUID.fromString(name); + return true; + } catch (IllegalArgumentException ex) { + return false; + } + } + } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Tue Jun 5 13:18:22 2012 @@ -64,6 +64,7 @@ import org.apache.accumulo.core.master.s import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.UtilWaitThread; @@ -297,7 +298,7 @@ public class SimpleGarbageCollector impl // Clean up any unused write-ahead logs Span waLogs = Trace.start("walogs"); - GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(fs, instance.getConfiguration()); + GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs); try { 
log.info("Beginning garbage collection of write-ahead logs"); walogCollector.collect(status); @@ -585,7 +586,7 @@ public class SimpleGarbageCollector impl final BatchWriter finalWriter = writer; - ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads); + ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads, new NamingThreadFactory("deleting")); for (final String delete : confirmedDeletes) { Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Tue Jun 5 13:18:22 2012 @@ -16,7 +16,7 @@ */ package org.apache.accumulo.server.logger; -import java.io.FileNotFoundException; +import java.io.EOFException; import java.io.IOException; import java.util.HashSet; import java.util.Set; @@ -36,9 +36,9 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; public class LogReader { @@ -108,28 +108,33 @@ public class LogReader { if (fs.isFile(path)) { // read log entries from a simple hdfs file - org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(file), conf); - while (reader.next(key, value)) { + FSDataInputStream f = fs.open(path); + while (true) { + try { + key.readFields(f); + value.readFields(f); + } catch (EOFException ex) { + break; + } printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max); } } else if (local.isFile(path)) { // read log entries from a simple file - org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(local, new Path(file), conf); - while (reader.next(key, value)) { + FSDataInputStream f = fs.open(path); + while (true) { + try { + key.readFields(f); + value.readFields(f); + } catch (EOFException ex) { + break; + } printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max); } } else { - try { - // read the log entries sorted in a map file - MultiReader input = new MultiReader(fs, conf, file); - while (input.next(key, value)) { - printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max); - } - } catch (FileNotFoundException ex) { - SequenceFile.Reader input = new SequenceFile.Reader(local, new Path(file), conf); - while (input.next(key, value)) { - printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max); - } + // read the log entries sorted in a map file + MultiReader input = new MultiReader(fs, conf, file); + while (input.next(key, value)) { + printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max); } } } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Tue Jun 5 13:18:22 2012 @@ -133,15 +133,6 @@ public class LiveTServerSet implements W } } - public void useLoggers(Set loggers) throws TException { - TabletClientService.Iface client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.useLoggers(null, SecurityConstants.getSystemCredentials(), loggers); - } finally { - ThriftUtil.returnClient(client); - } - } - public void chop(ZooLock lock, KeyExtent extent) throws TException { TabletClientService.Iface client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); try { Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Tue Jun 5 13:18:22 2012 @@ -69,7 +69,6 @@ import org.apache.accumulo.core.data.thr import org.apache.accumulo.core.file.FileUtil; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.master.thrift.LoggerStatus; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.master.thrift.MasterClientService.Processor; import org.apache.accumulo.core.master.thrift.MasterGoalState; @@ -88,28 +87,20 @@ import org.apache.accumulo.core.util.Byt import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.Daemon; -import org.apache.accumulo.core.util.LoggingRunnable; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.Accumulo; -import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fate.Fate; import org.apache.accumulo.server.fate.TStore.TStatus; import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter; -import org.apache.accumulo.server.master.CoordinateRecoveryTask.JobComplete; -import org.apache.accumulo.server.master.CoordinateRecoveryTask.LogFile; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; -import org.apache.accumulo.server.master.TabletServerLoggers.LoggerWatcher; import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer; -import org.apache.accumulo.server.master.balancer.LoggerBalancer; -import org.apache.accumulo.server.master.balancer.LoggerUser; -import org.apache.accumulo.server.master.balancer.SimpleLoggerBalancer; -import org.apache.accumulo.server.master.balancer.TServerUsesLoggers; import org.apache.accumulo.server.master.balancer.TabletBalancer; +import org.apache.accumulo.server.master.recovery.RecoverLease; import org.apache.accumulo.server.master.state.Assignment; 
import org.apache.accumulo.server.master.state.CurrentState; import org.apache.accumulo.server.master.state.DeadServerList; @@ -147,7 +138,6 @@ import org.apache.accumulo.server.securi import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.server.security.ZKAuthenticator; import org.apache.accumulo.server.tabletserver.TabletTime; -import org.apache.accumulo.server.tabletserver.log.RemoteLogger; import org.apache.accumulo.server.trace.TraceFileSystem; import org.apache.accumulo.server.util.AddressUtil; import org.apache.accumulo.server.util.DefaultMap; @@ -177,17 +167,19 @@ import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; /** - * The Master is responsible for assigning and balancing tablets and loggers to tablet servers. + * The Master is responsible for assigning and balancing tablets to tablet servers. * * The master will also coordinate log recoveries and reports general status. */ -public class Master implements LiveTServerSet.Listener, LoggerWatcher, TableObserver, CurrentState, JobComplete { +public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState { final private static Logger log = Logger.getLogger(Master.class); @@ -216,8 +208,6 @@ public class Master implements LiveTServ private ZooLock masterLock = null; private TServer clientService = null; - private TabletServerLoggers loggers = null; - private CoordinateRecoveryTask recovery = null; private TabletBalancer tabletBalancer; private MasterState state = MasterState.INITIAL; @@ -227,8 +217,8 @@ public class Master implements LiveTServ volatile private SortedMap tserverStatus = Collections .unmodifiableSortedMap(new TreeMap()); - private LoggerBalancer loggerBalancer; - + private Set recoveriesInProgress = Collections.synchronizedSet(new HashSet()); + synchronized private MasterState getMasterState() { return state; } @@ -549,7 +539,6 @@ public class Master implements LiveTServ tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this); this.tabletBalancer = createInstanceFromPropertyName(aconf, Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer()); this.tabletBalancer.init(serverConfig); - this.loggerBalancer = createInstanceFromPropertyName(aconf, Property.MASTER_LOGGER_BALANCER, LoggerBalancer.class, new SimpleLoggerBalancer()); } public TServerConnection getConnection(TServerInstance server) { @@ -709,11 +698,6 @@ public class Master implements LiveTServ @Override public MasterMonitorInfo getMasterStats(TInfo info, AuthInfo credentials) throws ThriftSecurityException, TException { final MasterMonitorInfo result = new MasterMonitorInfo(); - result.loggers = new ArrayList(); - for (String logger : loggers.getLoggersFromZooKeeper().values()) { - result.loggers.add(new LoggerStatus(logger)); - } - result.recovery = recovery.status(); result.tServerInfo = new ArrayList(); result.tableMap = new DefaultMap(new TableInfo()); @@ -742,8 +726,6 @@ public class Master implements LiveTServ } DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS); result.deadTabletServers = obit.getList(); - obit = new DeadServerList(ZooUtil.getRoot(instance) + 
Constants.ZDEADLOGGERS); - result.deadLoggers = obit.getList(); return result; } @@ -858,10 +840,6 @@ public class Master implements LiveTServ balancer.init(serverConfig); tabletBalancer = balancer; log.info("tablet balancer changed to " + tabletBalancer.getClass().getName()); - } else if (property.equals(Property.MASTER_LOGGER_BALANCER.getKey())) { - loggerBalancer = createInstanceFromPropertyName(instance.getConfiguration(), Property.MASTER_LOGGER_BALANCER, LoggerBalancer.class, - new SimpleLoggerBalancer()); - log.info("log balancer changed to " + loggerBalancer.getClass().getName()); } } @@ -1089,6 +1067,7 @@ public class Master implements LiveTServ fate.delete(opid); } + } public MergeInfo getMergeInfo(Text tableId) { @@ -1373,9 +1352,8 @@ public class Master implements LiveTServ if (goal == TabletGoalState.HOSTED) { if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) { - if (!recovery.recover(SecurityConstants.getSystemCredentials(), tls.extent, tls.walogs, Master.this)) { + if (recoverLogs(tls.extent, tls.walogs)) continue; - } } switch (state) { case HOSTED: @@ -1959,7 +1937,6 @@ public class Master implements LiveTServ } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) { log.debug("not balancing because the master is attempting to stop cleanly"); } else { - balanceLoggers(); return balanceTablets(); } return DEFAULT_WAIT_FOR_WATCHER; @@ -1992,28 +1969,6 @@ public class Master implements LiveTServ } } - private void balanceLoggers() { - List logUsers = new ArrayList(); - for (Entry entry : tserverStatus.entrySet()) { - logUsers.add(new TServerUsesLoggers(entry.getKey(), entry.getValue())); - } - List logNames = new ArrayList(loggers.getLoggersFromZooKeeper().values()); - Map> assignmentsOut = new HashMap>(); - int loggersPerServer = getSystemConfiguration().getCount(Property.TSERV_LOGGER_COUNT); - loggerBalancer.balance(logUsers, logNames, assignmentsOut, loggersPerServer); - for (Entry> entry : assignmentsOut.entrySet()) { - TServerUsesLoggers tserver = (TServerUsesLoggers) entry.getKey(); - try { - log.debug("Telling " + tserver.getInstance() + " to use loggers " + entry.getValue()); - TServerConnection connection = tserverSet.getConnection(tserver.getInstance()); - if (connection != null) - connection.useLoggers(new HashSet(entry.getValue())); - } catch (Exception ex) { - log.warn("Unable to talk to " + tserver.getInstance(), ex); - } - } - } - private long balanceTablets() { List migrationsOut = new ArrayList(); Set migrationsCopy = new HashSet(); @@ -2048,7 +2003,7 @@ public class Master implements LiveTServ result.put(server, status); // TODO maybe remove from bad servers } catch (Exception ex) { - log.error("unable to get tablet server status " + server + " " + ex.getMessage()); + log.error("unable to get tablet server status " + server + " " + ex.toString()); log.debug("unable to get tablet server status " + server, ex); if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) { log.warn("attempting to stop " + server); @@ -2073,6 +2028,31 @@ public class Master implements LiveTServ return result; } + public boolean recoverLogs(KeyExtent extent, Collection> walogs) throws IOException { + boolean recoveryNeeded = false; + for (Collection logs : walogs) { + for (String log : logs) { + String parts[] = log.split("/"); + String host = parts[0]; + String filename = parts[1]; + if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) { + recoveriesInProgress.remove(filename); + continue; + } + 
recoveryNeeded = true; + synchronized (recoveriesInProgress) { + if (!recoveriesInProgress.contains(filename)) { + Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference"); + long tid = fate.startTransaction(); + fate.seedTransaction(tid, new RecoverLease(host, filename), true); + recoveriesInProgress.add(filename); + } + } + } + } + return recoveryNeeded; + } + public void run() throws IOException, InterruptedException, KeeperException { final String zroot = ZooUtil.getRoot(instance); @@ -2080,13 +2060,6 @@ public class Master implements LiveTServ TableManager.getInstance().addObserver(this); - recovery = new CoordinateRecoveryTask(fs, getSystemConfiguration()); - Thread recoveryThread = new Daemon(new LoggingRunnable(log, recovery), "Recovery Status"); - recoveryThread.start(); - - loggers = new TabletServerLoggers(this, getSystemConfiguration()); - loggers.scanZooKeeperForUpdates(); - StatusThread statusThread = new StatusThread(); statusThread.start(); @@ -2094,6 +2067,13 @@ public class Master implements LiveTServ migrationCleanupThread.start(); tserverSet.startListeningForTabletServerChanges(); + + ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() { + @Override + public void process(WatchedEvent event) { + nextEvent.event("Noticed recovery changes", event.getType()); + } + }); AuthInfo systemAuths = SecurityConstants.getSystemCredentials(); final TabletStateStore stores[] = {new ZooTabletStateStore(new ZooStore(zroot)), new RootTabletStateStore(instance, systemAuths, this), @@ -2129,9 +2109,6 @@ public class Master implements LiveTServ final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; statusThread.join(remaining(deadline)); - recovery.stop(); - recoveryThread.join(remaining(deadline)); - // quit, even if the tablet servers somehow jam up and the watchers // don't stop for (TabletGroupWatcher watcher : watchers) { @@ -2199,42 +2176,9 @@ public class Master implements LiveTServ } } - @Override - public void newLogger(String address) { - try { - RemoteLogger remote = new RemoteLogger(address, getSystemConfiguration()); - for (String onDisk : remote.getClosedLogs()) { - Path path = new Path(ServerConstants.getRecoveryDir(), onDisk + ".failed"); - if (fs.exists(path)) { - fs.delete(path, true); - } - } - } catch (Exception ex) { - log.warn("Unexpected error clearing failed recovery markers for new logger", ex); - } - DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADLOGGERS); - obit.delete(address); - nextEvent.event("Added logger %s", address); - } - static final String I_DONT_KNOW_WHY = "unexpected failure"; @Override - public void deadLogger(String address) { - DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADLOGGERS); - InetSocketAddress parseAddress = AddressUtil.parseAddress(address, Property.LOGGER_PORT); - String cause = I_DONT_KNOW_WHY; - for (TServerInstance server : serversToShutdown) { - if (server.getLocation().getHostName().equals(parseAddress.getHostName())) { - cause = "clean shutdown"; - break; - } - } - obit.post(address, cause); - log.info("Noticed logger went away: " + address); - } - - @Override public void update(LiveTServerSet current, Set deleted, Set added) { DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS); if (added.size() > 0) { @@ -2309,10 +2253,6 @@ public class Master implements LiveTServ return 
tserverSet.getCurrentServers(); } - public Map getLoggers() { - return loggers.getLoggersFromZooKeeper(); - } - @Override public Collection merges() { List result = new ArrayList(); @@ -2322,11 +2262,6 @@ public class Master implements LiveTServ return result; } - @Override - public void finished(LogFile entry) { - nextEvent.event("Log recovery %s complete ", entry); - } - public void killTServer(TServerInstance server) { nextEvent.event("Forcing server down %s", server); serversToShutdown.add(server); @@ -2358,4 +2293,7 @@ public class Master implements LiveTServ return this.fs; } + public void updateRecoveryInProgress(String file) { + recoveriesInProgress.add(file); + } } Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java?rev=1346380&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java Tue Jun 5 13:18:22 2012 @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.master.recovery; + +import java.io.IOException; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.fate.Repo; +import org.apache.accumulo.server.master.Master; +import org.apache.accumulo.server.master.tableOps.MasterRepo; +import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; + + +public class RecoverLease extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private String server; + private String file; + private long start; + + public RecoverLease(String server, String file) { + this.server = server; + this.file = file; + this.start = System.currentTimeMillis(); + } + + public static Path getSource(Master master, String server, String file) { + String source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + server + "/" + file; + if (server.contains(":")) { + // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir + source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + file; + } + return new Path(source); + } + + public Path getSource(Master master) { + return getSource(master, server, file); + } + + @Override + public long isReady(long tid, Master master) throws Exception { + master.updateRecoveryInProgress(file); + long diff = System.currentTimeMillis() - start; + if (diff < master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY)) + return Math.max(diff, 0); + FileSystem fs = master.getFileSystem(); + if (fs.exists(getSource(master))) + return 0; + log.warn("Unable to locate file " + file + " wal for server " + server); + return 1000; + } + + @Override + public Repo call(long tid, Master master) throws Exception { + Path source = getSource(master); + FileSystem fs = master.getFileSystem(); + if (fs instanceof TraceFileSystem) + fs = ((TraceFileSystem) fs).getImplementation(); + try { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + dfs.recoverLease(source); + log.info("Recovered lease on " + source.toString()); + return new SubmitFileForRecovery(server, file); + } + } catch (IOException ex) { + log.error("error recovering lease ", ex); + } + try { + fs.append(source).close(); + log.info("Recovered lease on " + source.toString() + " using append"); + + } catch (IOException ex) { + log.error("error recovering lease using append", ex); + } + // lets do this again + return new RecoverLease(server, file); + } + +} Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java ------------------------------------------------------------------------------ svn:eol-style = native Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java?rev=1346380&view=auto ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java (added) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java Tue Jun 5 13:18:22 2012 @@ -0,0 +1,79 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.recovery; + +import java.io.IOException; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.fate.Repo; +import org.apache.accumulo.server.master.Master; +import org.apache.accumulo.server.master.tableOps.MasterRepo; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +/** + * + */ +public class SubmitFileForRecovery extends MasterRepo implements Repo { + + private static final long serialVersionUID = 1L; + String server; + String file; + + SubmitFileForRecovery(String server, String file) { + this.server = server; + this.file = file; + } + + @Override + public Repo call(long tid, final Master master) throws Exception { + master.updateRecoveryInProgress(file); + String source = RecoverLease.getSource(master, server, file).toString(); + ZooReaderWriter zoo = ZooReaderWriter.getInstance(); + final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file; + zoo.putPersistentData(path, source.getBytes(), NodeExistsPolicy.SKIP); + log.info("Created zookeeper entry " + path + " with data " + source); + zoo.exists(path, new Watcher() { + @Override + public void process(WatchedEvent event) { + switch (event.getType()) { + case NodeDataChanged: + log.info("noticed recovery entry for " + file + " was removed"); + FileSystem fs = master.getFileSystem(); + Path finished = new Path(Constants.getRecoveryDir(master.getSystemConfiguration()), "finished"); + try { + if (fs.exists(finished)) + log.info("log recovery for " + file + " successful"); + else + log.error("zookeeper recovery entry " + path + " has been removed, but the finish flag file is missing"); + } catch (IOException ex) { + log.error("Unable to check on the recovery status of " + file, ex); + } + break; + } + } + + }); + return null; + } + +} Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Jun 5 13:18:22 2012 @@ -27,10 +27,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.cloudtrace.instrument.TraceExecutorService; import org.apache.accumulo.core.Constants; @@ -49,8 +46,8 @@ import org.apache.accumulo.core.file.Fil import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -367,17 +364,8 @@ class LoadFiles extends MasterRepo { synchronized void initializeThreadPool(Master master) { if (threadPool == null) { - int THREAD_POOL_SIZE = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); - ThreadFactory threadFactory = new ThreadFactory() { - int count = 0; - - @Override - public Thread newThread(Runnable r) { - return new Daemon(r, "bulk loader " + count++); - } - }; - ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 1l, TimeUnit.SECONDS, new LinkedBlockingQueue(), - threadFactory); + int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); + ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import"); pool.allowCoreThreadTimeOut(true); threadPool = new TraceExecutorService(pool); } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java Tue Jun 5 13:18:22 2012 @@ -22,9 +22,9 @@ import org.apache.accumulo.core.util.Add import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.fate.Repo; -import org.apache.accumulo.server.master.Master; import org.apache.accumulo.server.master.EventCoordinator.Listener; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.Master; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.tableOps.MasterRepo; import org.apache.accumulo.server.zookeeper.IZooReaderWriter; @@ -60,7 +60,7 @@ public class ShutdownTServer extends Mas path = ZooUtil.getRoot(m.getInstance()) + Constants.ZDEADTSERVERS + "/" + tserver; IZooReaderWriter zoo = ZooReaderWriter.getInstance(); 
zoo.putPersistentData(path, "forced down".getBytes(), NodeExistsPolicy.OVERWRITE); - return new DisconnectLogger(server.getLocation().getAddress().getHostAddress()); + return null; } // TODO move this to isReady() and drop while loop? @@ -86,7 +86,7 @@ public class ShutdownTServer extends Mas listener.waitForEvents(1000); } - return new DisconnectLogger(server.getLocation().getAddress().getHostAddress()); + return null; } @Override Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java Tue Jun 5 13:18:22 2012 @@ -56,7 +56,6 @@ import org.apache.accumulo.server.monito import org.apache.accumulo.server.monitor.servlets.GcStatusServlet; import org.apache.accumulo.server.monitor.servlets.JSONServlet; import org.apache.accumulo.server.monitor.servlets.LogServlet; -import org.apache.accumulo.server.monitor.servlets.LoggersServlet; import org.apache.accumulo.server.monitor.servlets.MasterServlet; import org.apache.accumulo.server.monitor.servlets.OperationServlet; import org.apache.accumulo.server.monitor.servlets.ProblemServlet; @@ -311,7 +310,6 @@ public class Monitor { UtilWaitThread.sleep(1000); } if (mmi != null) { - int majorCompactions = 0; int minorCompactions = 0; @@ -320,7 +318,7 @@ public class Monitor { indexCacheRequestTracker.startingUpdates(); dataCacheHitTracker.startingUpdates(); dataCacheRequestTracker.startingUpdates(); - + for (TabletServerStatus server : mmi.tServerInfo) { TableInfo summary = Monitor.summarizeTableStats(server); totalIngestRate += summary.ingestRate; @@ -368,7 +366,6 @@ public class Monitor { ingestRateOverTime.add(new Pair(currentTime, totalIngestRate)); ingestByteRateOverTime.add(new Pair(currentTime, totalIngestByteRate)); - recoveriesOverTime.add(new Pair(currentTime, mmi.recovery.size())); double totalLoad = 0.; for (TabletServerStatus status : mmi.tServerInfo) { @@ -390,7 +387,6 @@ public class Monitor { calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker); calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker); } - try { Monitor.problemSummary = ProblemReports.getInstance().summarize(); Monitor.problemException = null; @@ -480,7 +476,6 @@ public class Monitor { server.addServlet(MasterServlet.class, "/master"); server.addServlet(TablesServlet.class, "/tables"); server.addServlet(TServersServlet.class, "/tservers"); - server.addServlet(LoggersServlet.class, "/loggers"); server.addServlet(ProblemServlet.class, "/problems"); server.addServlet(GcStatusServlet.class, "/gc"); server.addServlet(LogServlet.class, "/log"); Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java (original) +++ 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/BasicServlet.java Tue Jun 5 13:18:22 2012 @@ -176,7 +176,6 @@ abstract public class BasicServlet exten sb.append("
\n"); sb.append("Master Server
\n"); sb.append("Tablet Servers
\n"); - sb.append("Logger Servers
\n"); sb.append("Server Activity
\n"); sb.append("Garbage Collector
\n"); sb.append("Tables
\n"); Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java Tue Jun 5 13:18:22 2012 @@ -34,7 +34,6 @@ import org.apache.accumulo.core.master.t import org.apache.accumulo.core.master.thrift.MasterState; import org.apache.accumulo.core.master.thrift.RecoveryStatus; import org.apache.accumulo.core.master.thrift.TabletServerStatus; -import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.StringUtil; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.monitor.DedupedLogEvent; @@ -43,9 +42,9 @@ import org.apache.accumulo.server.monito import org.apache.accumulo.server.monitor.util.Table; import org.apache.accumulo.server.monitor.util.TableRow; import org.apache.accumulo.server.monitor.util.celltypes.DurationType; -import org.apache.accumulo.server.monitor.util.celltypes.LoggerLinkType; import org.apache.accumulo.server.monitor.util.celltypes.NumberType; import org.apache.accumulo.server.monitor.util.celltypes.ProgressChartType; +import org.apache.accumulo.server.util.AddressUtil; import org.apache.log4j.Level; public class MasterServlet extends BasicServlet { @@ -57,7 +56,7 @@ public class MasterServlet extends Basic @Override protected String getTitle(HttpServletRequest req) { List masters = Monitor.getInstance().getMasterLocations(); - return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), 0).getHostName()); + return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0), Property.MASTER_CLIENTPORT).getHostName()); } @Override @@ -137,8 +136,6 @@ public class MasterServlet extends Basic masterStatus.addSortableColumn("# Online
Tablet Servers", new NumberType((int) (slaves.size() * 0.8 + 1.0), slaves.size(), (int) (slaves.size() * 0.6 + 1.0), slaves.size()), "Number of tablet servers currently available"); masterStatus.addSortableColumn("# Total
Tablet Servers", new NumberType(), "The total number of tablet servers configured"); - masterStatus.addSortableColumn("Loggers", new NumberType((int) (slaves.size() * .8), Integer.MAX_VALUE, 1, Integer.MAX_VALUE), - "The number of write-ahead loggers. This should be approximately the same as the number of tablet servers (and greater than zero)."); masterStatus.addSortableColumn("Last GC", null, "The last time files were cleaned-up from HDFS."); masterStatus.addSortableColumn("# Tablets", new NumberType(0, Integer.MAX_VALUE, 2, Integer.MAX_VALUE), null); masterStatus.addSortableColumn("# Unassigned
Tablets", new NumberType(0, 0), null); @@ -153,10 +150,9 @@ public class MasterServlet extends Basic masterStatus.addSortableColumn("OS Load", new NumberType(0., guessHighLoad * 1., 0., guessHighLoad * 3.), "The one-minute load average on the computer that runs the monitor web server."); TableRow row = masterStatus.prepareRow(); - row.add(masters.size() == 0 ? "
Down
" : AddressUtil.parseAddress(masters.get(0), 0).getHostName()); + row.add(masters.size() == 0 ? "
Down
" : AddressUtil.parseAddress(masters.get(0), Property.MASTER_CLIENTPORT).getHostName()); row.add(Monitor.getMmi().tServerInfo.size()); row.add(slaves.size()); - row.add(Monitor.getMmi().loggers.size()); row.add("" + gcStatus + ""); row.add(Monitor.getTotalTabletCount()); row.add(Monitor.getMmi().unassignedTablets); @@ -177,25 +173,28 @@ public class MasterServlet extends Basic private void doRecoveryList(HttpServletRequest req, StringBuilder sb) { MasterMonitorInfo mmi = Monitor.getMmi(); if (mmi != null) { - List jobs = mmi.recovery; - if (jobs != null && jobs.size() > 0) { - Table recoveryTable = new Table("logRecovery", "Log Recovery"); - recoveryTable.setSubCaption("Some tablets were unloaded in an unsafe manner. Write-ahead logs are being recovered."); - recoveryTable.addSortableColumn("Server", new LoggerLinkType(), null); - recoveryTable.addSortableColumn("Log"); - recoveryTable.addSortableColumn("Time", new DurationType(), null); - recoveryTable.addSortableColumn("Copy/Sort", new ProgressChartType(), null); - - for (RecoveryStatus recovery : jobs) { - TableRow row = recoveryTable.prepareRow(); - row.add(recovery); - row.add(recovery.name); - row.add((long) recovery.runtime); - row.add(recovery.copyProgress); - recoveryTable.addRow(row); + Table recoveryTable = new Table("logRecovery", "Log Recovery"); + recoveryTable.setSubCaption("Some tablets were unloaded in an unsafe manner. Write-ahead logs are being recovered."); + recoveryTable.addSortableColumn("Server"); + recoveryTable.addSortableColumn("Log"); + recoveryTable.addSortableColumn("Time", new DurationType(), null); + recoveryTable.addSortableColumn("Copy/Sort", new ProgressChartType(), null); + int rows = 0; + for (TabletServerStatus server : mmi.tServerInfo) { + if (server.logSorts != null) { + for (RecoveryStatus recovery : server.logSorts) { + TableRow row = recoveryTable.prepareRow(); + row.add(AddressUtil.parseAddress(server.name, Property.TSERV_CLIENTPORT).getHostName()); + row.add(recovery.name); + row.add((long) recovery.runtime); + row.add(recovery.progress); + recoveryTable.addRow(row); + rows++; + } } - recoveryTable.generate(req, sb); } + if (rows > 0) + recoveryTable.generate(req, sb); } } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/OperationServlet.java Tue Jun 5 13:18:22 2012 @@ -167,8 +167,6 @@ public class OperationServlet extends Ba // a dead server should have a uniq address: a logger or tserver DeadServerList obit = new DeadServerList(ZooUtil.getRoot(inst) + Constants.ZDEADTSERVERS); obit.delete(server); - obit = new DeadServerList(ZooUtil.getRoot(inst) + Constants.ZDEADLOGGERS); - obit.delete(server); } } } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/XMLServlet.java Tue Jun 5 13:18:22 2012 @@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.I import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.master.thrift.Compacting; import org.apache.accumulo.core.master.thrift.DeadServer; -import org.apache.accumulo.core.master.thrift.LoggerStatus; import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -80,12 +79,6 @@ public class XMLServlet extends BasicSer sb.append("\n"); sb.append("").append(summary.tablets).append("\n"); - if (status.loggers != null) { - sb.append(""); - for (String logger : status.loggers) - sb.append("" + logger + ""); - sb.append(""); - } sb.append("").append(summary.ingestRate).append("\n"); sb.append("").append(summary.queryRate).append("\n"); @@ -110,12 +103,6 @@ public class XMLServlet extends BasicSer } sb.append("\n\n"); - sb.append("\n\n"); - for (LoggerStatus entry : Monitor.getMmi().loggers) { - sb.append(String.format("\n", entry.logger)); - } - sb.append("\n\n"); - sb.append("\n\n"); for (String server : Monitor.getMmi().serversShuttingDown) { sb.append(String.format("\n", server)); Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java Tue Jun 5 13:18:22 2012 @@ -40,11 +40,11 @@ import org.apache.accumulo.core.data.Ran import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.SortedKeyIterator; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.security.SecurityConstants; import org.apache.accumulo.server.util.MetadataTable; -import org.apache.accumulo.server.util.NamingThreadFactory; import org.apache.accumulo.server.zookeeper.IZooReaderWriter; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.commons.collections.map.LRUMap; Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1346380&r1=1346379&r2=1346380&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Tue Jun 5 13:18:22 2012 @@ -111,8 +111,8 @@ import org.apache.accumulo.server.tablet import org.apache.accumulo.server.tabletserver.TabletServer.TservConstraintEnv; import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager; 
import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation; +import org.apache.accumulo.server.tabletserver.log.DfsLogger; import org.apache.accumulo.server.tabletserver.log.MutationReceiver; -import org.apache.accumulo.server.tabletserver.log.RemoteLogger; import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage; import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics; import org.apache.accumulo.server.trace.TraceFileSystem; @@ -209,7 +209,7 @@ public class Tablet { return Tablet.this; } - public boolean beginUpdatingLogsUsed(ArrayList copy, boolean mincFinish) { + public boolean beginUpdatingLogsUsed(ArrayList copy, boolean mincFinish) { return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish); } @@ -1178,7 +1178,6 @@ public class Tablet { for (int i = 0; i < files.length; i++) { paths[i] = files[i].getPath(); } - log.debug("fs " + fs + " files: " + Arrays.toString(paths) + " location: " + location); Collection goodPaths = cleanUpFiles(fs, files, location, true); for (String path : goodPaths) { String filename = new Path(path).getName(); @@ -1226,17 +1225,6 @@ public class Tablet { return datafiles; } - private static Set getCurrentLoggers(List entries) { - Set result = new HashSet(); - for (LogEntry logEntry : entries) { - for (String log : logEntry.logSet) { - String[] parts = log.split("/", 2); - result.add(new RemoteLogger(parts[0], parts[1], null)); - } - } - return result; - } - private static List lookupLogEntries(KeyExtent ke, SortedMap tabletsKeyValues) { List logEntries = new ArrayList(); @@ -1470,7 +1458,15 @@ public class Tablet { throw new RuntimeException(t); } } - currentLogs = getCurrentLoggers(logEntries); + // make some closed references that represent the recovered logs + currentLogs = new HashSet(); + for (LogEntry logEntry : logEntries) { + for (String log : logEntry.logSet) { + String[] parts = log.split("/", 2); + currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1])); + } + } + log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries() + " entries created)"); } @@ -2241,7 +2237,7 @@ public class Tablet { private synchronized MinorCompactionTask prepareForMinC(long flushId) { CommitSession oldCommitSession = tabletMemory.prepareForMinC(); otherLogs = currentLogs; - currentLogs = new HashSet(); + currentLogs = new HashSet(); String mergeFile = datafileManager.reserveMergingMinorCompactionFile(); @@ -3243,12 +3239,8 @@ public class Tablet { log.debug("Starting MajC " + extent + " (" + reason + ") " + datafileManager.abs2rel(datafileManager.string2path(copy.keySet())) + " --> " + datafileManager.abs2rel(new Path(compactTmpName))); - Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true, // always - // propagate - // deletes, - // unless - // last - // batch + // always propagate deletes, unless last batch + Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? 
propogateDeletes : true, acuTableConf, extent, cenv, compactionIterators); CompactionStats mcs = compactor.call(); @@ -3650,8 +3642,18 @@ public class Tablet { } } - private Set currentLogs = new HashSet(); + private Set currentLogs = new HashSet(); + public Set getCurrentLogs() { + Set result = new HashSet(); + synchronized (currentLogs) { + for (DfsLogger log : currentLogs) { + result.add(log.toString()); + } + } + return result; + } + private Set beginClearingUnusedLogs() { Set doomed = new HashSet(); @@ -3665,12 +3667,12 @@ public class Tablet { if (removingLogs) throw new IllegalStateException("Attempted to clear logs when removal of logs in progress"); - for (RemoteLogger logger : otherLogs) { + for (DfsLogger logger : otherLogs) { otherLogsCopy.add(logger.toString()); doomed.add(logger.toString()); } - for (RemoteLogger logger : currentLogs) { + for (DfsLogger logger : currentLogs) { currentLogsCopy.add(logger.toString()); doomed.remove(logger.toString()); } @@ -3698,7 +3700,7 @@ public class Tablet { logLock.unlock(); } - private Set otherLogs = Collections.emptySet(); + private Set otherLogs = Collections.emptySet(); private boolean removingLogs = false; // this lock is basically used to synchronize writing of log info to !METADATA @@ -3708,7 +3710,7 @@ public class Tablet { return currentLogs.size(); } - private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection more, boolean mincFinish) { + private boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection more, boolean mincFinish) { boolean releaseLock = true; @@ -3745,7 +3747,7 @@ public class Tablet { int numAdded = 0; int numContained = 0; - for (RemoteLogger logger : more) { + for (DfsLogger logger : more) { if (addToOther) { if (otherLogs.add(logger)) numAdded++;
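[Note on the Tablet.java hunks above: the mail formatting has stripped the generic type parameters, but the fields in question are evidently Set<DfsLogger> (currentLogs, otherLogs) and Collection<DfsLogger> (the "more" argument), since the loops iterate DfsLogger references and RemoteLogger is replaced by DfsLogger throughout. The bookkeeping pattern itself is simple: the tablet keeps one set of write-ahead logs referenced by the active in-memory map and, when a minor compaction starts, parks that set aside until the flush completes. A stand-alone illustration of that handoff, using a placeholder Log class rather than the real DfsLogger API, might look like:

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustration only: mirrors the currentLogs/otherLogs handoff in Tablet,
// with a placeholder Log type standing in for DfsLogger.
class WalTracker {
  static class Log {
    final String name;
    Log(String name) { this.name = name; }
    @Override public boolean equals(Object o) { return o instanceof Log && ((Log) o).name.equals(name); }
    @Override public int hashCode() { return name.hashCode(); }
    @Override public String toString() { return name; }
  }

  private Set<Log> currentLogs = new HashSet<Log>();
  private Set<Log> otherLogs = Collections.emptySet();

  // new mutations arrived that reference these write-ahead logs
  synchronized void logsInUse(Collection<Log> more) {
    currentLogs.addAll(more);
  }

  // a minor compaction begins: subsequent writes reference a fresh set of logs
  synchronized void beginMinorCompaction() {
    otherLogs = currentLogs;
    currentLogs = new HashSet<Log>();
  }

  // the minor compaction finished: logs referenced only by the flushed
  // memory map are no longer needed by this tablet
  synchronized Set<Log> finishMinorCompaction() {
    Set<Log> unused = new HashSet<Log>(otherLogs);
    unused.removeAll(currentLogs);
    otherLogs = Collections.emptySet();
    return unused;
  }
}
]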