accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [3/3] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Date Mon, 02 Dec 2013 22:54:15 GMT
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
	server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/890ee25c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/890ee25c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/890ee25c

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 890ee25cf3b9c7d1fd256a3b3237e7c3030e4183
Parents: 360f0cf cd96f85
Author: Josh Elser <elserj@apache.org>
Authored: Mon Dec 2 17:53:54 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Dec 2 17:53:54 2013 -0500

----------------------------------------------------------------------
 .../accumulo/server/fs/VolumeManagerImpl.java      | 17 +++++++++++------
 .../accumulo/tserver/log/MultiReaderTest.java      |  2 --
 2 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/890ee25c/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 39afe75,0000000..472b0c0
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@@ -1,474 -1,0 +1,479 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.fs;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.lang.reflect.Field;
 +import java.lang.reflect.Method;
 +import java.net.URI;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.commons.lang.NotImplementedException;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.ContentSummary;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.LocalFileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.Trash;
 +import org.apache.hadoop.fs.permission.FsPermission;
 +import org.apache.hadoop.hdfs.DFSConfigKeys;
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.hadoop.util.Progressable;
 +import org.apache.log4j.Logger;
 +
 +public class VolumeManagerImpl implements VolumeManager {
 +
 +  private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
 +
 +  Map<String,? extends FileSystem> volumes;
 +  String defaultVolume;
 +  AccumuloConfiguration conf;
 +  VolumeChooser chooser;
 +
 +  protected VolumeManagerImpl(Map<String,? extends FileSystem> volumes, String defaultVolume,
AccumuloConfiguration conf) {
 +    this.volumes = volumes;
 +    this.defaultVolume = defaultVolume;
 +    this.conf = conf;
 +    ensureSyncIsEnabled();
 +    chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER,
VolumeChooser.class, new RandomVolumeChooser());
 +  }
 +
 +  public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException
{
 +    return new VolumeManagerImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())),
"",
 +        DefaultConfiguration.getDefaultConfiguration());
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
 +    IOException ex = null;
 +    for (FileSystem fs : volumes.values()) {
 +      try {
 +        fs.close();
 +      } catch (IOException e) {
 +        ex = e;
 +      }
 +    }
 +    if (ex != null) {
 +      throw ex;
 +    }
 +  }
 +
 +  @Override
 +  public boolean closePossiblyOpenFile(Path path) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    if (fs instanceof DistributedFileSystem) {
 +      DistributedFileSystem dfs = (DistributedFileSystem) fs;
 +      try {
 +        return dfs.recoverLease(path);
 +      } catch (FileNotFoundException ex) {
 +        throw ex;
 +      }
 +    } else if (fs instanceof LocalFileSystem) {
 +      // ignore
 +    } else {
 +      throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
 +    }
 +    fs.append(path).close();
 +    log.info("Recovered lease on " + path.toString() + " using append");
 +    return true;
 +  }
 +
 +  @Override
 +  public FSDataOutputStream create(Path path) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    return fs.create(path);
 +  }
 +
 +  @Override
 +  public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    return fs.create(path, overwrite);
 +  }
 +
 +  private static long correctBlockSize(Configuration conf, long blockSize) {
 +    if (blockSize <= 0)
 +      blockSize = conf.getLong("dfs.block.size", 67108864);
 +
 +    int checkSum = conf.getInt("io.bytes.per.checksum", 512);
 +    blockSize -= blockSize % checkSum;
 +    blockSize = Math.max(blockSize, checkSum);
 +    return blockSize;
 +  }
 +
 +  private static int correctBufferSize(Configuration conf, int bufferSize) {
 +    if (bufferSize <= 0)
 +      bufferSize = conf.getInt("io.file.buffer.size", 4096);
 +    return bufferSize;
 +  }
 +
 +  @Override
 +  public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication,
long blockSize) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    if (bufferSize == 0) {
 +      fs.getConf().getInt("io.file.buffer.size", 4096);
 +    }
 +    return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(),
blockSize));
 +  }
 +
 +  @Override
 +  public boolean createNewFile(Path path) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    return fs.createNewFile(path);
 +  }
 +
 +  @Override
 +  public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication,
long blockSize) throws IOException {
 +    FileSystem fs = getFileSystemByPath(logPath);
 +    blockSize = correctBlockSize(fs.getConf(), blockSize);
 +    bufferSize = correctBufferSize(fs.getConf(), bufferSize);
 +    try {
 +      // This...
 +      // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
 +      // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication,
blockSize, null);
 +      // Becomes this:
 +      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
 +      List<Enum<?>> flags = new ArrayList<Enum<?>>();
 +      if (createFlags.isEnum()) {
 +        for (Object constant : createFlags.getEnumConstants()) {
 +          if (constant.toString().equals("SYNC_BLOCK")) {
 +            flags.add((Enum<?>) constant);
 +            log.debug("Found synch enum " + constant);
 +          }
 +          if (constant.toString().equals("CREATE")) {
 +            flags.add((Enum<?>) constant);
 +            log.debug("Found CREATE enum " + constant);
 +          }
 +        }
 +      }
 +      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null,
flags.get(0), flags.get(1));
 +      log.debug("CreateFlag set: " + set);
 +      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class,
EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
 +      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
 +      return (FSDataOutputStream) create.invoke(fs, logPath, FsPermission.getDefault(),
set, bufferSize, replication, blockSize, null);
 +    } catch (ClassNotFoundException ex) {
 +      // Expected in hadoop 1.0
 +      return fs.create(logPath, true, bufferSize, replication, blockSize);
 +    } catch (Exception ex) {
 +      log.debug(ex, ex);
 +      return fs.create(logPath, true, bufferSize, replication, blockSize);
 +    }
 +  }
 +
 +  @Override
 +  public boolean delete(Path path) throws IOException {
 +    return getFileSystemByPath(path).delete(path, false);
 +  }
 +
 +  @Override
 +  public boolean deleteRecursively(Path path) throws IOException {
 +    return getFileSystemByPath(path).delete(path, true);
 +  }
 +
 +  protected void ensureSyncIsEnabled() {
 +    for (Entry<String,? extends FileSystem> entry : getFileSystems().entrySet()) {
 +      final String volumeName = entry.getKey();
 +      final FileSystem fs = entry.getValue();
 +      
 +      if (fs instanceof DistributedFileSystem) {
 +        final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append";
 +        final String ticketMessage = "See ACCUMULO-623 and ACCUMULO-1637 for more details.";
 +        // Check to make sure that we have proper defaults configured
 +        try {
 +          // If the default is off (0.20.205.x or 1.0.x)
 +          DFSConfigKeys configKeys = new DFSConfigKeys();
 +          
 +          // Can't use the final constant itself as Java will inline it at compile time
 +          Field dfsSupportAppendDefaultField = configKeys.getClass().getField("DFS_SUPPORT_APPEND_DEFAULT");
 +          boolean dfsSupportAppendDefaultValue = dfsSupportAppendDefaultField.getBoolean(configKeys);
 +          
 +          if (!dfsSupportAppendDefaultValue) {
 +            // See if the user did the correct override
 +            if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, false)) {
 +              String msg = "Accumulo requires that dfs.support.append to true. " + ticketMessage;
 +              log.fatal(msg);
 +              throw new RuntimeException(msg);
 +            }
 +          }
 +        } catch (NoSuchFieldException e) {
 +          // If we can't find DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT, the user is running
 +          // 1.1.x or 1.2.x. This is ok, though, as, by default, these versions have append/sync
enabled.
 +        } catch (Exception e) {
 +          log.warn("Error while checking for " + DFS_SUPPORT_APPEND + " on volume " + volumeName
+ ". The user should ensure that Hadoop is configured to properly supports append and sync.
" + ticketMessage, e);
 +        }
 +        
 +        // If either of these parameters are configured to be false, fail.
 +        // This is a sign that someone is writing bad configuration.
 +        if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true) || !fs.getConf().getBoolean(DFS_DURABLE_SYNC,
true)) {
 +          String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " and " + DFS_DURABLE_SYNC
+ " not be configured as false. " + ticketMessage;
 +          log.fatal(msg);
 +          throw new RuntimeException(msg);
 +        }
 +        
 +        try {
-           // if this class exists
-           Class.forName("org.apache.hadoop.fs.CreateFlag");
-           // we're running hadoop 2.0, 1.1
++          // Check DFSConfigKeys to see if DFS_DATANODE_SYNCONCLOSE_KEY exists (should be
everything >=1.1.1 and the 0.23 line)
++          Class<?> dfsConfigKeysClz = Class.forName("org.apache.hadoop.hdfs.DFSConfigKeys");
++          dfsConfigKeysClz.getDeclaredField("DFS_DATANODE_SYNCONCLOSE_KEY");
++        
++          // Everything else
 +          if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
-             log.warn("dfs.datanode.synconclose set to false: data loss is possible on system
reset or power loss on volume " + volumeName);
++            log.warn("dfs.datanode.synconclose set to false in hdfs-site.xml: data loss
is possible on system reset or power loss");
 +          }
 +        } catch (ClassNotFoundException ex) {
-           // hadoop 1.0
++          // hadoop 1.0.X or hadoop 1.1.0
++        } catch (SecurityException e) {
++          // hadoop 1.0.X or hadoop 1.1.0
++        } catch (NoSuchFieldException e) {
++          // hadoop 1.0.X or hadoop 1.1.0
 +        }
 +      }
 +    }
- 
 +  }
 +
 +  @Override
 +  public boolean exists(Path path) throws IOException {
 +    return getFileSystemByPath(path).exists(path);
 +  }
 +
 +  @Override
 +  public FileStatus getFileStatus(Path path) throws IOException {
 +    return getFileSystemByPath(path).getFileStatus(path);
 +  }
 +
 +  @Override
 +  public FileSystem getFileSystemByPath(Path path) {
 +    if (path.toString().contains(":")) {
 +      try {
 +        return path.getFileSystem(CachedConfiguration.getInstance());
 +      } catch (IOException ex) {
 +        throw new RuntimeException(ex);
 +      }
 +    }
 +
 +    return volumes.get(defaultVolume);
 +  }
 +
 +  @Override
 +  public Map<String,? extends FileSystem> getFileSystems() {
 +    return volumes;
 +  }
 +
 +  @Override
 +  public FileStatus[] listStatus(Path path) throws IOException {
 +    return getFileSystemByPath(path).listStatus(path);
 +  }
 +
 +  @Override
 +  public boolean mkdirs(Path path) throws IOException {
 +    return getFileSystemByPath(path).mkdirs(path);
 +  }
 +
 +  @Override
 +  public FSDataInputStream open(Path path) throws IOException {
 +    return getFileSystemByPath(path).open(path);
 +  }
 +
 +  @Override
 +  public boolean rename(Path path, Path newPath) throws IOException {
 +    FileSystem source = getFileSystemByPath(path);
 +    FileSystem dest = getFileSystemByPath(newPath);
 +    if (source != dest) {
 +      throw new NotImplementedException("Cannot rename files across volumes: " + path +
" -> " + newPath);
 +    }
 +    return source.rename(path, newPath);
 +  }
 +
 +  @Override
 +  public boolean moveToTrash(Path path) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    Trash trash = new Trash(fs, fs.getConf());
 +    return trash.moveToTrash(path);
 +  }
 +
 +  @Override
 +  public short getDefaultReplication(Path path) {
 +    @SuppressWarnings("deprecation")
 +    short rep = getFileSystemByPath(path).getDefaultReplication();
 +    return rep;
 +  }
 +
 +  @Override
 +  public boolean isFile(Path path) throws IOException {
 +    return getFileSystemByPath(path).isFile(path);
 +  }
 +
 +  public static VolumeManager get() throws IOException {
 +    AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
 +    return get(conf);
 +  }
 +
 +  static private final String DEFAULT = "";
 +
 +  public static VolumeManager get(AccumuloConfiguration conf) throws IOException {
 +    Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>();
 +    Configuration hadoopConf = CachedConfiguration.getInstance();
 +    fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf));
 +    String ns = conf.get(Property.INSTANCE_VOLUMES);
 +    if (ns != null && !ns.isEmpty()) {
 +      for (String space : ns.split(",")) {
 +        if (space.equals(DEFAULT))
 +          throw new IllegalArgumentException();
 +
 +        if (space.contains(":")) {
 +          fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
 +        } else {
 +          fileSystems.put(space, FileSystem.get(hadoopConf));
 +        }
 +      }
 +    }
 +    return new VolumeManagerImpl(fileSystems, DEFAULT, conf);
 +  }
 +
 +  @Override
 +  public boolean isReady() throws IOException {
 +    for (FileSystem fs : getFileSystems().values()) {
 +      if (!(fs instanceof DistributedFileSystem))
 +        continue;
 +      DistributedFileSystem dfs = (DistributedFileSystem) fs;
 +      // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
 +      // Becomes this:
 +      Class<?> safeModeAction;
 +      try {
 +        // hadoop 2.0
 +        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
 +      } catch (ClassNotFoundException ex) {
 +        // hadoop 1.0
 +        try {
 +          safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
 +        } catch (ClassNotFoundException e) {
 +          throw new RuntimeException("Cannot figure out the right class for Constants");
 +        }
 +      }
 +      Object get = null;
 +      for (Object obj : safeModeAction.getEnumConstants()) {
 +        if (obj.toString().equals("SAFEMODE_GET"))
 +          get = obj;
 +      }
 +      if (get == null) {
 +        throw new RuntimeException("cannot find SAFEMODE_GET");
 +      }
 +      try {
 +        Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
 +        boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
 +        if (inSafeMode) {
 +          return false;
 +        }
 +      } catch (Exception ex) {
 +        throw new RuntimeException("cannot find method setSafeMode");
 +      }
 +    }
 +    return true;
 +  }
 +
 +  @Override
 +  public FileSystem getDefaultVolume() {
 +    return volumes.get(defaultVolume);
 +  }
 +
 +  @Override
 +  public FileStatus[] globStatus(Path pathPattern) throws IOException {
 +    return getFileSystemByPath(pathPattern).globStatus(pathPattern);
 +  }
 +
 +  @Override
 +  public Path getFullPath(Key key) {
 +    // TODO sanity check col fam
 +    String relPath = key.getColumnQualifierData().toString();
 +    byte[] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
 +    return getFullPath(new String(tableId), relPath);
 +  }
 +
 +  @Override
 +  public Path matchingFileSystem(Path source, String[] options) {
 +    URI uri1 = source.toUri();
 +    for (String option : options) {
 +      URI uri3 = URI.create(option);
 +      if (uri1.getScheme().equals(uri3.getScheme())) {
 +        String a1 = uri1.getAuthority();
 +        String a2 = uri3.getAuthority();
 +        if (a1 == a2 || (a1 != null && a1.equals(a2)))
 +          return new Path(option);
 +      }
 +    }
 +    return null;
 +  }
 +
 +  @Override
 +  public Path getFullPath(String tableId, String path) {
 +    if (path.contains(":"))
 +      return new Path(path);
 +    
 +    if (path.startsWith("../"))
 +      path = path.substring(2);
 +    else if (path.startsWith("/"))
 +      path = "/" + tableId + path;
 +    else
 +      throw new IllegalArgumentException("Unexpected path prefix " + path);
 +    
 +    return getFullPath(FileType.TABLE, path);
 +  }
 +  
 +  @Override
 +  public Path getFullPath(FileType fileType, String path) {
 +    if (path.contains(":"))
 +      return new Path(path);
 +    
 +    // normalize the path
 +    Path fullPath = new Path(ServerConstants.getDefaultBaseDir(), fileType.getDirectory());
 +    if (path.startsWith("/"))
 +      path = path.substring(1);
 +    fullPath = new Path(fullPath, path);
 +    
 +    FileSystem fs = getFileSystemByPath(fullPath);
 +    return fs.makeQualified(fullPath);
 +  }
 +
 +  @Override
 +  public ContentSummary getContentSummary(Path dir) throws IOException {
 +    return getFileSystemByPath(dir).getContentSummary(dir);
 +  }
 +
 +  @Override
 +  public String choose(String[] options) {
 +    return chooser.choose(options);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/890ee25c/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
----------------------------------------------------------------------
diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
index bb96295,0000000..168842e
mode 100644,000000..100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
@@@ -1,151 -1,0 +1,149 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.log;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.File;
 +import java.io.IOException;
 +
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.BytesWritable;
 +import org.apache.hadoop.io.IntWritable;
 +import org.apache.hadoop.io.MapFile.Writer;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.junit.rules.TemporaryFolder;
 +
 +public class MultiReaderTest {
 +
 +  VolumeManager fs;
 +  TemporaryFolder root = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
 +
 +  @Before
 +  public void setUp() throws Exception {
-     // quiet log messages about compress.CodecPool
-     Logger.getRootLogger().setLevel(Level.ERROR);
 +    fs = VolumeManagerImpl.getLocal();
 +    root.create();
 +    String path = root.getRoot().getAbsolutePath();
 +    Path root = new Path("file://" + path + "/manyMaps");
 +    fs.mkdirs(root);
 +    fs.create(new Path(root, "finished")).close();
 +    FileSystem ns = fs.getDefaultVolume();
 +    
 +    @SuppressWarnings("deprecation")
 +    Writer oddWriter = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class,
BytesWritable.class);
 +    BytesWritable value = new BytesWritable("someValue".getBytes());
 +    for (int i = 1; i < 1000; i += 2) {
 +      oddWriter.append(new IntWritable(i), value);
 +    }
 +    oddWriter.close();
 +    
 +    @SuppressWarnings("deprecation")
 +    Writer evenWriter = new Writer(ns.getConf(), ns, new Path(root, "even").toString(),
IntWritable.class, BytesWritable.class);
 +    for (int i = 0; i < 1000; i += 2) {
 +      if (i == 10)
 +        continue;
 +      evenWriter.append(new IntWritable(i), value);
 +    }
 +    evenWriter.close();
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    root.create();
 +  }
 +
 +  private void scan(MultiReader reader, int start) throws IOException {
 +    IntWritable key = new IntWritable();
 +    BytesWritable value = new BytesWritable();
 +
 +    for (int i = start + 1; i < 1000; i++) {
 +      if (i == 10)
 +        continue;
 +      assertTrue(reader.next(key, value));
 +      assertEquals(i, key.get());
 +    }
 +  }
 +
 +  private void scanOdd(MultiReader reader, int start) throws IOException {
 +    IntWritable key = new IntWritable();
 +    BytesWritable value = new BytesWritable();
 +
 +    for (int i = start + 2; i < 1000; i += 2) {
 +      assertTrue(reader.next(key, value));
 +      assertEquals(i, key.get());
 +    }
 +  }
 +
 +  @Test
 +  public void testMultiReader() throws IOException {
 +    Path manyMaps = new Path("file://" + root.getRoot().getAbsolutePath() + "/manyMaps");
 +    MultiReader reader = new MultiReader(fs, manyMaps);
 +    IntWritable key = new IntWritable();
 +    BytesWritable value = new BytesWritable();
 +
 +    for (int i = 0; i < 1000; i++) {
 +      if (i == 10)
 +        continue;
 +      assertTrue(reader.next(key, value));
 +      assertEquals(i, key.get());
 +    }
 +    assertEquals(value.compareTo(new BytesWritable("someValue".getBytes())), 0);
 +    assertFalse(reader.next(key, value));
 +
 +    key.set(500);
 +    assertTrue(reader.seek(key));
 +    scan(reader, 500);
 +    key.set(10);
 +    assertFalse(reader.seek(key));
 +    scan(reader, 10);
 +    key.set(1000);
 +    assertFalse(reader.seek(key));
 +    assertFalse(reader.next(key, value));
 +    key.set(-1);
 +    assertFalse(reader.seek(key));
 +    key.set(0);
 +    assertTrue(reader.next(key, value));
 +    assertEquals(0, key.get());
 +    reader.close();
 +
 +    fs.deleteRecursively(new Path(manyMaps, "even"));
 +    reader = new MultiReader(fs, manyMaps);
 +    key.set(501);
 +    assertTrue(reader.seek(key));
 +    scanOdd(reader, 501);
 +    key.set(1000);
 +    assertFalse(reader.seek(key));
 +    assertFalse(reader.next(key, value));
 +    key.set(-1);
 +    assertFalse(reader.seek(key));
 +    key.set(1);
 +    assertTrue(reader.next(key, value));
 +    assertEquals(1, key.get());
 +    reader.close();
 +
 +  }
 +
 +}


Mime
View raw message