Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D38A410DDE for ; Fri, 6 Sep 2013 18:22:57 +0000 (UTC) Received: (qmail 62883 invoked by uid 500); 6 Sep 2013 18:22:53 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 62607 invoked by uid 500); 6 Sep 2013 18:22:50 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 60263 invoked by uid 99); 6 Sep 2013 18:22:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Sep 2013 18:22:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C951D902A89; Fri, 6 Sep 2013 18:22:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Fri, 06 Sep 2013 18:23:09 -0000 Message-Id: <9dac2031e6874db2b347a8d929b18163@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [41/53] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java deleted file mode 100644 index 2c6858d..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -public class TabletServerConstants { - - public static final String IntermediateKeyName = "IntermediateKey"; - public static final String ColumnSetName = "ColumnSet"; - public static final String AuthorizationSetName = "AuthorizationSet"; - public static final String EndKeyName = "EndKey"; - public static final String MaxResultsName = "MaxResults"; - public static final String PreviousQueryTypeName = "PreviousQueryType"; - public static final String PreviousQueryStatusName = "PreviousQueryStatus"; -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java deleted file mode 100644 index 78313e7..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java +++ /dev/null @@ -1,804 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.accumulo.trace.instrument.TraceExecutorService; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.util.Daemon; -import org.apache.accumulo.core.util.LoggingRunnable; -import org.apache.accumulo.core.util.NamingThreadFactory; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.server.conf.ServerConfiguration; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager; -import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason; -import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason; -import org.apache.accumulo.server.util.time.SimpleTimer; -import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; -import org.apache.log4j.Logger; - -/** - * ResourceManager is responsible for managing the resources of all tablets within a tablet server. - * - * - * - */ -public class TabletServerResourceManager { - - private ExecutorService minorCompactionThreadPool; - private ExecutorService majorCompactionThreadPool; - private ExecutorService rootMajorCompactionThreadPool; - private ExecutorService defaultMajorCompactionThreadPool; - private ExecutorService splitThreadPool; - private ExecutorService defaultSplitThreadPool; - private ExecutorService defaultMigrationPool; - private ExecutorService migrationPool; - private ExecutorService assignmentPool; - private ExecutorService assignMetaDataPool; - private ExecutorService readAheadThreadPool; - private ExecutorService defaultReadAheadThreadPool; - private Map threadPools = new TreeMap(); - - private HashSet tabletResources; - - private FileManager fileManager; - - private MemoryManager memoryManager; - - private MemoryManagementFramework memMgmt; - - private final LruBlockCache _dCache; - private final LruBlockCache _iCache; - private final ServerConfiguration conf; - - private static final Logger log = Logger.getLogger(TabletServerResourceManager.class); - - private ExecutorService addEs(String name, ExecutorService tp) { - if (threadPools.containsKey(name)) { - throw new IllegalArgumentException("Cannot create two executor services with same name " + name); - } - tp = new TraceExecutorService(tp); - threadPools.put(name, tp); - return tp; - } - - private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) { - ExecutorService result = addEs(name, tp); - SimpleTimer.getInstance().schedule(new Runnable() { - @Override - public void run() { - try { - int max = conf.getConfiguration().getCount(maxThreads); - if (tp.getMaximumPoolSize() != max) { - log.info("Changing " + maxThreads.getKey() + " to " + max); - tp.setCorePoolSize(max); - tp.setMaximumPoolSize(max); - } - } catch (Throwable t) { - log.error(t, t); - } - } - - }, 1000, 10 * 1000); - return result; - } - - private ExecutorService createEs(int max, String name) { - return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name))); - } - - private ExecutorService createEs(Property max, String name) { - return createEs(max, name, new LinkedBlockingQueue()); - } - - private ExecutorService createEs(Property max, String name, BlockingQueue queue) { - int maxThreads = conf.getConfiguration().getCount(max); - ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name)); - return addEs(max, name, tp); - } - - private ExecutorService createEs(int min, int max, int timeout, String name) { - return addEs(name, new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamingThreadFactory(name))); - } - - public TabletServerResourceManager(Instance instance, VolumeManager fs) { - this.conf = new ServerConfiguration(instance); - final AccumuloConfiguration acuConf = conf.getConfiguration(); - - long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM); - boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED) && NativeMap.loadedNativeLibraries(); - - long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE); - long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE); - long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE); - - _iCache = new LruBlockCache(iCacheSize, blockSize); - _dCache = new LruBlockCache(dCacheSize, blockSize); - - Runtime runtime = Runtime.getRuntime(); - if (!usingNativeMap && maxMemory + dCacheSize + iCacheSize > runtime.maxMemory()) { - throw new IllegalArgumentException(String.format( - "Maximum tablet server map memory %,d and block cache sizes %,d is too large for this JVM configuration %,d", maxMemory, dCacheSize + iCacheSize, - runtime.maxMemory())); - } - runtime.gc(); - - // totalMemory - freeMemory = memory in use - // maxMemory - memory in use = max available memory - if (!usingNativeMap && maxMemory > runtime.maxMemory() - (runtime.totalMemory() - runtime.freeMemory())) { - log.warn("In-memory map may not fit into local memory space."); - } - - minorCompactionThreadPool = createEs(Property.TSERV_MINC_MAXCONCURRENT, "minor compactor"); - - // make this thread pool have a priority queue... and execute tablets with the most - // files first! - majorCompactionThreadPool = createEs(Property.TSERV_MAJC_MAXCONCURRENT, "major compactor", new CompactionQueue()); - rootMajorCompactionThreadPool = createEs(0, 1, 300, "md root major compactor"); - defaultMajorCompactionThreadPool = createEs(0, 1, 300, "md major compactor"); - - splitThreadPool = createEs(1, "splitter"); - defaultSplitThreadPool = createEs(0, 1, 60, "md splitter"); - - defaultMigrationPool = createEs(0, 1, 60, "metadata tablet migration"); - migrationPool = createEs(Property.TSERV_MIGRATE_MAXCONCURRENT, "tablet migration"); - - // not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because - // individual tablet servers are already running assignments concurrently... having each individual tablet server run - // concurrent assignments would put more load on the metadata table at startup - assignmentPool = createEs(1, "tablet assignment"); - - assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment"); - - readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead"); - defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead"); - - tabletResources = new HashSet(); - - int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES); - - fileManager = new FileManager(conf, fs, maxOpenFiles, _dCache, _iCache); - - try { - Class clazz = AccumuloVFSClassLoader.loadClass(acuConf.get(Property.TSERV_MEM_MGMT), MemoryManager.class); - memoryManager = clazz.newInstance(); - memoryManager.init(conf); - log.debug("Loaded memory manager : " + memoryManager.getClass().getName()); - } catch (Exception e) { - log.error("Failed to find memory manger in config, using default", e); - } - - if (memoryManager == null) { - memoryManager = new LargestFirstMemoryManager(); - } - - memMgmt = new MemoryManagementFramework(); - } - - private static class TabletStateImpl implements TabletState, Cloneable { - - private long lct; - private Tablet tablet; - private long mts; - private long mcmts; - - public TabletStateImpl(Tablet t, long mts, long lct, long mcmts) { - this.tablet = t; - this.mts = mts; - this.lct = lct; - this.mcmts = mcmts; - } - - public KeyExtent getExtent() { - return tablet.getExtent(); - } - - Tablet getTablet() { - return tablet; - } - - public long getLastCommitTime() { - return lct; - } - - public long getMemTableSize() { - return mts; - } - - public long getMinorCompactingMemTableSize() { - return mcmts; - } - } - - private class MemoryManagementFramework { - private final Map tabletReports; - private LinkedBlockingQueue memUsageReports; - private long lastMemCheckTime = System.currentTimeMillis(); - private long maxMem; - - MemoryManagementFramework() { - tabletReports = Collections.synchronizedMap(new HashMap()); - memUsageReports = new LinkedBlockingQueue(); - maxMem = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM); - - Runnable r1 = new Runnable() { - public void run() { - processTabletMemStats(); - } - }; - - Thread t1 = new Daemon(new LoggingRunnable(log, r1)); - t1.setPriority(Thread.NORM_PRIORITY + 1); - t1.setName("Accumulo Memory Guard"); - t1.start(); - - Runnable r2 = new Runnable() { - public void run() { - manageMemory(); - } - }; - - Thread t2 = new Daemon(new LoggingRunnable(log, r2)); - t2.setName("Accumulo Minor Compaction Initiator"); - t2.start(); - - } - - private long lastMemTotal = 0; - - private void processTabletMemStats() { - while (true) { - try { - - TabletStateImpl report = memUsageReports.take(); - - while (report != null) { - tabletReports.put(report.getExtent(), report); - report = memUsageReports.poll(); - } - - long delta = System.currentTimeMillis() - lastMemCheckTime; - if (holdCommits || delta > 50 || lastMemTotal > 0.90 * maxMem) { - lastMemCheckTime = System.currentTimeMillis(); - - long totalMemUsed = 0; - - synchronized (tabletReports) { - for (TabletStateImpl tsi : tabletReports.values()) { - totalMemUsed += tsi.getMemTableSize(); - totalMemUsed += tsi.getMinorCompactingMemTableSize(); - } - } - - if (totalMemUsed > 0.95 * maxMem) { - holdAllCommits(true); - } else { - holdAllCommits(false); - } - - lastMemTotal = totalMemUsed; - } - - } catch (InterruptedException e) { - log.warn(e, e); - } - } - } - - private void manageMemory() { - while (true) { - MemoryManagementActions mma = null; - - try { - ArrayList tablets; - synchronized (tabletReports) { - tablets = new ArrayList(tabletReports.values()); - } - mma = memoryManager.getMemoryManagementActions(tablets); - - } catch (Throwable t) { - log.error("Memory manager failed " + t.getMessage(), t); - } - - try { - if (mma != null && mma.tabletsToMinorCompact != null && mma.tabletsToMinorCompact.size() > 0) { - for (KeyExtent keyExtent : mma.tabletsToMinorCompact) { - TabletStateImpl tabletReport = tabletReports.get(keyExtent); - - if (tabletReport == null) { - log.warn("Memory manager asked to compact nonexistant tablet " + keyExtent); - continue; - } - - if (!tabletReport.getTablet().initiateMinorCompaction(MinorCompactionReason.SYSTEM)) { - if (tabletReport.getTablet().isClosed()) { - tabletReports.remove(tabletReport.getExtent()); - log.debug("Ignoring memory manager recommendation: not minor compacting closed tablet " + keyExtent); - } else { - log.info("Ignoring memory manager recommendation: not minor compacting " + keyExtent); - } - } - } - - // log.debug("mma.tabletsToMinorCompact = "+mma.tabletsToMinorCompact); - } - } catch (Throwable t) { - log.error("Minor compactions for memory managment failed", t); - } - - UtilWaitThread.sleep(250); - } - } - - public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize) { - memUsageReports.add(new TabletStateImpl(tablet, size, lastCommitTime, mincSize)); - } - - public void tabletClosed(KeyExtent extent) { - tabletReports.remove(extent); - } - } - - private final Object commitHold = new Object(); - private volatile boolean holdCommits = false; - private long holdStartTime; - - protected void holdAllCommits(boolean holdAllCommits) { - synchronized (commitHold) { - if (holdCommits != holdAllCommits) { - holdCommits = holdAllCommits; - - if (holdCommits) { - holdStartTime = System.currentTimeMillis(); - } - - if (!holdCommits) { - log.debug(String.format("Commits held for %6.2f secs", (System.currentTimeMillis() - holdStartTime) / 1000.0)); - commitHold.notifyAll(); - } - } - } - - } - - void waitUntilCommitsAreEnabled() { - if (holdCommits) { - long timeout = System.currentTimeMillis() + conf.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT); - synchronized (commitHold) { - while (holdCommits) { - try { - if (System.currentTimeMillis() > timeout) - throw new HoldTimeoutException("Commits are held"); - commitHold.wait(1000); - } catch (InterruptedException e) {} - } - } - } - } - - public long holdTime() { - if (!holdCommits) - return 0; - synchronized (commitHold) { - return System.currentTimeMillis() - holdStartTime; - } - } - - public void close() { - for (ExecutorService executorService : threadPools.values()) { - executorService.shutdown(); - } - - for (Entry entry : threadPools.entrySet()) { - while (true) { - try { - if (entry.getValue().awaitTermination(60, TimeUnit.SECONDS)) - break; - log.info("Waiting for thread pool " + entry.getKey() + " to shutdown"); - } catch (InterruptedException e) { - log.warn(e); - } - } - } - } - - public synchronized TabletResourceManager createTabletResourceManager() { - TabletResourceManager trm = new TabletResourceManager(); - return trm; - } - - synchronized private void addTabletResource(TabletResourceManager tr) { - tabletResources.add(tr); - } - - synchronized private void removeTabletResource(TabletResourceManager tr) { - tabletResources.remove(tr); - } - - private class MapFileInfo { - private final FileRef path; - private final long size; - - MapFileInfo(FileRef path, long size) { - this.path = path; - this.size = size; - } - } - - public class TabletResourceManager { - - private final long creationTime = System.currentTimeMillis(); - - private volatile boolean openFilesReserved = false; - - private volatile boolean closed = false; - - private Tablet tablet; - - private AccumuloConfiguration tableConf; - - TabletResourceManager() {} - - void setTablet(Tablet tablet, AccumuloConfiguration tableConf) { - this.tablet = tablet; - this.tableConf = tableConf; - // TabletResourceManager is not really initialized until this - // function is called.... so do not make it publicly available - // until now - - addTabletResource(this); - } - - // BEGIN methods that Tablets call to manage their set of open map files - - public void importedMapFiles() { - lastReportedCommitTime = System.currentTimeMillis(); - } - - synchronized ScanFileManager newScanFileManager() { - if (closed) - throw new IllegalStateException("closed"); - return fileManager.newScanFileManager(tablet.getExtent()); - } - - // END methods that Tablets call to manage their set of open map files - - // BEGIN methods that Tablets call to manage memory - - private AtomicLong lastReportedSize = new AtomicLong(); - private AtomicLong lastReportedMincSize = new AtomicLong(); - private volatile long lastReportedCommitTime = 0; - - public void updateMemoryUsageStats(long size, long mincSize) { - - // do not want to update stats for every little change, - // so only do it under certain circumstances... the reason - // for this is that reporting stats acquires a lock, do - // not want all tablets locking on the same lock for every - // commit - long totalSize = size + mincSize; - long lrs = lastReportedSize.get(); - long delta = totalSize - lrs; - long lrms = lastReportedMincSize.get(); - boolean report = false; - // the atomic longs are considered independently, when one is set - // the other is not set intentionally because this method is not - // synchronized... therefore there are not transactional semantics - // for reading and writing two variables - if ((lrms > 0 && mincSize == 0 || lrms == 0 && mincSize > 0) && lastReportedMincSize.compareAndSet(lrms, mincSize)) { - report = true; - } - - long currentTime = System.currentTimeMillis(); - if ((delta > 32000 || delta < 0 || (currentTime - lastReportedCommitTime > 1000)) && lastReportedSize.compareAndSet(lrs, totalSize)) { - if (delta > 0) - lastReportedCommitTime = currentTime; - report = true; - } - - if (report) - memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize); - } - - // END methods that Tablets call to manage memory - - // BEGIN methods that Tablets call to make decisions about major compaction - // when too many files are open, we may want tablets to compact down - // to one map file - Map findMapFilesToCompact(SortedMap tabletFiles, MajorCompactionReason reason) { - if (reason == MajorCompactionReason.USER) { - Map files = new HashMap(); - for (Entry entry : tabletFiles.entrySet()) { - files.put(entry.getKey(), entry.getValue().getSize()); - } - return files; - } - - if (tabletFiles.size() <= 1) - return null; - TreeSet candidateFiles = new TreeSet(new Comparator() { - @Override - public int compare(MapFileInfo o1, MapFileInfo o2) { - if (o1 == o2) - return 0; - if (o1.size < o2.size) - return -1; - if (o1.size > o2.size) - return 1; - return o1.path.compareTo(o2.path); - } - }); - - double ratio = tableConf.getFraction(Property.TABLE_MAJC_RATIO); - int maxFilesToCompact = tableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN); - int maxFilesPerTablet = tableConf.getMaxFilesPerTablet(); - - for (Entry entry : tabletFiles.entrySet()) { - candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize())); - } - - long totalSize = 0; - for (MapFileInfo mfi : candidateFiles) { - totalSize += mfi.size; - } - - Map files = new HashMap(); - - while (candidateFiles.size() > 1) { - MapFileInfo max = candidateFiles.last(); - if (max.size * ratio <= totalSize) { - files.clear(); - for (MapFileInfo mfi : candidateFiles) { - files.put(mfi.path, mfi.size); - if (files.size() >= maxFilesToCompact) - break; - } - - break; - } - totalSize -= max.size; - candidateFiles.remove(max); - } - - int totalFilesToCompact = 0; - if (tabletFiles.size() > maxFilesPerTablet) - totalFilesToCompact = tabletFiles.size() - maxFilesPerTablet + 1; - - totalFilesToCompact = Math.min(totalFilesToCompact, maxFilesToCompact); - - if (files.size() < totalFilesToCompact) { - - TreeMap tfc = new TreeMap(tabletFiles); - tfc.keySet().removeAll(files.keySet()); - - // put data in candidateFiles to sort it - candidateFiles.clear(); - for (Entry entry : tfc.entrySet()) - candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize())); - - for (MapFileInfo mfi : candidateFiles) { - files.put(mfi.path, mfi.size); - if (files.size() >= totalFilesToCompact) - break; - } - } - - if (files.size() == 0) - return null; - - return files; - } - - boolean needsMajorCompaction(SortedMap tabletFiles, MajorCompactionReason reason) { - if (closed) - return false;// throw new IOException("closed"); - - // int threshold; - - if (reason == MajorCompactionReason.USER) - return true; - - if (reason == MajorCompactionReason.IDLE) { - // threshold = 1; - long idleTime; - if (lastReportedCommitTime == 0) { - // no commits, so compute how long the tablet has been assigned to the - // tablet server - idleTime = System.currentTimeMillis() - creationTime; - } else { - idleTime = System.currentTimeMillis() - lastReportedCommitTime; - } - - if (idleTime < tableConf.getTimeInMillis(Property.TABLE_MAJC_COMPACTALL_IDLETIME)) { - return false; - } - }/* - * else{ threshold = tableConf.getCount(Property.TABLE_MAJC_THRESHOLD); } - */ - - return findMapFilesToCompact(tabletFiles, reason) != null; - } - - // END methods that Tablets call to make decisions about major compaction - - // tablets call this method to run minor compactions, - // this allows us to control how many minor compactions - // run concurrently in a tablet server - void executeMinorCompaction(final Runnable r) { - minorCompactionThreadPool.execute(new LoggingRunnable(log, r)); - } - - void close() throws IOException { - // always obtain locks in same order to avoid deadlock - synchronized (TabletServerResourceManager.this) { - synchronized (this) { - if (closed) - throw new IOException("closed"); - if (openFilesReserved) - throw new IOException("tired to close files while open files reserved"); - - TabletServerResourceManager.this.removeTabletResource(this); - - memMgmt.tabletClosed(tablet.getExtent()); - memoryManager.tabletClosed(tablet.getExtent()); - - closed = true; - } - } - } - - public TabletServerResourceManager getTabletServerResourceManager() { - return TabletServerResourceManager.this; - } - - public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) { - TabletServerResourceManager.this.executeMajorCompaction(tablet, compactionTask); - } - - } - - public void executeSplit(KeyExtent tablet, Runnable splitTask) { - if (tablet.isMeta()) { - if (tablet.isRootTablet()) { - log.warn("Saw request to split root tablet, ignoring"); - return; - } - defaultSplitThreadPool.execute(splitTask); - } else { - splitThreadPool.execute(splitTask); - } - } - - public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) { - if (tablet.equals(RootTable.EXTENT)) { - rootMajorCompactionThreadPool.execute(compactionTask); - } else if (tablet.isMeta()) { - defaultMajorCompactionThreadPool.execute(compactionTask); - } else { - majorCompactionThreadPool.execute(compactionTask); - } - } - - public void executeReadAhead(KeyExtent tablet, Runnable task) { - if (tablet.isRootTablet()) { - task.run(); - } else if (tablet.isMeta()) { - defaultReadAheadThreadPool.execute(task); - } else { - readAheadThreadPool.execute(task); - } - } - - public void addAssignment(Runnable assignmentHandler) { - assignmentPool.execute(assignmentHandler); - } - - public void addMetaDataAssignment(Runnable assignmentHandler) { - assignMetaDataPool.execute(assignmentHandler); - } - - public void addMigration(KeyExtent tablet, Runnable migrationHandler) { - if (tablet.isRootTablet()) { - migrationHandler.run(); - } else if (tablet.isMeta()) { - defaultMigrationPool.execute(migrationHandler); - } else { - migrationPool.execute(migrationHandler); - } - } - - public void stopSplits() { - splitThreadPool.shutdown(); - defaultSplitThreadPool.shutdown(); - while (true) { - try { - while (!splitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) { - log.info("Waiting for metadata split thread pool to stop"); - } - while (!defaultSplitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) { - log.info("Waiting for split thread pool to stop"); - } - break; - } catch (InterruptedException ex) { - log.info(ex, ex); - } - } - } - - public void stopNormalAssignments() { - assignmentPool.shutdown(); - while (true) { - try { - while (!assignmentPool.awaitTermination(1, TimeUnit.MINUTES)) { - log.info("Waiting for assignment thread pool to stop"); - } - break; - } catch (InterruptedException ex) { - log.info(ex, ex); - } - } - } - - public void stopMetadataAssignments() { - assignMetaDataPool.shutdown(); - while (true) { - try { - while (!assignMetaDataPool.awaitTermination(1, TimeUnit.MINUTES)) { - log.info("Waiting for metadata assignment thread pool to stop"); - } - break; - } catch (InterruptedException ex) { - log.info(ex, ex); - } - } - } - - public LruBlockCache getIndexCache() { - return _iCache; - } - - public LruBlockCache getDataCache() { - return _dCache; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java deleted file mode 100644 index aeacb8d..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import org.apache.accumulo.core.data.KeyExtent; - -public interface TabletState { - KeyExtent getExtent(); - - long getLastCommitTime(); - - long getMemTableSize(); - - long getMinorCompactingMemTableSize(); -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java deleted file mode 100644 index 89ea6ac..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import org.apache.accumulo.core.tabletserver.thrift.ActionStats; -import org.apache.accumulo.core.tabletserver.thrift.TabletStats; -import org.apache.accumulo.server.util.ActionStatsUpdator; - -public class TabletStatsKeeper { - - private ActionStats major = new ActionStats(); - private ActionStats minor = new ActionStats(); - private ActionStats split = new ActionStats(); - - public enum Operation { - MAJOR, SPLIT, MINOR - } - - private ActionStats[] map = new ActionStats[] {major, split, minor}; - - public void updateTime(Operation operation, long queued, long start, long count, boolean failed) { - try { - ActionStats data = map[operation.ordinal()]; - if (failed) { - data.fail++; - data.status--; - } else { - double t = (System.currentTimeMillis() - start) / 1000.0; - double q = (start - queued) / 1000.0; - - data.status--; - data.count += count; - data.num++; - data.elapsed += t; - data.queueTime += q; - data.sumDev += t * t; - data.queueSumDev += q * q; - if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0) - resetTimes(); - } - } catch (Exception E) { - resetTimes(); - } - - } - - public void updateTime(Operation operation, long start, long count, boolean failed) { - try { - ActionStats data = map[operation.ordinal()]; - if (failed) { - data.fail++; - data.status--; - } else { - double t = (System.currentTimeMillis() - start) / 1000.0; - - data.status--; - data.num++; - data.elapsed += t; - data.sumDev += t * t; - - if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0) - resetTimes(); - } - } catch (Exception E) { - resetTimes(); - } - - } - - public void saveMinorTimes(TabletStatsKeeper t) { - ActionStatsUpdator.update(minor, t.minor); - } - - public void saveMajorTimes(TabletStatsKeeper t) { - ActionStatsUpdator.update(major, t.major); - } - - public void resetTimes() { - major = new ActionStats(); - split = new ActionStats(); - minor = new ActionStats(); - } - - public void incrementStatusMinor() { - minor.status++; - } - - public void incrementStatusMajor() { - major.status++; - } - - public void incrementStatusSplit() { - split.status++; - } - - public TabletStats getTabletStats() { - return new TabletStats(null, major, minor, split, 0, 0, 0, 0); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java deleted file mode 100644 index b6d63ad..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver; - -import java.io.IOException; - -public class TooManyFilesException extends IOException { - - private static final long serialVersionUID = 1L; - - public TooManyFilesException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java deleted file mode 100644 index 113e256..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java +++ /dev/null @@ -1,455 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver.log; - -import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH; -import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START; -import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; -import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS; -import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.security.crypto.CryptoModuleFactory; -import org.apache.accumulo.core.security.crypto.CryptoModuleParameters; -import org.apache.accumulo.core.util.Daemon; -import org.apache.accumulo.core.util.StringUtil; -import org.apache.accumulo.master.state.TServerInstance; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.tabletserver.TabletMutations; -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; - -/** - * Wrap a connection to a logger. - * - */ -public class DfsLogger { - // Package private so that LogSorter can find this - static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---"; - - private static Logger log = Logger.getLogger(DfsLogger.class); - - public static class LogClosedException extends IOException { - private static final long serialVersionUID = 1L; - - public LogClosedException() { - super("LogClosed"); - } - } - - public interface ServerResources { - AccumuloConfiguration getConfiguration(); - - VolumeManager getFileSystem(); - - Set getCurrentTServers(); - } - - private final LinkedBlockingQueue workQueue = new LinkedBlockingQueue(); - - private final Object closeLock = new Object(); - - private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null); - - private static final LogFileValue EMPTY = new LogFileValue(); - - private boolean closed = false; - - private class LogSyncingTask implements Runnable { - - @Override - public void run() { - ArrayList work = new ArrayList(); - while (true) { - work.clear(); - - try { - work.add(workQueue.take()); - } catch (InterruptedException ex) { - continue; - } - workQueue.drainTo(work); - - synchronized (closeLock) { - if (!closed) { - try { - sync.invoke(logFile); - } catch (Exception ex) { - log.warn("Exception syncing " + ex); - for (DfsLogger.LogWork logWork : work) { - logWork.exception = ex; - } - } - } else { - for (DfsLogger.LogWork logWork : work) { - logWork.exception = new LogClosedException(); - } - } - } - - boolean sawClosedMarker = false; - for (DfsLogger.LogWork logWork : work) - if (logWork == CLOSED_MARKER) - sawClosedMarker = true; - else - logWork.latch.countDown(); - - if (sawClosedMarker) { - synchronized (closeLock) { - closeLock.notifyAll(); - } - break; - } - } - } - } - - static class LogWork { - List mutations; - CountDownLatch latch; - volatile Exception exception; - - public LogWork(List mutations, CountDownLatch latch) { - this.mutations = mutations; - this.latch = latch; - } - } - - public static class LoggerOperation { - private final LogWork work; - - public LoggerOperation(LogWork work) { - this.work = work; - } - - public void await() throws IOException { - try { - work.latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - if (work.exception != null) { - if (work.exception instanceof IOException) - throw (IOException) work.exception; - else if (work.exception instanceof RuntimeException) - throw (RuntimeException) work.exception; - else - throw new RuntimeException(work.exception); - } - } - } - - @Override - public boolean equals(Object obj) { - // filename is unique - if (obj == null) - return false; - if (obj instanceof DfsLogger) - return getFileName().equals(((DfsLogger) obj).getFileName()); - return false; - } - - @Override - public int hashCode() { - // filename is unique - return getFileName().hashCode(); - } - - private final ServerResources conf; - private FSDataOutputStream logFile; - private DataOutputStream encryptingLogFile = null; - private Method sync; - private Path logPath; - private String logger; - - public DfsLogger(ServerResources conf) throws IOException { - this.conf = conf; - } - - public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException { - this.conf = conf; - this.logger = logger; - this.logPath = filename; - } - - public static FSDataInputStream readHeader(VolumeManager fs, Path path, Map opts) throws IOException { - FSDataInputStream file = fs.open(path); - try { - byte[] magic = LOG_FILE_HEADER_V2.getBytes(); - byte[] buffer = new byte[magic.length]; - file.readFully(buffer); - if (Arrays.equals(buffer, magic)) { - int count = file.readInt(); - for (int i = 0; i < count; i++) { - String key = file.readUTF(); - String value = file.readUTF(); - opts.put(key, value); - } - } else { - file.seek(0); - return file; - } - return file; - } catch (IOException ex) { - file.seek(0); - return file; - } - } - - public synchronized void open(String address) throws IOException { - String filename = UUID.randomUUID().toString(); - logger = StringUtil.join(Arrays.asList(address.split(":")), "+"); - - log.debug("DfsLogger.open() begin"); - VolumeManager fs = conf.getFileSystem(); - - logPath = new Path(fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename); - try { - short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION); - if (replication == 0) - replication = fs.getDefaultReplication(logPath); - long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE); - if (blockSize == 0) - blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1); - if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC)) - logFile = fs.createSyncable(logPath, 0, replication, blockSize); - else - logFile = fs.create(logPath, true, 0, replication, blockSize); - - try { - NoSuchMethodException e = null; - try { - // sync: send data to datanodes - sync = logFile.getClass().getMethod("sync"); - } catch (NoSuchMethodException ex) { - e = ex; - } - try { - // hsync: send data to datanodes and sync the data to disk - sync = logFile.getClass().getMethod("hsync"); - e = null; - } catch (NoSuchMethodException ex) {} - if (e != null) - throw new RuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - - // Initialize the crypto operations. - org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf - .getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); - - // Initialize the log file with a header and the crypto params used to set up this log file. - logFile.write(LOG_FILE_HEADER_V2.getBytes()); - - CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration()); - - params.setPlaintextOutputStream(logFile); - - // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here, - // so that that crypto module can re-read its own parameters. - - logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); - - - //@SuppressWarnings("deprecation") - //OutputStream encipheringOutputStream = cryptoModule.getEncryptingOutputStream(logFile, cryptoOpts); - params = cryptoModule.getEncryptingOutputStream(params); - OutputStream encipheringOutputStream = params.getEncryptedOutputStream(); - - // If the module just kicks back our original stream, then just use it, don't wrap it in - // another data OutputStream. - if (encipheringOutputStream == logFile) { - encryptingLogFile = logFile; - } else { - encryptingLogFile = new DataOutputStream(encipheringOutputStream); - } - - LogFileKey key = new LogFileKey(); - key.event = OPEN; - key.tserverSession = filename; - key.filename = filename; - write(key, EMPTY); - sync.invoke(logFile); - log.debug("Got new write-ahead log: " + this); - } catch (Exception ex) { - if (logFile != null) - logFile.close(); - logFile = null; - throw new IOException(ex); - } - - Thread t = new Daemon(new LogSyncingTask()); - t.setName("Accumulo WALog thread " + toString()); - t.start(); - } - - @Override - public String toString() { - return getLogger() + "/" + getFileName(); - } - - public String getLogger() { - return logger; - } - - public String getFileName() { - return logPath.toString(); - } - - public void close() throws IOException { - - synchronized (closeLock) { - if (closed) - return; - // after closed is set to true, nothing else should be added to the queue - // CLOSED_MARKER should be the last thing on the queue, therefore when the - // background thread sees the marker and exits there should be nothing else - // to process... so nothing should be left waiting for the background - // thread to do work - closed = true; - workQueue.add(CLOSED_MARKER); - while (!workQueue.isEmpty()) - try { - closeLock.wait(); - } catch (InterruptedException e) { - log.info("Interrupted"); - } - } - - if (logFile != null) - try { - logFile.close(); - } catch (IOException ex) { - log.error(ex); - throw new LogClosedException(); - } - } - - public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException { - // write this log to the METADATA table - final LogFileKey key = new LogFileKey(); - key.event = DEFINE_TABLET; - key.seq = seq; - key.tid = tid; - key.tablet = tablet; - try { - write(key, EMPTY); - sync.invoke(logFile); - } catch (Exception ex) { - log.error(ex); - throw new IOException(ex); - } - } - - /** - * @param key - * @param empty2 - * @throws IOException - */ - private synchronized void write(LogFileKey key, LogFileValue value) throws IOException { - key.write(encryptingLogFile); - value.write(encryptingLogFile); - } - - public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException { - return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation)))); - } - - public LoggerOperation logManyTablets(List mutations) throws IOException { - DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1)); - - synchronized (DfsLogger.this) { - try { - for (TabletMutations tabletMutations : mutations) { - LogFileKey key = new LogFileKey(); - key.event = MANY_MUTATIONS; - key.seq = tabletMutations.getSeq(); - key.tid = tabletMutations.getTid(); - LogFileValue value = new LogFileValue(); - value.mutations = tabletMutations.getMutations(); - write(key, value); - } - } catch (Exception e) { - log.error(e, e); - work.exception = e; - } - } - - synchronized (closeLock) { - // use a different lock for close check so that adding to work queue does not need - // to wait on walog I/O operations - - if (closed) - throw new LogClosedException(); - workQueue.add(work); - } - - return new LoggerOperation(work); - } - - public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException { - LogFileKey key = new LogFileKey(); - key.event = COMPACTION_FINISH; - key.seq = seq; - key.tid = tid; - try { - write(key, EMPTY); - } catch (IOException ex) { - log.error(ex); - throw ex; - } - } - - public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException { - LogFileKey key = new LogFileKey(); - key.event = COMPACTION_START; - key.seq = seq; - key.tid = tid; - key.filename = fqfn; - try { - write(key, EMPTY); - } catch (IOException ex) { - log.error(ex); - throw ex; - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java deleted file mode 100644 index 0a2ba12..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver.log; - -import java.io.DataInputStream; -import java.io.EOFException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ThreadPoolExecutor; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.master.thrift.RecoveryStatus; -import org.apache.accumulo.core.security.crypto.CryptoModuleFactory; -import org.apache.accumulo.core.security.crypto.CryptoModuleParameters; -import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.SimpleThreadPool; -import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; -import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor; -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.MapFile; -import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; - -/** - * - */ -public class LogSorter { - - private static final Logger log = Logger.getLogger(LogSorter.class); - VolumeManager fs; - AccumuloConfiguration conf; - - private final Map currentWork = Collections.synchronizedMap(new HashMap()); - - class LogProcessor implements Processor { - - private FSDataInputStream input; - private DataInputStream decryptingInput; - private long bytesCopied = -1; - private long sortStart = 0; - private long sortStop = -1; - - @Override - public Processor newProcessor() { - return new LogProcessor(); - } - - @Override - public void process(String child, byte[] data) { - String work = new String(data); - String[] parts = work.split("\\|"); - String src = parts[0]; - String dest = parts[1]; - String sortId = new Path(src).getName(); - log.debug("Sorting " + src + " to " + dest + " using sortId " + sortId); - - synchronized (currentWork) { - if (currentWork.containsKey(sortId)) - return; - currentWork.put(sortId, this); - } - - try { - log.info("Copying " + src + " to " + dest); - sort(sortId, new Path(src), dest); - } finally { - currentWork.remove(sortId); - } - - } - - public void sort(String name, Path srcPath, String destPath) { - - synchronized (this) { - sortStart = System.currentTimeMillis(); - } - - String formerThreadName = Thread.currentThread().getName(); - int part = 0; - try { - - // the following call does not throw an exception if the file/dir does not exist - fs.deleteRecursively(new Path(destPath)); - - FSDataInputStream tmpInput = fs.open(srcPath); - - byte[] magic = DfsLogger.LOG_FILE_HEADER_V2.getBytes(); - byte[] magicBuffer = new byte[magic.length]; - tmpInput.readFully(magicBuffer); - if (!Arrays.equals(magicBuffer, magic)) { - tmpInput.seek(0); - synchronized (this) { - this.input = tmpInput; - this.decryptingInput = tmpInput; - } - } else { - // We read the crypto module class name here because we need to boot strap the class. The class itself will read any - // additional parameters it needs from the underlying stream. - String cryptoModuleClassname = tmpInput.readUTF(); - org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory - .getCryptoModule(cryptoModuleClassname); - - // Create the parameters and set the input stream into those parameters - CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf); - params.setEncryptedInputStream(tmpInput); - - // Create the plaintext input stream from the encrypted one - params = cryptoModule.getDecryptingInputStream(params); - - // Store the plaintext input stream into member variables - synchronized (this) { - this.input = tmpInput; - - if (params.getPlaintextInputStream() instanceof DataInputStream) { - this.decryptingInput = (DataInputStream)params.getPlaintextInputStream(); - } else { - this.decryptingInput = new DataInputStream(params.getPlaintextInputStream()); - } - - } - - } - - - final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE); - Thread.currentThread().setName("Sorting " + name + " for recovery"); - while (true) { - final ArrayList> buffer = new ArrayList>(); - try { - long start = input.getPos(); - while (input.getPos() - start < bufferSize) { - LogFileKey key = new LogFileKey(); - LogFileValue value = new LogFileValue(); - key.readFields(decryptingInput); - value.readFields(decryptingInput); - buffer.add(new Pair(key, value)); - } - writeBuffer(destPath, buffer, part++); - buffer.clear(); - } catch (EOFException ex) { - writeBuffer(destPath, buffer, part++); - break; - } - } - fs.create(new Path(destPath, "finished")).close(); - log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms"); - } catch (Throwable t) { - try { - // parent dir may not exist - fs.mkdirs(new Path(destPath)); - fs.create(new Path(destPath, "failed")).close(); - } catch (IOException e) { - log.error("Error creating failed flag file " + name, e); - } - log.error(t, t); - } finally { - Thread.currentThread().setName(formerThreadName); - try { - close(); - } catch (Exception e) { - log.error("Error during cleanup sort/copy " + name, e); - } - synchronized (this) { - sortStop = System.currentTimeMillis(); - } - } - } - - private void writeBuffer(String destPath, ArrayList> buffer, int part) throws IOException { - Path path = new Path(destPath, String.format("part-r-%05d", part++)); - FileSystem ns = fs.getFileSystemByPath(path); - MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class); - try { - Collections.sort(buffer, new Comparator>() { - @Override - public int compare(Pair o1, Pair o2) { - return o1.getFirst().compareTo(o2.getFirst()); - } - }); - for (Pair entry : buffer) { - output.append(entry.getFirst(), entry.getSecond()); - } - } finally { - output.close(); - } - } - - synchronized void close() throws IOException { - bytesCopied = input.getPos(); - input.close(); - decryptingInput.close(); - input = null; - } - - public synchronized long getSortTime() { - if (sortStart > 0) { - if (sortStop > 0) - return sortStop - sortStart; - return System.currentTimeMillis() - sortStart; - } - return 0; - } - - synchronized long getBytesCopied() throws IOException { - return input == null ? bytesCopied : input.getPos(); - } - } - - ThreadPoolExecutor threadPool; - private final Instance instance; - - public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) { - this.instance = instance; - this.fs = fs; - this.conf = conf; - int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT); - this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName()); - } - - public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException { - this.threadPool = distWorkQThreadPool; - new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool); - } - - public List getLogSorts() { - List result = new ArrayList(); - synchronized (currentWork) { - for (Entry entries : currentWork.entrySet()) { - RecoveryStatus status = new RecoveryStatus(); - status.name = entries.getKey(); - try { - status.progress = entries.getValue().getBytesCopied() / (0.0 + conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE)); - } catch (IOException ex) { - log.warn("Error getting bytes read"); - } - status.runtime = (int) entries.getValue().getSortTime(); - result.add(status); - } - return result; - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java deleted file mode 100644 index 77ddc66..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver.log; - -import java.io.EOFException; -import java.io.IOException; - -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.commons.collections.buffer.PriorityBuffer; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.MapFile.Reader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Provide simple Map.Reader methods over multiple Maps. - * - * Presently only supports next() and seek() and works on all the Map directories within a directory. The primary purpose of this class is to merge the results - * of multiple Reduce jobs that result in Map output files. - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class MultiReader { - - /** - * Group together the next key/value from a Reader with the Reader - * - */ - private static class Index implements Comparable { - Reader reader; - WritableComparable key; - Writable value; - boolean cached = false; - - private static Object create(java.lang.Class klass) { - try { - return klass.getConstructor().newInstance(); - } catch (Throwable t) { - throw new RuntimeException("Unable to construct objects to use for comparison"); - } - } - - public Index(Reader reader) { - this.reader = reader; - key = (WritableComparable) create(reader.getKeyClass()); - value = (Writable) create(reader.getValueClass()); - } - - private void cache() throws IOException { - if (!cached && reader.next(key, value)) { - cached = true; - } - } - - public int compareTo(Index o) { - try { - cache(); - o.cache(); - // no more data: always goes to the end - if (!cached) - return 1; - if (!o.cached) - return -1; - return key.compareTo(o.key); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - } - - private PriorityBuffer heap = new PriorityBuffer(); - - public MultiReader(VolumeManager fs, Path directory) throws IOException { - boolean foundFinish = false; - for (FileStatus child : fs.listStatus(directory)) { - if (child.getPath().getName().startsWith("_")) - continue; - if (child.getPath().getName().equals("finished")) { - foundFinish = true; - continue; - } - FileSystem ns = fs.getFileSystemByPath(child.getPath()); - heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf()))); - } - if (!foundFinish) - throw new IOException("Sort \"finished\" flag not found in " + directory); - } - - private static void copy(Writable src, Writable dest) throws IOException { - // not exactly efficient... - DataOutputBuffer output = new DataOutputBuffer(); - src.write(output); - DataInputBuffer input = new DataInputBuffer(); - input.reset(output.getData(), output.getLength()); - dest.readFields(input); - } - - public synchronized boolean next(WritableComparable key, Writable val) throws IOException { - Index elt = (Index) heap.remove(); - try { - elt.cache(); - if (elt.cached) { - copy(elt.key, key); - copy(elt.value, val); - elt.cached = false; - } else { - return false; - } - } finally { - heap.add(elt); - } - return true; - } - - public synchronized boolean seek(WritableComparable key) throws IOException { - PriorityBuffer reheap = new PriorityBuffer(heap.size()); - boolean result = false; - for (Object obj : heap) { - Index index = (Index) obj; - try { - WritableComparable found = index.reader.getClosest(key, index.value, true); - if (found != null && found.equals(key)) { - result = true; - } - } catch (EOFException ex) { - // thrown if key is beyond all data in the map - } - index.cached = false; - reheap.add(index); - } - heap = reheap; - return result; - } - - public void close() throws IOException { - IOException problem = null; - for (Object obj : heap) { - Index index = (Index) obj; - try { - index.reader.close(); - } catch (IOException ex) { - problem = ex; - } - } - if (problem != null) - throw problem; - heap = null; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MutationReceiver.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MutationReceiver.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MutationReceiver.java deleted file mode 100644 index c455397..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MutationReceiver.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver.log; - -import org.apache.accumulo.core.data.Mutation; - -public interface MutationReceiver { - void receive(Mutation m); -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java deleted file mode 100644 index 789fe31..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.tabletserver.log; - -import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH; -import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START; -import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; -import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS; -import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION; -import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; - -import java.io.IOException; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; - -/** - * Extract Mutations for a tablet from a set of logs that have been sorted by operation and tablet. - * - */ -public class SortedLogRecovery { - private static final Logger log = Logger.getLogger(SortedLogRecovery.class); - - static class EmptyMapFileException extends Exception { - private static final long serialVersionUID = 1L; - - public EmptyMapFileException() { super(); } - } - - static class UnusedException extends Exception { - private static final long serialVersionUID = 1L; - - public UnusedException() { super(); } - } - - private VolumeManager fs; - - public SortedLogRecovery(VolumeManager fs) { - this.fs = fs; - } - - private enum Status { - INITIAL, LOOKING_FOR_FINISH, COMPLETE - }; - - private static class LastStartToFinish { - long lastStart = -1; - long seq = -1; - long lastFinish = -1; - Status compactionStatus = Status.INITIAL; - String tserverSession = ""; - - private void update(long newFinish) { - this.seq = this.lastStart; - if (newFinish != -1) - lastFinish = newFinish; - } - - private void update(int newStartFile, long newStart) { - this.lastStart = newStart; - } - - private void update(String newSession) { - this.lastStart = -1; - this.lastFinish = -1; - this.compactionStatus = Status.INITIAL; - this.tserverSession = newSession; - } - } - - public void recover(KeyExtent extent, List recoveryLogs, Set tabletFiles, MutationReceiver mr) throws IOException { - int[] tids = new int[recoveryLogs.size()]; - LastStartToFinish lastStartToFinish = new LastStartToFinish(); - for (int i = 0; i < recoveryLogs.size(); i++) { - Path logfile = recoveryLogs.get(i); - log.info("Looking at mutations from " + logfile + " for " + extent); - MultiReader reader = new MultiReader(fs, logfile); - try { - try { - tids[i] = findLastStartToFinish(reader, i, extent, tabletFiles, lastStartToFinish); - } catch (EmptyMapFileException ex) { - log.info("Ignoring empty map file " + logfile); - tids[i] = -1; - } catch (UnusedException ex) { - log.info("Ignoring log file " + logfile + " appears to be unused by " + extent); - tids[i] = -1; - } - } finally { - try { - reader.close(); - } catch (IOException ex) { - log.warn("Ignoring error closing file"); - } - } - - } - - if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH) - throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction"); - - for (int i = 0; i < recoveryLogs.size(); i++) { - Path logfile = recoveryLogs.get(i); - MultiReader reader = new MultiReader(fs, logfile); - try { - playbackMutations(reader, tids[i], lastStartToFinish, mr); - } finally { - try { - reader.close(); - } catch (IOException ex) { - log.warn("Ignoring error closing file"); - } - } - log.info("Recovery complete for " + extent + " using " + logfile); - } - } - - int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent, Set tabletFiles, LastStartToFinish lastStartToFinish) throws IOException, EmptyMapFileException, UnusedException { - // Scan for tableId for this extent (should always be in the log) - LogFileKey key = new LogFileKey(); - LogFileValue value = new LogFileValue(); - int tid = -1; - if (!reader.next(key, value)) - throw new EmptyMapFileException(); - if (key.event != OPEN) - throw new RuntimeException("First log entry value is not OPEN"); - - if (key.tserverSession.compareTo(lastStartToFinish.tserverSession) != 0) { - if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH) - throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) is not followed by a successful minor compaction."); - lastStartToFinish.update(key.tserverSession); - } - - LogFileKey defineKey = null; - - // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id - // for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to - while (reader.next(key, value)) { - // LogReader.printEntry(entry); - if (key.event != DEFINE_TABLET) - break; - if (key.tablet.equals(extent)) { - if (tid != key.tid) { - tid = key.tid; - defineKey = key; - key = new LogFileKey(); - } - } - } - if (tid < 0) { - throw new UnusedException(); - } - - log.debug("Found tid, seq " + tid + " " + defineKey.seq); - - // Scan start/stop events for this tablet - key = defineKey; - key.event = COMPACTION_START; - reader.seek(key); - while (reader.next(key, value)) { - // LogFileEntry.printEntry(entry); - if (key.tid != tid) - break; - if (key.event == COMPACTION_START) { - if (lastStartToFinish.compactionStatus == Status.INITIAL) - lastStartToFinish.compactionStatus = Status.COMPLETE; - if (key.seq <= lastStartToFinish.lastStart) - throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); - lastStartToFinish.update(fileno, key.seq); - - // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table. - log.error("filename in compaction start " + key.filename); - if (tabletFiles.contains(key.filename)) - lastStartToFinish.update(-1); - } else if (key.event == COMPACTION_FINISH) { - if (key.seq <= lastStartToFinish.lastStart) - throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); - if (lastStartToFinish.compactionStatus == Status.INITIAL) - lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH; - else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart) - throw new RuntimeException("COMPACTION_FINISH does not have preceding COMPACTION_START event."); - else - lastStartToFinish.compactionStatus = Status.COMPLETE; - lastStartToFinish.update(key.seq); - } else - break; - } - return tid; - } - - private void playbackMutations(MultiReader reader, int tid, LastStartToFinish lastStartToFinish, MutationReceiver mr) throws IOException { - LogFileKey key = new LogFileKey(); - LogFileValue value = new LogFileValue(); - - // Playback mutations after the last stop to finish - log.info("Scanning for mutations starting at sequence number " + lastStartToFinish.seq + " for tid " + tid); - key.event = MUTATION; - key.tid = tid; - // the seq number for the minor compaction start is now the same as the - // last update made to memory. Scan up to that mutation, but not past it. - key.seq = lastStartToFinish.seq; - reader.seek(key); - while (true) { - if (!reader.next(key, value)) - break; - if (key.tid != tid) - break; - // log.info("Replaying " + key); - // log.info(value); - if (key.event == MUTATION) { - mr.receive(value.mutations.get(0)); - } else if (key.event == MANY_MUTATIONS) { - for (Mutation m : value.mutations) { - mr.receive(m); - } - } else { - throw new RuntimeException("unexpected log key type: " + key.event); - } - } - } -}