From: elserj@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Mon, 02 Dec 2013 22:54:15 -0000
Subject: [3/3] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
	server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/890ee25c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/890ee25c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/890ee25c

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 890ee25cf3b9c7d1fd256a3b3237e7c3030e4183
Parents: 360f0cf cd96f85
Author: Josh Elser
Authored: Mon Dec 2 17:53:54 2013 -0500
Committer: Josh Elser
Committed: Mon Dec 2 17:53:54 2013 -0500

----------------------------------------------------------------------
 .../accumulo/server/fs/VolumeManagerImpl.java   | 17 +++++++++++------
 .../accumulo/tserver/log/MultiReaderTest.java   |  2 --
 2 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/890ee25c/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 39afe75,0000000..472b0c0
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@@ -1,474 -1,0 +1,479 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+public class VolumeManagerImpl implements VolumeManager {
+
+  private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
+
+  Map<String,? extends FileSystem> volumes;
+  String defaultVolume;
+  AccumuloConfiguration conf;
+  VolumeChooser chooser;
+
+  protected VolumeManagerImpl(Map<String,? extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) {
+    this.volumes = volumes;
+    this.defaultVolume = defaultVolume;
+    this.conf = conf;
+    ensureSyncIsEnabled();
+    chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser());
+  }
+
+  public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException {
+    return new VolumeManagerImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "",
+        DefaultConfiguration.getDefaultConfiguration());
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOException ex = null;
+    for (FileSystem fs : volumes.values()) {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        ex = e;
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  @Override
+  public boolean closePossiblyOpenFile(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    if (fs instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        return dfs.recoverLease(path);
+      } catch (FileNotFoundException ex) {
+        throw ex;
+      }
+    } else if (fs instanceof LocalFileSystem) {
+      // ignore
+    } else {
+      throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
+    }
+    fs.append(path).close();
+    log.info("Recovered lease on " + path.toString() + " using append");
+    return true;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.create(path);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.create(path, overwrite);
+  }
+
+  private static long correctBlockSize(Configuration conf, long blockSize) {
+    if (blockSize <= 0)
+      blockSize = conf.getLong("dfs.block.size", 67108864);
+
+    int checkSum = conf.getInt("io.bytes.per.checksum", 512);
+    blockSize -= blockSize % checkSum;
+    blockSize = Math.max(blockSize, checkSum);
+    return blockSize;
+  }
+
+  private static int correctBufferSize(Configuration conf, int bufferSize) {
+    if (bufferSize <= 0)
+      bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    return bufferSize;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    if (bufferSize == 0) {
+      fs.getConf().getInt("io.file.buffer.size", 4096);
+    }
+    return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(), blockSize));
+  }
+
+  @Override
+  public boolean createNewFile(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.createNewFile(path);
+  }
+
+  @Override
+  public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
+    FileSystem fs = getFileSystemByPath(logPath);
+    blockSize = correctBlockSize(fs.getConf(), blockSize);
+    bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+    try {
+      // This...
+      // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+      // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+      // Becomes this:
+      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+      List<Enum<?>> flags = new ArrayList<Enum<?>>();
+      if (createFlags.isEnum()) {
+        for (Object constant : createFlags.getEnumConstants()) {
+          if (constant.toString().equals("SYNC_BLOCK")) {
+            flags.add((Enum<?>) constant);
+            log.debug("Found synch enum " + constant);
+          }
+          if (constant.toString().equals("CREATE")) {
+            flags.add((Enum<?>) constant);
+            log.debug("Found CREATE enum " + constant);
+          }
+        }
+      }
+      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+      log.debug("CreateFlag set: " + set);
+      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+      return (FSDataOutputStream) create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+    } catch (ClassNotFoundException ex) {
+      // Expected in hadoop 1.0
+      return fs.create(logPath, true, bufferSize, replication, blockSize);
+    } catch (Exception ex) {
+      log.debug(ex, ex);
+      return fs.create(logPath, true, bufferSize, replication, blockSize);
+    }
+  }
+
+  @Override
+  public boolean delete(Path path) throws IOException {
+    return getFileSystemByPath(path).delete(path, false);
+  }
+
+  @Override
+  public boolean deleteRecursively(Path path) throws IOException {
+    return getFileSystemByPath(path).delete(path, true);
+  }
+
+  protected void ensureSyncIsEnabled() {
+    for (Entry<String,? extends FileSystem> entry : getFileSystems().entrySet()) {
+      final String volumeName = entry.getKey();
+      final FileSystem fs = entry.getValue();
+
+      if (fs instanceof DistributedFileSystem) {
+        final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append";
+        final String ticketMessage = "See ACCUMULO-623 and ACCUMULO-1637 for more details.";
+        // Check to make sure that we have proper defaults configured
+        try {
+          // If the default is off (0.20.205.x or 1.0.x)
+          DFSConfigKeys configKeys = new DFSConfigKeys();
+
+          // Can't use the final constant itself as Java will inline it at compile time
+          Field dfsSupportAppendDefaultField = configKeys.getClass().getField("DFS_SUPPORT_APPEND_DEFAULT");
+          boolean dfsSupportAppendDefaultValue = dfsSupportAppendDefaultField.getBoolean(configKeys);
+
+          if (!dfsSupportAppendDefaultValue) {
+            // See if the user did the correct override
+            if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, false)) {
+              String msg = "Accumulo requires that dfs.support.append to true. " + ticketMessage;
+              log.fatal(msg);
+              throw new RuntimeException(msg);
+            }
+          }
+        } catch (NoSuchFieldException e) {
+          // If we can't find DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT, the user is running
+          // 1.1.x or 1.2.x. This is ok, though, as, by default, these versions have append/sync enabled.
+        } catch (Exception e) {
+          log.warn("Error while checking for " + DFS_SUPPORT_APPEND + " on volume " + volumeName + ". The user should ensure that Hadoop is configured to properly supports append and sync. " + ticketMessage, e);
+        }
+
+        // If either of these parameters are configured to be false, fail.
+        // This is a sign that someone is writing bad configuration.
+        if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true) || !fs.getConf().getBoolean(DFS_DURABLE_SYNC, true)) {
+          String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " and " + DFS_DURABLE_SYNC + " not be configured as false. " + ticketMessage;
+          log.fatal(msg);
+          throw new RuntimeException(msg);
+        }
+
+        try {
-          // if this class exists
-          Class.forName("org.apache.hadoop.fs.CreateFlag");
-          // we're running hadoop 2.0, 1.1
++          // Check DFSConfigKeys to see if DFS_DATANODE_SYNCONCLOSE_KEY exists (should be everything >=1.1.1 and the 0.23 line)
++          Class<?> dfsConfigKeysClz = Class.forName("org.apache.hadoop.hdfs.DFSConfigKeys");
++          dfsConfigKeysClz.getDeclaredField("DFS_DATANODE_SYNCONCLOSE_KEY");
++
++          // Everything else
+          if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
-            log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss on volume " + volumeName);
++            log.warn("dfs.datanode.synconclose set to false in hdfs-site.xml: data loss is possible on system reset or power loss");
+          }
+        } catch (ClassNotFoundException ex) {
-          // hadoop 1.0
++          // hadoop 1.0.X or hadoop 1.1.0
++        } catch (SecurityException e) {
++          // hadoop 1.0.X or hadoop 1.1.0
++        } catch (NoSuchFieldException e) {
++          // hadoop 1.0.X or hadoop 1.1.0
+        }
+      }
+    }
- 
+  }
+
+  @Override
+  public boolean exists(Path path) throws IOException {
+    return getFileSystemByPath(path).exists(path);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    return getFileSystemByPath(path).getFileStatus(path);
+  }
+
+  @Override
+  public FileSystem getFileSystemByPath(Path path) {
+    if (path.toString().contains(":")) {
+      try {
+        return path.getFileSystem(CachedConfiguration.getInstance());
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    return volumes.get(defaultVolume);
+  }
+
+  @Override
+  public Map<String,? extends FileSystem> getFileSystems() {
+    return volumes;
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    return getFileSystemByPath(path).listStatus(path);
+  }
+
+  @Override
+  public boolean mkdirs(Path path) throws IOException {
+    return getFileSystemByPath(path).mkdirs(path);
+  }
+
+  @Override
+  public FSDataInputStream open(Path path) throws IOException {
+    return getFileSystemByPath(path).open(path);
+  }
+
+  @Override
+  public boolean rename(Path path, Path newPath) throws IOException {
+    FileSystem source = getFileSystemByPath(path);
+    FileSystem dest = getFileSystemByPath(newPath);
+    if (source != dest) {
+      throw new NotImplementedException("Cannot rename files across volumes: " + path + " -> " + newPath);
+    }
+    return source.rename(path, newPath);
+  }
+
+  @Override
+  public boolean moveToTrash(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    Trash trash = new Trash(fs, fs.getConf());
+    return trash.moveToTrash(path);
+  }
+
+  @Override
+  public short getDefaultReplication(Path path) {
+    @SuppressWarnings("deprecation")
+    short rep = getFileSystemByPath(path).getDefaultReplication();
+    return rep;
+  }
+
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    return getFileSystemByPath(path).isFile(path);
+  }
+
+  public static VolumeManager get() throws IOException {
+    AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
+    return get(conf);
+  }
+
+  static private final String DEFAULT = "";
+
+  public static VolumeManager get(AccumuloConfiguration conf) throws IOException {
+    Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>();
+    Configuration hadoopConf = CachedConfiguration.getInstance();
+    fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf));
+    String ns = conf.get(Property.INSTANCE_VOLUMES);
+    if (ns != null && !ns.isEmpty()) {
+      for (String space : ns.split(",")) {
+        if (space.equals(DEFAULT))
+          throw new IllegalArgumentException();
+
+        if (space.contains(":")) {
+          fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
+        } else {
+          fileSystems.put(space, FileSystem.get(hadoopConf));
+        }
+      }
+    }
+    return new VolumeManagerImpl(fileSystems, DEFAULT, conf);
+  }
+
+  @Override
+  public boolean isReady() throws IOException {
+    for (FileSystem fs : getFileSystems().values()) {
+      if (!(fs instanceof DistributedFileSystem))
+        continue;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+      // Becomes this:
+      Class<?> safeModeAction;
+      try {
+        // hadoop 2.0
+        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+      } catch (ClassNotFoundException ex) {
+        // hadoop 1.0
+        try {
+          safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException("Cannot figure out the right class for Constants");
+        }
+      }
+      Object get = null;
+      for (Object obj : safeModeAction.getEnumConstants()) {
+        if (obj.toString().equals("SAFEMODE_GET"))
+          get = obj;
+      }
+      if (get == null) {
+        throw new RuntimeException("cannot find SAFEMODE_GET");
+      }
+      try {
+        Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+        boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
+        if (inSafeMode) {
+          return false;
+        }
+      } catch (Exception ex) {
+        throw new RuntimeException("cannot find method setSafeMode");
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public FileSystem getDefaultVolume() {
+    return volumes.get(defaultVolume);
+  }
+
+  @Override
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    return getFileSystemByPath(pathPattern).globStatus(pathPattern);
+  }
+
+  @Override
+  public Path getFullPath(Key key) {
+    // TODO sanity check col fam
+    String relPath = key.getColumnQualifierData().toString();
+    byte[] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
+    return getFullPath(new String(tableId), relPath);
+  }
+
+  @Override
+  public Path matchingFileSystem(Path source, String[] options) {
+    URI uri1 = source.toUri();
+    for (String option : options) {
+      URI uri3 = URI.create(option);
+      if (uri1.getScheme().equals(uri3.getScheme())) {
+        String a1 = uri1.getAuthority();
+        String a2 = uri3.getAuthority();
+        if (a1 == a2 || (a1 != null && a1.equals(a2)))
+          return new Path(option);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Path getFullPath(String tableId, String path) {
+    if (path.contains(":"))
+      return new Path(path);
+
+    if (path.startsWith("../"))
+      path = path.substring(2);
+    else if (path.startsWith("/"))
+      path = "/" + tableId + path;
+    else
+      throw new IllegalArgumentException("Unexpected path prefix " + path);
+
+    return getFullPath(FileType.TABLE, path);
+  }
+
+  @Override
+  public Path getFullPath(FileType fileType, String path) {
+    if (path.contains(":"))
+      return new Path(path);
+
+    // normalize the path
+    Path fullPath = new Path(ServerConstants.getDefaultBaseDir(), fileType.getDirectory());
+    if (path.startsWith("/"))
+      path = path.substring(1);
+    fullPath = new Path(fullPath, path);
+
+    FileSystem fs = getFileSystemByPath(fullPath);
+    return fs.makeQualified(fullPath);
+  }
+
+  @Override
+  public ContentSummary getContentSummary(Path dir) throws IOException {
+    return getFileSystemByPath(dir).getContentSummary(dir);
+  }
+
+  @Override
+  public String choose(String[] options) {
+    return chooser.choose(options);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/890ee25c/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
----------------------------------------------------------------------
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
index bb96295,0000000..168842e
mode 100644,000000..100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
@@@ -1,151 -1,0 +1,149 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapFile.Writer;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class MultiReaderTest {
+
+  VolumeManager fs;
+  TemporaryFolder root = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @Before
+  public void setUp() throws Exception {
-    // quiet log messages about compress.CodecPool
-    Logger.getRootLogger().setLevel(Level.ERROR);
+    fs = VolumeManagerImpl.getLocal();
+    root.create();
+    String path = root.getRoot().getAbsolutePath();
+    Path root = new Path("file://" + path + "/manyMaps");
+    fs.mkdirs(root);
+    fs.create(new Path(root, "finished")).close();
+    FileSystem ns = fs.getDefaultVolume();
+
+    @SuppressWarnings("deprecation")
+    Writer oddWriter = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class, BytesWritable.class);
+    BytesWritable value = new BytesWritable("someValue".getBytes());
+    for (int i = 1; i < 1000; i += 2) {
+      oddWriter.append(new IntWritable(i), value);
+    }
+    oddWriter.close();
+
+    @SuppressWarnings("deprecation")
+    Writer evenWriter = new Writer(ns.getConf(), ns, new Path(root, "even").toString(), IntWritable.class, BytesWritable.class);
+    for (int i = 0; i < 1000; i += 2) {
+      if (i == 10)
+        continue;
+      evenWriter.append(new IntWritable(i), value);
+    }
+    evenWriter.close();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    root.create();
+  }
+
+  private void scan(MultiReader reader, int start) throws IOException {
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+
+    for (int i = start + 1; i < 1000; i++) {
+      if (i == 10)
+        continue;
+      assertTrue(reader.next(key, value));
+      assertEquals(i, key.get());
+    }
+  }
+
+  private void scanOdd(MultiReader reader, int start) throws IOException {
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+
+    for (int i = start + 2; i < 1000; i += 2) {
+      assertTrue(reader.next(key, value));
+      assertEquals(i, key.get());
+    }
+  }
+
+  @Test
+  public void testMultiReader() throws IOException {
+    Path manyMaps = new Path("file://" + root.getRoot().getAbsolutePath() + "/manyMaps");
+    MultiReader reader = new MultiReader(fs, manyMaps);
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+
+    for (int i = 0; i < 1000; i++) {
+      if (i == 10)
+        continue;
+      assertTrue(reader.next(key, value));
+      assertEquals(i, key.get());
+    }
+    assertEquals(value.compareTo(new BytesWritable("someValue".getBytes())), 0);
+    assertFalse(reader.next(key, value));
+
+    key.set(500);
+    assertTrue(reader.seek(key));
+    scan(reader, 500);
+    key.set(10);
+    assertFalse(reader.seek(key));
+    scan(reader, 10);
+    key.set(1000);
+    assertFalse(reader.seek(key));
+    assertFalse(reader.next(key, value));
+    key.set(-1);
+    assertFalse(reader.seek(key));
+    key.set(0);
+    assertTrue(reader.next(key, value));
+    assertEquals(0, key.get());
+    reader.close();
+
+    fs.deleteRecursively(new Path(manyMaps, "even"));
+    reader = new MultiReader(fs, manyMaps);
+    key.set(501);
+    assertTrue(reader.seek(key));
+    scanOdd(reader, 501);
+    key.set(1000);
+    assertFalse(reader.seek(key));
+    assertFalse(reader.next(key, value));
+    key.set(-1);
+    assertFalse(reader.seek(key));
+    key.set(1);
+    assertTrue(reader.next(key, value));
+    assertEquals(1, key.get());
+    reader.close();
+
+  }
+
+}