From: elserj@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Fri, 12 Sep 2014 07:46:38 -0000
Subject: [09/10] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
	server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d54e0fd8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d54e0fd8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d54e0fd8

Branch: refs/heads/1.6.1-SNAPSHOT
Commit: d54e0fd8636405b39a982a6fad5a3fca1593d6cf
Parents: 0d76cd5 7699e1f
Author: Josh Elser
Authored: Thu Sep 11 17:42:01 2014 -0700
Committer: Josh Elser
Committed: Thu Sep 11 17:42:01 2014 -0700

----------------------------------------------------------------------
 .../system/SourceSwitchingIterator.java      | 20 ++++------
 .../system/SourceSwitchingIteratorTest.java  | 38 +++++++++++++++++-
 .../apache/accumulo/tserver/FileManager.java | 13 +++++++
 .../apache/accumulo/tserver/InMemoryMap.java | 21 ++++++++---
 .../org/apache/accumulo/tserver/Tablet.java  |  5 +++
 .../accumulo/tserver/InMemoryMapTest.java    | 41 ++++++++++++++++----
 6 files changed, 112 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d54e0fd8/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 8bf2517,0000000..b82b9cc
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@@ -1,562 -1,0 +1,575 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Semaphore; ++import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; +import org.apache.accumulo.core.iterators.system.TimeSettingIterator; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.problems.ProblemReport; +import org.apache.accumulo.server.problems.ProblemReportingIterator; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +public class FileManager { + + private static final Logger log = Logger.getLogger(FileManager.class); + + int maxOpen; + + private static class OpenReader implements Comparable { + long releaseTime; + FileSKVIterator reader; + String fileName; + + public OpenReader(String fileName, FileSKVIterator reader) { + this.fileName = fileName; + this.reader = reader; + this.releaseTime = System.currentTimeMillis(); + } + + @Override + public int compareTo(OpenReader o) { + if (releaseTime < o.releaseTime) { + return -1; + } else if (releaseTime > o.releaseTime) { + return 1; + } else { + return 0; + } + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof OpenReader) { + return compareTo((OpenReader) obj) == 0; + } + return false; + } + + @Override + public int hashCode() { + return fileName.hashCode(); + } + } + + private Map> openFiles; + private HashMap reservedReaders; + + private Semaphore filePermits; + + private VolumeManager fs; + + // the data cache and index cache are allocated in + // TabletResourceManager and passed through the file 
opener to + // CachableBlockFile which can handle the caches being + // null if unallocated + private BlockCache dataCache = null; + private BlockCache indexCache = null; + + private long maxIdleTime; + + private final ServerConfiguration conf; + + private class IdleFileCloser implements Runnable { + + @Override + public void run() { + + long curTime = System.currentTimeMillis(); + + ArrayList filesToClose = new ArrayList(); + + // determine which files to close in a sync block, and then close the + // files outside of the sync block + synchronized (FileManager.this) { + Iterator>> iter = openFiles.entrySet().iterator(); + while (iter.hasNext()) { + Entry> entry = iter.next(); + List ofl = entry.getValue(); + + for (Iterator oflIter = ofl.iterator(); oflIter.hasNext();) { + OpenReader openReader = oflIter.next(); + + if (curTime - openReader.releaseTime > maxIdleTime) { + + filesToClose.add(openReader.reader); + oflIter.remove(); + } + } + + if (ofl.size() == 0) { + iter.remove(); + } + } + } + + closeReaders(filesToClose); + + } + + } + + /** + * + * @param dataCache + * : underlying file can and should be able to handle a null cache + * @param indexCache + * : underlying file can and should be able to handle a null cache + */ + FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) { + + if (maxOpen <= 0) + throw new IllegalArgumentException("maxOpen <= 0"); + this.conf = conf; + this.dataCache = dataCache; + this.indexCache = indexCache; + + this.filePermits = new Semaphore(maxOpen, true); + this.maxOpen = maxOpen; + this.fs = fs; + + this.openFiles = new HashMap>(); + this.reservedReaders = new HashMap(); + + this.maxIdleTime = conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE); + SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2); + + } + + private static int countReaders(Map> files) { + int count = 0; + + for (List list : files.values()) { + count += list.size(); + } + + return count; + } + + private List takeLRUOpenFiles(int numToTake) { + + ArrayList openReaders = new ArrayList(); + + for (Entry> entry : openFiles.entrySet()) { + openReaders.addAll(entry.getValue()); + } + + Collections.sort(openReaders); + + ArrayList ret = new ArrayList(); + + for (int i = 0; i < numToTake; i++) { + OpenReader or = openReaders.get(i); + + List ofl = openFiles.get(or.fileName); + if (!ofl.remove(or)) { + throw new RuntimeException("Failed to remove open reader that should have been there"); + } + + if (ofl.size() == 0) { + openFiles.remove(or.fileName); + } + + ret.add(or.reader); + } + + return ret; + } + + private static List getFileList(String file, Map> files) { + List ofl = files.get(file); + if (ofl == null) { + ofl = new ArrayList(); + files.put(file, ofl); + } + + return ofl; + } + + private void closeReaders(List filesToClose) { + for (FileSKVIterator reader : filesToClose) { + try { + reader.close(); + } catch (Exception e) { + log.error("Failed to close file " + e.getMessage(), e); + } + } + } + + private List takeOpenFiles(Collection files, List reservedFiles, Map readersReserved) { + List filesToOpen = new LinkedList(files); + for (Iterator iterator = filesToOpen.iterator(); iterator.hasNext();) { + String file = iterator.next(); + + List ofl = openFiles.get(file); + if (ofl != null && ofl.size() > 0) { + OpenReader openReader = ofl.remove(ofl.size() - 1); + reservedFiles.add(openReader.reader); + readersReserved.put(openReader.reader, file); + if (ofl.size() == 0) { + 
openFiles.remove(file); + } + iterator.remove(); + } + + } + return filesToOpen; + } + + private synchronized String getReservedReadeFilename(FileSKVIterator reader) { + return reservedReaders.get(reader); + } + + private List reserveReaders(Text table, Collection files, boolean continueOnFailure) throws IOException { + + if (files.size() >= maxOpen) { + throw new IllegalArgumentException("requested files exceeds max open"); + } + + if (files.size() == 0) { + return Collections.emptyList(); + } + + List filesToOpen = null; + List filesToClose = Collections.emptyList(); + List reservedFiles = new ArrayList(); + Map readersReserved = new HashMap(); + + filePermits.acquireUninterruptibly(files.size()); + + // now that the we are past the semaphore, we have the authority + // to open files.size() files + + // determine what work needs to be done in sync block + // but do the work of opening and closing files outside + // a synch block + synchronized (this) { + + filesToOpen = takeOpenFiles(files, reservedFiles, readersReserved); + + int numOpen = countReaders(openFiles); + + if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) { + filesToClose = takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen); + } + } + + // close files before opening files to ensure we stay under resource + // limitations + closeReaders(filesToClose); + + // open any files that need to be opened + for (String file : filesToOpen) { + try { + if (!file.contains(":")) + throw new IllegalArgumentException("Expected uri, got : " + file); + Path path = new Path(file); + FileSystem ns = fs.getVolumeByPath(path).getFileSystem(); + //log.debug("Opening "+file + " path " + path); + FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()), + dataCache, indexCache); + reservedFiles.add(reader); + readersReserved.put(reader, file); + } catch (Exception e) { + + ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e)); + + if (continueOnFailure) { + // release the permit for the file that failed to open + filePermits.release(1); + log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing..."); + } else { + // close whatever files were opened + closeReaders(reservedFiles); + + filePermits.release(files.size()); + + log.error("Failed to open file " + file + " " + e.getMessage()); + throw new IOException("Failed to open " + file, e); + } + } + } + + synchronized (this) { + // update set of reserved readers + reservedReaders.putAll(readersReserved); + } + + return reservedFiles; + } + + private void releaseReaders(List readers, boolean sawIOException) { + // put files in openFiles + + synchronized (this) { + + // check that readers were actually reserved ... 
want to make sure a thread does + // not try to release readers they never reserved + if (!reservedReaders.keySet().containsAll(readers)) { + throw new IllegalArgumentException("Asked to release readers that were never reserved "); + } + + for (FileSKVIterator reader : readers) { + try { + reader.closeDeepCopies(); + } catch (IOException e) { + log.warn(e, e); + sawIOException = true; + } + } + + for (FileSKVIterator reader : readers) { + String fileName = reservedReaders.remove(reader); + if (!sawIOException) + getFileList(fileName, openFiles).add(new OpenReader(fileName, reader)); + } + } + + if (sawIOException) + closeReaders(readers); + + // decrement the semaphore + filePermits.release(readers.size()); + + } + + static class FileDataSource implements DataSource { + + private SortedKeyValueIterator iter; + private ArrayList deepCopies; + private boolean current = true; + private IteratorEnvironment env; + private String file; ++ private AtomicBoolean iflag; + + FileDataSource(String file, SortedKeyValueIterator iter) { + this.file = file; + this.iter = iter; + this.deepCopies = new ArrayList(); + } + + public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator deepCopy, ArrayList deepCopies) { + this.iter = deepCopy; + this.env = env; + this.deepCopies = deepCopies; + deepCopies.add(this); + } + + @Override + public boolean isCurrent() { + return current; + } + + @Override + public DataSource getNewDataSource() { + current = true; + return this; + } + + @Override + public DataSource getDeepCopyDataSource(IteratorEnvironment env) { + return new FileDataSource(env, iter.deepCopy(env), deepCopies); + } + + @Override + public SortedKeyValueIterator iterator() throws IOException { ++ if (iflag != null) ++ ((InterruptibleIterator) this.iter).setInterruptFlag(iflag); + return iter; + } + + void unsetIterator() { + current = false; + iter = null; + for (FileDataSource fds : deepCopies) { + fds.current = false; + fds.iter = null; + } + } + + void setIterator(SortedKeyValueIterator iter) { + current = false; + this.iter = iter; ++ ++ if (iflag != null) ++ ((InterruptibleIterator) this.iter).setInterruptFlag(iflag); ++ + for (FileDataSource fds : deepCopies) { + fds.current = false; + fds.iter = iter.deepCopy(fds.env); + } + } ++ ++ @Override ++ public void setInterruptFlag(AtomicBoolean flag) { ++ this.iflag = flag; ++ } + + } + + public class ScanFileManager { + + private ArrayList dataSources; + private ArrayList tabletReservedReaders; + private KeyExtent tablet; + private boolean continueOnFailure; + + ScanFileManager(KeyExtent tablet) { + tabletReservedReaders = new ArrayList(); + dataSources = new ArrayList(); + this.tablet = tablet; + + continueOnFailure = conf.getTableConfiguration(tablet).getBoolean(Property.TABLE_FAILURES_IGNORE); + + if (tablet.isMeta()) { + continueOnFailure = false; + } + } + + private List openFileRefs(Collection files) throws TooManyFilesException, IOException { + List strings = new ArrayList(files.size()); + for (FileRef ref : files) + strings.add(ref.path().toString()); + return openFiles(strings); + } + + private List openFiles(Collection files) throws TooManyFilesException, IOException { + // one tablet can not open more than maxOpen files, otherwise it could get stuck + // forever waiting on itself to release files + + if (tabletReservedReaders.size() + files.size() >= maxOpen) { + throw new TooManyFilesException("Request to open files would exceed max open files reservedReaders.size()=" + tabletReservedReaders.size() + + " files.size()=" + 
files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet); + } + + List newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure); + + tabletReservedReaders.addAll(newlyReservedReaders); + return newlyReservedReaders; + } + + synchronized List openFiles(Map files, boolean detachable) throws IOException { + + List newlyReservedReaders = openFileRefs(files.keySet()); + + ArrayList iters = new ArrayList(); + + for (FileSKVIterator reader : newlyReservedReaders) { + String filename = getReservedReadeFilename(reader); + InterruptibleIterator iter; + if (detachable) { + FileDataSource fds = new FileDataSource(filename, reader); + dataSources.add(fds); + SourceSwitchingIterator ssi = new SourceSwitchingIterator(fds); + iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, ssi); + } else { + iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader); + } + DataFileValue value = files.get(new FileRef(filename)); + if (value.isTimeSet()) { + iter = new TimeSettingIterator(iter, value.getTime()); + } + + iters.add(iter); + } + + return iters; + } + + synchronized void detach() { + + releaseReaders(tabletReservedReaders, false); + tabletReservedReaders.clear(); + + for (FileDataSource fds : dataSources) + fds.unsetIterator(); + } + + synchronized void reattach() throws IOException { + if (tabletReservedReaders.size() != 0) + throw new IllegalStateException(); + + Collection files = new ArrayList(); + for (FileDataSource fds : dataSources) + files.add(fds.file); + + List newlyReservedReaders = openFiles(files); + Map> map = new HashMap>(); + for (FileSKVIterator reader : newlyReservedReaders) { + String fileName = getReservedReadeFilename(reader); + List list = map.get(fileName); + if (list == null) { + list = new LinkedList(); + map.put(fileName, list); + } + + list.add(reader); + } + + for (FileDataSource fds : dataSources) { + FileSKVIterator reader = map.get(fds.file).remove(0); + fds.setIterator(reader); + } + } + + synchronized void releaseOpenFiles(boolean sawIOException) { + releaseReaders(tabletReservedReaders, sawIOException); + tabletReservedReaders.clear(); + dataSources.clear(); + } + + synchronized int getNumOpenFiles() { + return tabletReservedReaders.size(); + } + } + + public ScanFileManager newScanFileManager(KeyExtent tablet) { + return new ScanFileManager(tablet); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/d54e0fd8/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 5f6d9ce,0000000..2e15767 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@@ -1,772 -1,0 +1,783 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.file.rfile.RFileOperations; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SkippingIterator; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.SortedMapIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator; +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; +import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.commons.lang.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +class MemKeyComparator implements Comparator, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int compare(Key k1, Key k2) { + int cmp = k1.compareTo(k2); + + if (cmp == 0) { + if (k1 instanceof MemKey) + if (k2 instanceof MemKey) + cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount; + else + cmp = 1; + else if (k2 instanceof MemKey) + cmp = -1; + } + + return cmp; + } +} + +class PartialMutationSkippingIterator extends SkippingIterator 
implements InterruptibleIterator { + + int kvCount; + + public PartialMutationSkippingIterator(SortedKeyValueIterator source, int maxKVCount) { + setSource(source); + this.kvCount = maxKVCount; + } + + @Override + protected void consume() throws IOException { + while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount) + getSource().next(); + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount); + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + } + +} + +class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator { + MemKey currKey = null; + Value currVal = null; + + public MemKeyConversionIterator(SortedKeyValueIterator source) { + super(); + setSource(source); + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + return new MemKeyConversionIterator(getSource().deepCopy(env)); + } + + @Override + public Key getTopKey() { + return currKey; + } + + @Override + public Value getTopValue() { + return currVal; + } + + private void getTopKeyVal() { + Key k = super.getTopKey(); + Value v = super.getTopValue(); + if (k instanceof MemKey || k == null) { + currKey = (MemKey) k; + currVal = v; + return; + } + currVal = new Value(v); + int mc = MemValue.splitKVCount(currVal); + currKey = new MemKey(k, mc); + + } + + public void next() throws IOException { + super.next(); + if (hasTop()) + getTopKeyVal(); + } + + public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { + super.seek(range, columnFamilies, inclusive); + + if (hasTop()) + getTopKeyVal(); + + Key k = range.getStartKey(); + if (k instanceof MemKey && hasTop()) { + while (hasTop() && currKey.compareTo(k) < 0) + next(); + } + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + } + +} + +public class InMemoryMap { + private SimpleMap map = null; + + private static final Logger log = Logger.getLogger(InMemoryMap.class); + + private volatile String memDumpFile = null; + private final String memDumpDir; + + private Map> lggroups; + + public InMemoryMap(boolean useNativeMap, String memDumpDir) { + this(new HashMap>(), useNativeMap, memDumpDir); + } + + public InMemoryMap(Map> lggroups, boolean useNativeMap, String memDumpDir) { + this.memDumpDir = memDumpDir; + this.lggroups = lggroups; + + if (lggroups.size() == 0) + map = newMap(useNativeMap); + else + map = new LocalityGroupMap(lggroups, useNativeMap); + } + + public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError { + this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR)); + } + + private static SimpleMap newMap(boolean useNativeMap) { + if (useNativeMap && NativeMap.isLoaded()) { + try { + return new NativeMapWrapper(); + } catch (Throwable t) { + log.error("Failed to create native map", t); + } + } + + return new DefaultMap(); + } + + private interface SimpleMap { + Value get(Key key); + + Iterator> iterator(Key startKey); + + int size(); + + InterruptibleIterator skvIterator(); + + void delete(); + + long getMemoryUsed(); + + void mutate(List mutations, int kvCount); + } + + private static class LocalityGroupMap implements SimpleMap { + + private Map groupFams[]; + + // 
the last map in the array is the default locality group + private SimpleMap maps[]; + private Partitioner partitioner; + private List[] partitioned; + private Set nonDefaultColumnFamilies; + + @SuppressWarnings("unchecked") + LocalityGroupMap(Map> groups, boolean useNativeMap) { + this.groupFams = new Map[groups.size()]; + this.maps = new SimpleMap[groups.size() + 1]; + this.partitioned = new List[groups.size() + 1]; + this.nonDefaultColumnFamilies = new HashSet(); + + for (int i = 0; i < maps.length; i++) { + maps[i] = newMap(useNativeMap); + } + + int count = 0; + for (Set cfset : groups.values()) { + HashMap map = new HashMap(); + for (ByteSequence bs : cfset) + map.put(bs, new MutableLong(1)); + this.groupFams[count++] = map; + nonDefaultColumnFamilies.addAll(cfset); + } + + partitioner = new LocalityGroupUtil.Partitioner(this.groupFams); + + for (int i = 0; i < partitioned.length; i++) { + partitioned[i] = new ArrayList(); + } + } + + @Override + public Value get(Key key) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator> iterator(Key startKey) { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + int sum = 0; + for (SimpleMap map : maps) + sum += map.size(); + return sum; + } + + @Override + public InterruptibleIterator skvIterator() { + LocalityGroup groups[] = new LocalityGroup[maps.length]; + for (int i = 0; i < groups.length; i++) { + if (i < groupFams.length) + groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false); + else + groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true); + } + + + return new LocalityGroupIterator(groups, nonDefaultColumnFamilies); + } + + @Override + public void delete() { + for (SimpleMap map : maps) + map.delete(); + } + + @Override + public long getMemoryUsed() { + long sum = 0; + for (SimpleMap map : maps) + sum += map.getMemoryUsed(); + return sum; + } + + @Override + public synchronized void mutate(List mutations, int kvCount) { + // this method is synchronized because it reuses objects to avoid allocation, + // currently, the method that calls this is synchronized so there is no + // loss in parallelism.... 
synchronization was added here for future proofing + + try{ + partitioner.partition(mutations, partitioned); + + for (int i = 0; i < partitioned.length; i++) { + if (partitioned[i].size() > 0) { + maps[i].mutate(partitioned[i], kvCount); + for (Mutation m : partitioned[i]) + kvCount += m.getUpdates().size(); + } + } + } finally { + // clear immediately so mutations can be garbage collected + for (List list : partitioned) { + list.clear(); + } + } + } + + } + + private static class DefaultMap implements SimpleMap { + private ConcurrentSkipListMap map = new ConcurrentSkipListMap(new MemKeyComparator()); + private AtomicLong bytesInMemory = new AtomicLong(); + private AtomicInteger size = new AtomicInteger(); + + public void put(Key key, Value value) { + // Always a MemKey, so account for the kvCount int + bytesInMemory.addAndGet(key.getLength() + 4); + bytesInMemory.addAndGet(value.getSize()); + if (map.put(key, value) == null) + size.incrementAndGet(); + } + + public Value get(Key key) { + return map.get(key); + } + + public Iterator> iterator(Key startKey) { + Key lk = new Key(startKey); + SortedMap tm = map.tailMap(lk); + return tm.entrySet().iterator(); + } + + public int size() { + return size.get(); + } + + public synchronized InterruptibleIterator skvIterator() { + if (map == null) + throw new IllegalStateException(); + + return new SortedMapIterator(map); + } + + public synchronized void delete() { + map = null; + } + + public long getOverheadPerEntry() { + // all of the java objects that are used to hold the + // data and make it searchable have overhead... this + // overhead is estimated using test.EstimateInMemMapOverhead + // and is in bytes.. the estimates were obtained by running + // java 6_16 in 64 bit server mode + + return 200; + } + + @Override + public void mutate(List mutations, int kvCount) { + for (Mutation m : mutations) { + for (ColumnUpdate cvp : m.getUpdates()) { + Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(), + false, kvCount++); + Value value = new Value(cvp.getValue()); + put(newKey, value); + } + } + } + + @Override + public long getMemoryUsed() { + return bytesInMemory.get() + (size() * getOverheadPerEntry()); + } + } + + private static class NativeMapWrapper implements SimpleMap { + private NativeMap nativeMap; + + NativeMapWrapper() { + nativeMap = new NativeMap(); + } + + public Value get(Key key) { + return nativeMap.get(key); + } + + public Iterator> iterator(Key startKey) { + return nativeMap.iterator(startKey); + } + + public int size() { + return nativeMap.size(); + } + + public InterruptibleIterator skvIterator() { + return (InterruptibleIterator) nativeMap.skvIterator(); + } + + public void delete() { + nativeMap.delete(); + } + + public long getMemoryUsed() { + return nativeMap.getMemoryUsed(); + } + + @Override + public void mutate(List mutations, int kvCount) { + nativeMap.mutate(mutations, kvCount); + } + } + + private AtomicInteger nextKVCount = new AtomicInteger(1); + private AtomicInteger kvCount = new AtomicInteger(0); + + private Object writeSerializer = new Object(); + + /** + * Applies changes to a row in the InMemoryMap + * + */ + public void mutate(List mutations) { + int numKVs = 0; + for (int i = 0; i < mutations.size(); i++) + numKVs += mutations.get(i).size(); + + // Can not update mutationCount while writes that started before + // are in progress, this would cause partial mutations to be seen. 
+ // Also, can not continue until mutation count is updated, because + // a read may not see a successful write. Therefore writes must + // wait for writes that started before to finish. + // + // using separate lock from this map, to allow read/write in parallel + synchronized (writeSerializer ) { + int kv = nextKVCount.getAndAdd(numKVs); + try { + map.mutate(mutations, kv); + } finally { + kvCount.set(kv + numKVs - 1); + } + } + } + + /** + * Returns a long representing the size of the InMemoryMap + * + * @return bytesInMemory + */ + public synchronized long estimatedSizeInBytes() { + if (map == null) + return 0; + + return map.getMemoryUsed(); + } + + Iterator> iterator(Key startKey) { + return map.iterator(startKey); + } + + public long getNumEntries() { + return map.size(); + } + + private final Set activeIters = Collections.synchronizedSet(new HashSet()); + + class MemoryDataSource implements DataSource { + + boolean switched = false; + private InterruptibleIterator iter; + private FileSKVIterator reader; + private MemoryDataSource parent; + private IteratorEnvironment env; ++ private AtomicBoolean iflag; + + MemoryDataSource() { - this(null, false, null); ++ this(null, false, null, null); + } + - public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env) { ++ public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag) { + this.parent = parent; + this.switched = switched; + this.env = env; ++ this.iflag = iflag; + } + + @Override + public boolean isCurrent() { + if (switched) + return true; + else + return memDumpFile == null; + } + + @Override + public DataSource getNewDataSource() { + if (switched) + throw new IllegalStateException(); + + if (!isCurrent()) { + switched = true; + iter = null; + try { + // ensure files are referenced even if iterator was never seeked before + iterator(); + } catch (IOException e) { + throw new RuntimeException(); + } + } + + return this; + } + + private synchronized FileSKVIterator getReader() throws IOException { + if (reader == null) { + Configuration conf = CachedConfiguration.getInstance(); + FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf)); + + reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration()); ++ if (iflag != null) ++ reader.setInterruptFlag(iflag); + } + + return reader; + } + + @Override + public SortedKeyValueIterator iterator() throws IOException { + if (iter == null) - if (!switched) ++ if (!switched) { + iter = map.skvIterator(); - else { ++ if (iflag != null) ++ iter.setInterruptFlag(iflag); ++ } else { + if (parent == null) + iter = new MemKeyConversionIterator(getReader()); + else + synchronized (parent) { + // synchronize deep copy operation on parent, this prevents multiple threads from deep copying the rfile shared from parent its possible that the + // thread deleting an InMemoryMap and scan thread could be switching different deep copies + iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env)); + } + } + + return iter; + } + + @Override + public DataSource getDeepCopyDataSource(IteratorEnvironment env) { - return new MemoryDataSource(parent == null ? this : parent, switched, env); ++ return new MemoryDataSource(parent == null ? 
this : parent, switched, env, iflag); ++ } ++ ++ @Override ++ public void setInterruptFlag(AtomicBoolean flag) { ++ this.iflag = flag; + } + + } + + class MemoryIterator extends WrappingIterator implements InterruptibleIterator { + + private AtomicBoolean closed; + private SourceSwitchingIterator ssi; + private MemoryDataSource mds; + + protected SortedKeyValueIterator getSource() { + if (closed.get()) + throw new IllegalStateException("Memory iterator is closed"); + return super.getSource(); + } + + private MemoryIterator(InterruptibleIterator source) { + this(source, new AtomicBoolean(false)); + } + + private MemoryIterator(SortedKeyValueIterator source, AtomicBoolean closed) { + setSource(source); + this.closed = closed; + } + + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + return new MemoryIterator(getSource().deepCopy(env), closed); + } + + public void close() { + + synchronized (this) { + if (closed.compareAndSet(false, true)) { + try { + if (mds.reader != null) + mds.reader.close(); + } catch (IOException e) { + log.warn(e, e); + } + } + } + + // remove outside of sync to avoid deadlock + activeIters.remove(this); + } + + private synchronized boolean switchNow() throws IOException { + if (closed.get()) + return false; + + ssi.switchNow(); + return true; + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + } + + private void setSSI(SourceSwitchingIterator ssi) { + this.ssi = ssi; + } + + public void setMDS(MemoryDataSource mds) { + this.mds = mds; + } + + } + + public synchronized MemoryIterator skvIterator() { + if (map == null) + throw new NullPointerException(); + + if (deleted) + throw new IllegalStateException("Can not obtain iterator after map deleted"); + + int mc = kvCount.get(); + MemoryDataSource mds = new MemoryDataSource(); + SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource()); + MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc)); + mi.setSSI(ssi); + mi.setMDS(mds); + activeIters.add(mi); + return mi; + } + + public SortedKeyValueIterator compactionIterator() { + + if (nextKVCount.get() - 1 != kvCount.get()) + throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = " + + kvCount.get()); + + return map.skvIterator(); + } + + private boolean deleted = false; + + public void delete(long waitTime) { + + synchronized (this) { + if (deleted) + throw new IllegalStateException("Double delete"); + + deleted = true; + } + + long t1 = System.currentTimeMillis(); + + while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) { + UtilWaitThread.sleep(50); + } + + if (activeIters.size() > 0) { + // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file + try { + Configuration conf = CachedConfiguration.getInstance(); + FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf)); + + String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." 
+ RFile.EXTENSION; + + Configuration newConf = new Configuration(conf); + newConf.setInt("io.seqfile.compress.blocksize", 100000); + + FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration()); + + InterruptibleIterator iter = map.skvIterator(); + + HashSet allfams= new HashSet(); + + for(Entry> entry : lggroups.entrySet()){ + allfams.addAll(entry.getValue()); + out.startNewLocalityGroup(entry.getKey(), entry.getValue()); + iter.seek(new Range(), entry.getValue(), true); + dumpLocalityGroup(out, iter); + } + + out.startDefaultLocalityGroup(); + iter.seek(new Range(), allfams, false); + + dumpLocalityGroup(out, iter); + + out.close(); + + log.debug("Created mem dump file " + tmpFile); + + memDumpFile = tmpFile; + + synchronized (activeIters) { + for (MemoryIterator mi : activeIters) { + mi.switchNow(); + } + } + + // rely on unix behavior that file will be deleted when last + // reader closes it + fs.delete(new Path(memDumpFile), true); + + } catch (IOException ioe) { + log.error("Failed to create mem dump file ", ioe); + + while (activeIters.size() > 0) { + UtilWaitThread.sleep(100); + } + } + + } + + SimpleMap tmpMap = map; + + synchronized (this) { + map = null; + } + + tmpMap.delete(); + } + + private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException { + while (iter.hasTop() && activeIters.size() > 0) { + // RFile does not support MemKey, so we move the kv count into the value only for the RFile. + // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written + Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount); + out.append(iter.getTopKey(), newValue); + iter.next(); + + } + } +}
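
----------------------------------------------------------------------
The thread connecting the changes above is interrupt-flag plumbing: both
FileManager.FileDataSource and InMemoryMap.MemoryDataSource now remember the
AtomicBoolean handed to setInterruptFlag() and re-apply it whenever they build
or swap in a new iterator, so a scan stays interruptible even after a
SourceSwitchingIterator switches its underlying source. Below is a minimal
sketch of that pattern in plain Java; the class and method names are
hypothetical, and a plain RuntimeException stands in for Accumulo's own
interrupted-iteration exception.

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/** Wraps an iterator and aborts iteration once a shared flag is set. */
class InterruptibleWrapper<T> implements Iterator<T> {
  private final Iterator<T> source;
  private final AtomicBoolean interrupt;

  InterruptibleWrapper(Iterator<T> source, AtomicBoolean interrupt) {
    this.source = source;
    this.interrupt = interrupt;
  }

  private void checkInterrupt() {
    // Checked on every advance: another thread (e.g. one unloading a
    // tablet) sets the flag, and the scan stops at its next step.
    if (interrupt.get())
      throw new RuntimeException("iteration interrupted");
  }

  @Override
  public boolean hasNext() {
    checkInterrupt();
    return source.hasNext();
  }

  @Override
  public T next() {
    checkInterrupt();
    return source.next();
  }

  @Override
  public void remove() {
    source.remove();
  }
}

The detail the diff fixes is the re-application: setting the flag once on the
original iterator is not enough, because setIterator() and
getDeepCopyDataSource() create fresh iterators that would otherwise never see
the flag.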
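
----------------------------------------------------------------------
FileManager.reserveReaders() is also a compact example of a bounded resource
pool: a fair Semaphore caps the number of open files across all tablets, the
synchronized block is limited to bookkeeping (taking cached readers, picking
LRU victims), and the actual open and close calls run outside the lock. A
stripped-down sketch of the same shape follows, using a hypothetical resource
type rather than FileSKVIterator:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

class BoundedReaderPool {
  private final Semaphore permits;
  private final List<AutoCloseable> idle = new ArrayList<AutoCloseable>();

  BoundedReaderPool(int maxOpen) {
    // fair, as in FileManager, so waiting scans are served in arrival order
    this.permits = new Semaphore(maxOpen, true);
  }

  AutoCloseable acquire() throws Exception {
    permits.acquireUninterruptibly(); // block until we are under the limit
    AutoCloseable cached = null;
    synchronized (this) {             // bookkeeping only while locked
      if (!idle.isEmpty())
        cached = idle.remove(idle.size() - 1);
    }
    return cached != null ? cached : openNew(); // slow I/O outside the lock
  }

  void release(AutoCloseable reader) {
    synchronized (this) {
      idle.add(reader);
    }
    permits.release(); // hand the permit back last
  }

  private AutoCloseable openNew() {
    // stand-in for FileOperations.getInstance().openReader(...)
    return new AutoCloseable() {
      @Override
      public void close() {}
    };
  }
}

One subtlety the real code handles that this sketch does not: when an open
fails with continueOnFailure set, FileManager releases exactly one permit, so
the semaphore count stays consistent with the files actually held.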
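
----------------------------------------------------------------------
Finally, InMemoryMap's mutate()/skvIterator() pair implements a simple
sequence-number snapshot: each inserted key-value gets a monotonically
increasing kvCount, a reader captures the current count when it starts, and
PartialMutationSkippingIterator hides anything newer, so a scan never observes
half of a mutation batch. Here is the core idea reduced to plain Java with
hypothetical names; unlike InMemoryMap's multi-version MemKey handling, this
sketch keeps only the latest value per key:

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

class SnapshotMap {

  /** A value stamped with the sequence number it was written at. */
  private static class Stamped {
    final String value;
    final int seq;

    Stamped(String value, int seq) {
      this.value = value;
      this.seq = seq;
    }
  }

  private final ConcurrentSkipListMap<String,Stamped> map = new ConcurrentSkipListMap<String,Stamped>();
  private final AtomicInteger nextSeq = new AtomicInteger(1);
  private final AtomicInteger visibleSeq = new AtomicInteger(0);
  private final Object writeSerializer = new Object();

  void mutate(Map<String,String> batch) {
    // serialize writers, as InMemoryMap does with writeSerializer, so
    // visibleSeq never advances past a partially applied batch
    synchronized (writeSerializer) {
      int seq = nextSeq.getAndAdd(batch.size());
      for (Map.Entry<String,String> e : batch.entrySet())
        map.put(e.getKey(), new Stamped(e.getValue(), seq++));
      visibleSeq.set(seq - 1); // analogous to kvCount.set(kv + numKVs - 1)
    }
  }

  /** Returns only entries written at or before the snapshot point. */
  SortedMap<String,String> snapshot() {
    int cutoff = visibleSeq.get(); // captured once, like kvCount.get() above
    TreeMap<String,String> snap = new TreeMap<String,String>();
    for (Map.Entry<String,Stamped> e : map.entrySet())
      if (e.getValue().seq <= cutoff)
        snap.put(e.getKey(), e.getValue().value);
    return snap;
  }
}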