accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1222766 [1/3] - in /incubator/accumulo/trunk: src/core/src/main/java/org/apache/accumulo/core/ src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ src/core/src/main/java/org/apache/accumulo/core/file/ src/core/src/main/java...
Date Fri, 23 Dec 2011 17:53:13 GMT
Author: vines
Date: Fri Dec 23 17:53:12 2011
New Revision: 1222766

URL: http://svn.apache.org/viewvc?rev=1222766&view=rev
Log:
Accumulo-227 - switch in-memory counts from mutation counts to column counts. The count is an int,
not a long, so it could overflow at some point in the distant future. This may change as part of
a larger rework of how the system functions.

Accumulo-149 - first phase: MyMapFile.Writer and MySequenceFile.Writer are removed.

Added:
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemValue.java
  (with props)
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomRFile.java
  (contents, props changed)
      - copied, changed from r1215244, incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java
Removed:
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/CreateRandomMapFile.java
Modified:
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
    incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/MemKey.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/MidPointPerfTest2.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkFileTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/BulkSplitOptimizationTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/FunctionalTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/MaxOpenTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/util/MapFilePerformanceTest.java
    incubator/accumulo/trunk/test/system/auto/simple/compaction.py

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java
(original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/Constants.java
Fri Dec 23 17:53:12 2011
@@ -155,6 +155,7 @@ public class Constants {
   public static final String CORE_PACKAGE_NAME = "org.apache.accumulo.core";
   public static final String OLD_PACKAGE_NAME = "cloudbase";
   public static final String VALID_TABLE_NAME_REGEX = "^\\w+$";
+  public static final String MAPFILE_EXTENSION = "map";
   
   public static String getBaseDir(AccumuloConfiguration conf) {
     return conf.get(Property.INSTANCE_DFS_DIR);

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
(original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ClientService.java
Fri Dec 23 17:53:12 2011
@@ -6262,6 +6262,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException
{
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky,
and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -7819,6 +7821,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException
{
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky,
and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -14033,6 +14037,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException
{
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky,
and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -15162,6 +15168,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException
{
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky,
and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -21123,6 +21131,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException
{
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky,
and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
(original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
Fri Dec 23 17:53:12 2011
@@ -21,13 +21,13 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.map.MapFileOperations;
-import org.apache.accumulo.core.file.map.MyMapFile;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.file.rfile.RFileOperations;
 import org.apache.hadoop.conf.Configuration;
@@ -41,7 +41,7 @@ class DispatchingFileFactory extends Fil
     Path p = new Path(file);
     String name = p.getName();
     
-    if (name.startsWith(MyMapFile.EXTENSION + "_")) {
+    if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) {
       return new MapFileOperations();
     }
     
@@ -53,7 +53,7 @@ class DispatchingFileFactory extends Fil
     
     String extension = sp[1];
     
-    if (extension.equals(MyMapFile.EXTENSION) || extension.equals(MyMapFile.EXTENSION + "_tmp"))
{
+    if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION
+ "_tmp")) {
       return new MapFileOperations();
     } else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp"))
{
       return new RFileOperations();
@@ -140,7 +140,7 @@ class DispatchingFileFactory extends Fil
 
 public abstract class FileOperations {
   
-  private static final HashSet<String> validExtensions = new HashSet<String>(Arrays.asList(MyMapFile.EXTENSION,
RFile.EXTENSION));
+  private static final HashSet<String> validExtensions = new HashSet<String>(Arrays.asList(Constants.MAPFILE_EXTENSION,
RFile.EXTENSION));
   
   public static Set<String> getValidExtensions() {
     return validExtensions;

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
(original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
Fri Dec 23 17:53:12 2011
@@ -25,10 +25,10 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.Map.Entry;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -37,8 +37,8 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.map.MyMapFile;
-import org.apache.accumulo.core.file.map.MySequenceFile;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.MultiIterator;
 import org.apache.accumulo.core.util.CachedConfiguration;
@@ -46,7 +46,6 @@ import org.apache.accumulo.core.util.Loc
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -120,13 +119,13 @@ public class FileUtil {
       
       start = end;
       
-      String newMapFile = String.format("%s/" + MyMapFile.EXTENSION + "_%04d", newDir, count++);
+      String newMapFile = String.format("%s/" + RFile.EXTENSION + "_%04d", newDir, count++);
       fs.mkdirs(new Path(newMapFile));
       
-      Path outFile = new Path(String.format("%s/index", newMapFile));
+      String outFile = String.format("%s/index", newMapFile);
       outFiles.add(newMapFile);
       
-      MySequenceFile.Writer writer = MySequenceFile.createWriter(fs, conf, outFile, Key.class,
LongWritable.class, MySequenceFile.CompressionType.BLOCK);
+      FileSKVWriter writer = new RFileOperations().openWriter(outFile, fs, conf, acuConf);
       List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
       
       FileSKVIterator reader = null;
@@ -145,7 +144,7 @@ public class FileUtil {
           boolean lteEndRow = endRow == null || key.compareRow(endRow) <= 0;
           
           if (gtPrevEndRow && lteEndRow)
-            writer.append(key, new LongWritable(0));
+            writer.append(key, new Value(new byte[0]));
           
           if (!lteEndRow)
             break;
@@ -181,7 +180,7 @@ public class FileUtil {
     
     return reduceFiles(acuConf, conf, fs, prevEndRow, endRow, outFiles, maxFiles, tmpDir,
pass + 1);
   }
-  
+
   public static SortedMap<Double,Key> findMidPoint(FileSystem fs, AccumuloConfiguration
acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
       double minSplit) throws IOException {
     return findMidPoint(fs, acuConf, prevEndRow, endRow, mapFiles, minSplit, true);

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
(original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
Fri Dec 23 17:53:12 2011
@@ -17,7 +17,6 @@
 package org.apache.accumulo.core.file.map;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -146,44 +145,9 @@ public class MapFileOperations extends F
   
   @Override
   public FileSKVWriter openWriter(final String file, final FileSystem fs, Configuration conf,
AccumuloConfiguration acuconf) throws IOException {
-    final MyMapFile.Writer mfw = MapFileUtil.openMapFileWriter(acuconf, conf, fs, file);
-    return new FileSKVWriter() {
-      
-      boolean secondCall = false;
-      
-      @Override
-      public void append(Key key, Value value) throws IOException {
-        mfw.append(new Key(key), value);
-      }
-      
-      @Override
-      public void close() throws IOException {
-        mfw.close();
-      }
-      
-      @Override
-      public DataOutputStream createMetaStore(String name) throws IOException {
-        return fs.create(new Path(file, name), false);
-      }
-      
-      @Override
-      public void startDefaultLocalityGroup() throws IOException {
-        if (secondCall)
-          throw new IllegalStateException("Start default locality group called twice");
-        
-        secondCall = true;
-      }
-      
-      @Override
-      public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
throws IOException {
-        throw new UnsupportedOperationException();
-      }
-      
-      @Override
-      public boolean supportsLocalityGroups() {
-        return false;
-      }
-    };
+    
+    throw new UnsupportedOperationException();
+
   }
   
   @Override

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
(original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
Fri Dec 23 17:53:12 2011
@@ -19,47 +19,17 @@ package org.apache.accumulo.core.file.ma
 import java.io.IOException;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
 
 public class MapFileUtil {
-  private static final Logger log = Logger.getLogger(MapFileUtil.class);
-  
-  public static boolean attemptToFixMapFile(Configuration conf, FileSystem fs, String dirName)
{
-    boolean fixed = true;
-    try {
-      log.info("Attempting to fix mapfile " + dirName);
-      Path indexFile = new Path(dirName + "/" + MyMapFile.INDEX_FILE_NAME);
-      if (fs.exists(indexFile) && fs.getFileStatus(indexFile).getLen() == 0) {
-        log.info("Deleting 0 length index file " + indexFile);
-        fs.delete(indexFile, false);
-      }
-      
-      MyMapFile.fix(fs, new Path(dirName), Key.class, Value.class, false, conf);
-    } catch (Exception e) {
-      log.error("Failed to fix mapfile " + dirName, e);
-      fixed = false;
-    }
-    
-    return fixed;
-  }
-  
   public static MyMapFile.Reader openMapFile(AccumuloConfiguration acuconf, FileSystem fs,
String dirName, Configuration conf) throws IOException {
     MyMapFile.Reader mfr = null;
     try {
       mfr = new MyMapFile.Reader(fs, dirName, conf);
       return mfr;
     } catch (IOException e) {
-      if (attemptToFixMapFile(conf, fs, dirName)) {
-        log.info("Fixed mapfile " + dirName);
-        mfr = new MyMapFile.Reader(fs, dirName, conf);
-        return mfr;
-      }
       throw e;
     }
   }
@@ -71,46 +41,7 @@ public class MapFileUtil {
       index = new MySequenceFile.Reader(fs, indexPath, conf);
       return index;
     } catch (IOException e) {
-      if (attemptToFixMapFile(conf, fs, mapFile.toString())) {
-        log.info("Fixed mapfile " + mapFile);
-        index = new MySequenceFile.Reader(fs, indexPath, conf);
-        return index;
-      }
       throw e;
     }
   }
-  
-  public static MyMapFile.Writer openMapFileWriter(AccumuloConfiguration acuTableConf, Configuration
conf, FileSystem fs, String dirname) throws IOException {
-    MyMapFile.Writer mfw = null;
-    int hbs = conf.getInt("io.seqfile.compress.blocksize", -1);
-    int hrep = conf.getInt("dfs.replication", -1);
-    
-    // dfs.replication
-    
-    Configuration newConf = null;
-    
-    int tbs = (int) acuTableConf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
-    int trep = acuTableConf.getCount(Property.TABLE_FILE_REPLICATION);
-    
-    if (hbs != tbs) {
-      newConf = new Configuration(conf);
-      newConf.setInt("io.seqfile.compress.blocksize", tbs);
-    }
-    
-    if (fs.exists(new Path(dirname)))
-      log.error("Map file " + dirname + " already exists", new Exception());
-    
-    if (newConf != null)
-      conf = newConf;
-    
-    mfw = new MyMapFile.Writer(conf, fs, dirname, Key.class, Value.class, MySequenceFile.CompressionType.BLOCK);
-    
-    if (trep > 0 && trep != hrep) {
-      // tried to set dfs.replication property on conf obj, however this was ignored, so
have to manually set the prop
-      fs.setReplication(new Path(dirname + "/" + MyMapFile.DATA_FILE_NAME), (short) trep);
-      fs.setReplication(new Path(dirname + "/" + MyMapFile.INDEX_FILE_NAME), (short) trep);
-    }
-    
-    return mfw;
-  }
 }

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java
(original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MyMapFile.java
Fri Dec 23 17:53:12 2011
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.NoSuchMetaStoreException;
-import org.apache.accumulo.core.file.map.MySequenceFile.CompressionType;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -45,17 +44,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableName;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -75,8 +68,6 @@ import org.apache.log4j.Logger;
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class MyMapFile {
   
-  public static final String EXTENSION = "map";
-  
   private static final Logger log = Logger.getLogger(MyMapFile.class);
   
   /** The name of the index file. */
@@ -95,161 +86,6 @@ public class MyMapFile {
   
   protected MyMapFile() {} // no public ctor
   
-  /** Writes a new map. */
-  public static class Writer {
-    private MySequenceFile.Writer data;
-    private MySequenceFile.Writer index;
-    
-    final private static String INDEX_INTERVAL = "io.map.index.interval";
-    private int indexInterval = 128;
-    
-    private long size;
-    private LongWritable position = new LongWritable();
-    
-    // the following fields are used only for checking key order
-    private WritableComparator comparator;
-    private DataInputBuffer inBuf = new DataInputBuffer();
-    private DataOutputBuffer outBuf = new DataOutputBuffer();
-    private WritableComparable lastKey;
-    
-    /** Create the named map for keys of the named class. */
-    public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class
valClass) throws IOException {
-      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, MySequenceFile.getCompressionType(conf));
-    }
-    
-    /** Create the named map for keys of the named class. */
-    public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class
valClass, CompressionType compress, Progressable progress)
-        throws IOException {
-      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress, progress);
-    }
-    
-    /** Create the named map for keys of the named class. */
-    public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class
valClass, CompressionType compress, CompressionCodec codec,
-        Progressable progress) throws IOException {
-      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress, codec,
progress);
-    }
-    
-    /** Create the named map for keys of the named class. */
-    public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class
valClass, CompressionType compressionType) throws IOException {
-      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compressionType);
-    }
-    
-    /** Create the named map using the named key comparator. */
-    public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator,
Class valClass) throws IOException {
-      this(conf, fs, dirName, comparator, valClass, MySequenceFile.getCompressionType(conf));
-    }
-    
-    /** Create the named map using the named key comparator. */
-    public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator,
Class valClass, MySequenceFile.CompressionType compress)
-        throws IOException {
-      this(conf, fs, dirName, comparator, valClass, compress, null);
-    }
-    
-    /** Create the named map using the named key comparator. */
-    public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator,
Class valClass, MySequenceFile.CompressionType compress,
-        Progressable progress) throws IOException {
-      this(conf, fs, dirName, comparator, valClass, compress, new DefaultCodec(), progress);
-    }
-    
-    /** Create the named map using the named key comparator. */
-    public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator,
Class valClass, MySequenceFile.CompressionType compress,
-        CompressionCodec codec, Progressable progress) throws IOException {
-      
-      // LOG.debug("Opening map file "+dirName+" for write");
-      
-      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
-      
-      this.comparator = comparator;
-      this.lastKey = comparator.newKey();
-      
-      Path dir = new Path(dirName);
-      if (!fs.mkdirs(dir)) {
-        throw new IOException("Mkdirs failed to create directory " + dir.toString());
-      }
-      Path dataFile = new Path(dir, DATA_FILE_NAME);
-      Path indexFile = new Path(dir, INDEX_FILE_NAME);
-      
-      Class keyClass = comparator.getKeyClass();
-      this.data = MySequenceFile.createWriter(fs, conf, dataFile, keyClass, valClass, compress,
codec, progress);
-      this.index = MySequenceFile.createWriter(fs, conf, indexFile, keyClass, LongWritable.class,
CompressionType.BLOCK, progress);
-    }
-    
-    /** The number of entries that are added before an index entry is added. */
-    public int getIndexInterval() {
-      return indexInterval;
-    }
-    
-    /**
-     * Sets the index interval.
-     * 
-     * @see #getIndexInterval()
-     */
-    public void setIndexInterval(int interval) {
-      indexInterval = interval;
-    }
-    
-    /**
-     * Sets the index interval and stores it in conf
-     * 
-     * @see #getIndexInterval()
-     */
-    public static void setIndexInterval(Configuration conf, int interval) {
-      conf.setInt(INDEX_INTERVAL, interval);
-    }
-    
-    /** Close the map. */
-    public synchronized void close() throws IOException {
-      
-      // LOG.debug("Closing map file "+myDir+" for write");
-      
-      data.close();
-      index.close();
-    }
-    
-    /**
-     * Append a key/value pair to the map. The key must be greater or equal to the previous
key added to the map.
-     */
-    public synchronized void append(WritableComparable key, Writable val) throws IOException
{
-      
-      checkKey(key);
-      
-      /*******************************************************************
-       * Instead of storing index values for every 128th key that all point back to the same
compressed block, we can store one key for each compressed block.
-       */
-      if (data.isBlockCompressed()) {
-        // add an index entry when the data size changes, indicating a
-        // new compressed block is added
-        // also add an index entry for the first value
-        if (size == 0 || position.get() != data.getLength()) {
-          position.set(data.getLength());
-          index.append(key, position);
-        }
-      } else {
-        if (size % indexInterval == 0) { // add an index entry
-          position.set(data.getLength()); // point to current eof
-          index.append(key, position);
-        }
-      }
-      
-      data.append(key, val); // append key/value to data
-      size++;
-    }
-    
-    private void checkKey(WritableComparable key) throws IOException {
-      // check that keys are well-ordered
-      if (size != 0 && comparator.compare(lastKey, key) > 0)
-        throw new IOException("key out of order: " + key + " after " + lastKey);
-      
-      // update lastKey with a copy of key by writing and reading
-      outBuf.reset();
-      key.write(outBuf); // write new key
-      
-      inBuf.reset(outBuf.getData(), outBuf.getLength());
-      lastKey.readFields(inBuf); // read into lastKey
-    }
-    
-  }
-  
   /** Provide access to an existing map. */
   public static class Reader implements FileSKVIterator {
     
@@ -916,132 +752,4 @@ public class MyMapFile {
     return indexInterval;
   }
   
-  /**
-   * This method attempts to fix a corrupt MapFile by re-creating its index.
-   * 
-   * Code copied from hadoop (0.18.0) because it was broken there. This is a fixed version.
Do not want to loose changes if MyMapFile is updated.
-   * 
-   * @param fs
-   *          filesystem
-   * @param dir
-   *          directory containing the MapFile data and index
-   * @param keyClass
-   *          key class (has to be a subclass of Writable)
-   * @param valueClass
-   *          value class (has to be a subclass of Writable)
-   * @param dryrun
-   *          do not perform any changes, just report what needs to be done
-   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
-   */
-  public static long fix(FileSystem fs, Path dir, Class<? extends WritableComparable>
keyClass, Class<? extends Writable> valueClass, boolean dryrun,
-      Configuration conf) throws Exception {
-    String dr = (dryrun ? "[DRY RUN ] " : "");
-    Path data = new Path(dir, DATA_FILE_NAME);
-    Path index = new Path(dir, INDEX_FILE_NAME);
-    int indexInterval = getIndexInterval();
-    if (!fs.exists(data)) {
-      // there's nothing we can do to fix this!
-      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
-    }
-    if (fs.exists(index)) {
-      // no fixing needed
-      return -1;
-    }
-    MySequenceFile.Reader dataReader = null;
-    MySequenceFile.Writer indexWriter = null;
-    long cnt = 0L;
-    try {
-      dataReader = new MySequenceFile.Reader(fs, data, conf);
-      if (!dataReader.getKeyClass().equals(keyClass)) {
-        throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName()
+ ", got " + dataReader.getKeyClass().getName());
-      }
-      if (!dataReader.getValueClass().equals(valueClass)) {
-        throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName()
+ ", got " + dataReader.getValueClass().getName());
-      }
-      Writable key = ReflectionUtils.newInstance(keyClass, conf);
-      Writable value = ReflectionUtils.newInstance(valueClass, conf);
-      if (!dryrun)
-        indexWriter = MySequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
-      long currentPos = 0L;
-      long lastPos = 0L;
-      
-      LongWritable position = new LongWritable();
-      lastPos = dataReader.getPosition();
-      
-      boolean blockCompressed = dataReader.isBlockCompressed();
-      
-      if (!blockCompressed) {
-        currentPos = lastPos;
-      }
-      
-      while (dataReader.next(key, value)) {
-        if (blockCompressed) {
-          if (cnt == 0) {
-            currentPos = dataReader.getPosition();
-          } else {
-            long pos = dataReader.getPosition();
-            if (pos != currentPos) {
-              lastPos = currentPos;
-              currentPos = pos;
-            }
-          }
-          // write an index entry at position 0 and whenever the position changes
-          if (cnt == 0 || position.get() != lastPos) {
-            position.set(lastPos);
-            if (!dryrun)
-              indexWriter.append(key, position);
-          }
-        } else {
-          if (cnt % indexInterval == 0) {
-            position.set(currentPos);
-            if (!dryrun)
-              indexWriter.append(key, position);
-          }
-          long pos = dataReader.getPosition();
-          if (pos != currentPos) {
-            lastPos = currentPos;
-            currentPos = pos;
-          }
-        }
-        cnt++;
-        
-      }
-    } catch (Throwable t) {
-      // truncated data file. swallow it.
-      log.error("Exception when trying to fix map file " + dir, t);
-    } finally {
-      if (dataReader != null)
-        dataReader.close();
-      if (indexWriter != null)
-        indexWriter.close();
-    }
-    return cnt;
-  }
-  
-  public static void main(String[] args) throws Exception {
-    String usage = "Usage: MapFile inFile outFile";
-    
-    if (args.length != 2) {
-      System.err.println(usage);
-      System.exit(1);
-    }
-    
-    String in = args[0];
-    String out = args[1];
-    
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem fs = FileSystem.getLocal(conf);
-    MyMapFile.Reader reader = new MyMapFile.Reader(fs, in, conf);
-    MyMapFile.Writer writer = new MyMapFile.Writer(conf, fs, out, reader.getKeyClass(), reader.getValueClass());
-    
-    WritableComparable key = (WritableComparable) ReflectionUtils.newInstance(reader.getKeyClass(),
conf);
-    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-    
-    while (reader.next(key, value))
-      // copy all entries
-      writer.append(key, value);
-    
-    writer.close();
-  }
-  
 }



Mime
View raw message