hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r500378 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/test/org/apache/hadoop/io/TestSequenceFile.java
Date Fri, 26 Jan 2007 22:11:43 GMT
Author: cutting
Date: Fri Jan 26 14:11:42 2007
New Revision: 500378

URL: http://svn.apache.org/viewvc?view=rev&rev=500378
Log:
HADOOP-732.  Add support to SequenceFile for arbitrary metadata, as a set of attribute-value
pairs.  Contributed by Runping.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=500378&r1=500377&r2=500378
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 26 14:11:42 2007
@@ -67,6 +67,9 @@
     in HDFS, try another replica of the data, if any.
     (Wendy Chien via cutting)
 
+21. HADOOP-732.  Add support to SequenceFile for arbitrary metadata,
+    as a set of attribute value pairs.  (Runping Qi via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=500378&r1=500377&r2=500378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Jan 26 14:11:42 2007
@@ -50,8 +50,9 @@
 
   private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
   private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
+  private static final byte VERSION_WITH_METADATA = (byte)6;
   private static byte[] VERSION = new byte[] {
-    (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION
+    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
   };
 
   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
@@ -93,7 +94,7 @@
                                         CompressionType val) {
     job.set("io.seqfile.compression.type", val.toString());
   }
-  
+    
   /**
    * Construct the preferred type of SequenceFile Writer.
    * @param fs The configured filesystem. 
@@ -130,7 +131,7 @@
     Writer writer = null;
     
     if (compressionType == CompressionType.NONE) {
-      writer = new Writer(fs, conf, name, keyClass, valClass);
+      writer = new Writer(fs, conf, name, keyClass, valClass, null, new Metadata());
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, 
           new DefaultCodec());
@@ -161,13 +162,13 @@
     Writer writer = null;
     
     if (compressionType == CompressionType.NONE) {
-      writer = new Writer(fs, conf, name, keyClass, valClass, progress); 
+      writer = new Writer(fs, conf, name, keyClass, valClass, progress, new Metadata());

     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, 
-          keyClass, valClass, new DefaultCodec(), progress);
+          keyClass, valClass, new DefaultCodec(), progress, new Metadata());
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, 
-          keyClass, valClass, new DefaultCodec(), progress);
+          keyClass, valClass, new DefaultCodec(), progress, new Metadata());
     }
     
     return writer;
@@ -222,6 +223,7 @@
    * @param compressionType The compression type.
    * @param codec The compression codec.
    * @param progress The Progressable object to track progress.
+   * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
    */
@@ -229,7 +231,7 @@
   createWriter(FileSystem fs, Configuration conf, Path name, 
       Class keyClass, Class valClass, 
       CompressionType compressionType, CompressionCodec codec,
-      Progressable progress) throws IOException {
+      Progressable progress, Metadata metadata) throws IOException {
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !ZlibFactory.isNativeZlibLoaded()) {
@@ -240,17 +242,40 @@
     Writer writer = null;
     
     if (compressionType == CompressionType.NONE) {
-      writer = new Writer(fs, conf, name, keyClass, valClass, progress);
+      writer = new Writer(fs, conf, name, keyClass, valClass, progress, metadata);
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, 
-          keyClass, valClass, codec, progress);
+          keyClass, valClass, codec, progress, metadata);
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, 
-          keyClass, valClass, codec, progress);
+          keyClass, valClass, codec, progress, metadata);
     }
     
     return writer;
   }
+  
+  /**
+   * Construct the preferred type of SequenceFile Writer.
+   * @param fs The configured filesystem. 
+   * @param conf The configuration.
+   * @param name The name of the file. 
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @param codec The compression codec.
+   * @param progress The Progressable object to track progress.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer
+  createWriter(FileSystem fs, Configuration conf, Path name, 
+      Class keyClass, Class valClass, 
+      CompressionType compressionType, CompressionCodec codec,
+      Progressable progress) throws IOException {
+    Writer writer = createWriter(fs, conf, name, keyClass, valClass, 
+        compressionType, codec, progress, new Metadata());
+    return writer;
+  }
 
   /**
    * Construct the preferred type of 'raw' SequenceFile Writer.
@@ -259,13 +284,14 @@
    * @param valClass The 'value' type.
    * @param compress Compress data?
    * @param blockCompress Compress blocks?
+   * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
    */
   private static Writer
   createWriter(Configuration conf, FSDataOutputStream out, 
       Class keyClass, Class valClass, boolean compress, boolean blockCompress,
-      CompressionCodec codec)
+      CompressionCodec codec, Metadata metadata)
   throws IOException {
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
@@ -277,11 +303,11 @@
     Writer writer = null;
 
     if (!compress) {
-      writer = new Writer(conf, out, keyClass, valClass);
+      writer = new Writer(conf, out, keyClass, valClass, metadata);
     } else if (compress && !blockCompress) {
-      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
+      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
     } else {
-      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
+      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
     }
     
     return writer;
@@ -289,19 +315,41 @@
 
   /**
    * Construct the preferred type of 'raw' SequenceFile Writer.
+   * @param out The stream on top which the writer is to be constructed.
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compress Compress data?
+   * @param blockCompress Compress blocks?
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  private static Writer
+  createWriter(Configuration conf, FSDataOutputStream out, 
+      Class keyClass, Class valClass, boolean compress, boolean blockCompress,
+      CompressionCodec codec)
+  throws IOException {
+    Writer writer = createWriter(conf, out, keyClass, valClass, compress, 
+        blockCompress, codec, new Metadata());
+    return writer;
+  }
+
+  
+  /**
+   * Construct the preferred type of 'raw' SequenceFile Writer.
    * @param conf The configuration.
    * @param out The stream on top which the writer is to be constructed.
    * @param keyClass The 'key' type.
    * @param valClass The 'value' type.
    * @param compressionType The compression type.
    * @param codec The compression codec.
+   * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
    */
   public static Writer
   createWriter(Configuration conf, FSDataOutputStream out, 
       Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec)
+      CompressionCodec codec, Metadata metadata)
   throws IOException {
     if ((codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
@@ -313,15 +361,37 @@
     Writer writer = null;
 
     if (compressionType == CompressionType.NONE) {
-      writer = new Writer(conf, out, keyClass, valClass);
+      writer = new Writer(conf, out, keyClass, valClass, metadata);
     } else if (compressionType == CompressionType.RECORD) {
-      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
+      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
     } else if (compressionType == CompressionType.BLOCK){
-      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
+      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
     }
     
     return writer;
   }
+  
+  /**
+   * Construct the preferred type of 'raw' SequenceFile Writer.
+   * @param conf The configuration.
+   * @param out The stream on top which the writer is to be constructed.
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @param codec The compression codec.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer
+  createWriter(Configuration conf, FSDataOutputStream out, 
+      Class keyClass, Class valClass, CompressionType compressionType,
+      CompressionCodec codec)
+  throws IOException {
+    Writer writer = createWriter(conf, out, keyClass, valClass, compressionType,
+        codec, new Metadata());
+    return writer;
+  }
+  
 
   /** The interface to 'raw' values of SequenceFiles. */
   public static interface ValueBytes {
@@ -424,6 +494,99 @@
 
   } // CompressedBytes
   
+  /**
+   * The class encapsulating with the metadata of a file.
+   * The metadata of a file is a list of attribute name/value
+   * pairs of Text type.
+   *
+   */
+  static class Metadata implements Writable {
+
+    private TreeMap<Text, Text> theMetadata;
+    
+    public Metadata() {
+      this(new TreeMap<Text, Text>());
+    }
+    
+    public Metadata(TreeMap<Text, Text> arg) {
+      if (arg == null) {
+        this.theMetadata = new TreeMap<Text, Text>();
+      } else {
+        this.theMetadata = arg;
+      }
+    }
+    
+    public Text get(Text name) {
+      return this.theMetadata.get(name);
+    }
+    
+    public void set(Text name, Text value) {
+      this.theMetadata.put(name, value);
+    }
+    
+    public TreeMap<Text, Text> getMetadata() {
+      return new TreeMap<Text, Text>(this.theMetadata);
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(this.theMetadata.size());
+      Iterator iter = this.theMetadata.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<Text, Text> en = (Map.Entry<Text, Text>)iter.next();
+        en.getKey().write(out);
+        en.getValue().write(out);
+      }
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      int sz = in.readInt();
+      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
+      this.theMetadata = new TreeMap<Text, Text>();
+      for (int i = 0; i < sz; i++) {
+        Text key = new Text();
+        Text val = new Text();
+        key.readFields(in);
+        val.readFields(in);
+        this.theMetadata.put(key, val);
+      }    
+    }
+    
+    public boolean equals(Metadata other) {
+      if (other == null) return false;
+      if (this.theMetadata.size() != other.theMetadata.size()) {
+        return false;
+      }
+      Iterator iter1 = this.theMetadata.entrySet().iterator();
+      Iterator iter2 = other.theMetadata.entrySet().iterator();
+      while (iter1.hasNext() && iter2.hasNext()) {
+        Map.Entry<Text, Text> en1 = (Map.Entry<Text, Text>)iter1.next();
+        Map.Entry<Text, Text> en2 = (Map.Entry<Text, Text>)iter2.next();
+        if (!en1.getKey().equals(en2.getKey())) {
+           return false;
+        }
+        if (!en1.getValue().equals(en2.getValue())) {
+           return false;
+        }
+      }
+      if (iter1.hasNext() || iter2.hasNext()) {
+        return false;
+      }
+      return true;
+    }
+    
+    public String toString() {
+      StringBuffer sb = new StringBuffer();
+      sb.append("size: ").append(this.theMetadata.size()).append("\n");
+      Iterator iter = this.theMetadata.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<Text, Text> en = (Map.Entry<Text, Text>)iter.next();
+        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
+        sb.append("\n");
+      }
+      return sb.toString();
+    }
+  }
+  
   /** Write key/value pairs to a sequence-format file. */
   public static class Writer {
     Configuration conf;
@@ -438,6 +601,7 @@
     CompressionCodec codec = null;
     CompressionOutputStream deflateFilter = null;
     DataOutputStream deflateOut = null;
+    Metadata metadata = null;
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -462,24 +626,24 @@
     public Writer(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass)
       throws IOException {
-      this(fs, conf, name, keyClass, valClass, null);
+      this(fs, conf, name, keyClass, valClass, null, new Metadata());
     }
     
     /** Create the named file with write-progress reporter. */
     public Writer(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, Progressable progress)
+        Class keyClass, Class valClass, Progressable progress, Metadata metadata)
       throws IOException {
-      init(name, conf, fs.create(name, progress), keyClass, valClass, false, null);
+      init(name, conf, fs.create(name, progress), keyClass, valClass, false, null, metadata);
       initializeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
     }
-
+    
     /** Write to an arbitrary stream using a specified buffer size. */
     private Writer(Configuration conf, FSDataOutputStream out, 
-        Class keyClass, Class valClass)
+        Class keyClass, Class valClass, Metadata metadata)
     throws IOException {
-      init(null, conf, out, keyClass, valClass, false, null);
+      init(null, conf, out, keyClass, valClass, false, null, metadata);
       
       initializeFileHeader();
       writeFileHeader();
@@ -514,12 +678,13 @@
       if(this.isCompressed()) {
         Text.writeString(out, (codec.getClass()).getName());
       }
+      this.metadata.write(out);
     }
-
+    
     /** Initialize. */
     void init(Path name, Configuration conf, FSDataOutputStream out,
                       Class keyClass, Class valClass,
-                      boolean compress, CompressionCodec codec) 
+                      boolean compress, CompressionCodec codec, Metadata metadata) 
     throws IOException {
       this.target = name;
       this.conf = conf;
@@ -528,6 +693,7 @@
       this.valClass = valClass;
       this.compress = compress;
       this.codec = codec;
+      this.metadata = metadata;
       if(this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
         this.deflateFilter = this.codec.createOutputStream(buffer);
@@ -644,7 +810,7 @@
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec) 
     throws IOException {
-      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec);
+      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata());
       
       initializeFileHeader();
       writeFileHeader();
@@ -654,28 +820,36 @@
     /** Create the named file with write-progress reporter. */
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec,
-        Progressable progress)
+        Progressable progress, Metadata metadata)
     throws IOException {
       super.init(name, conf, fs.create(name, progress), 
-          keyClass, valClass, true, codec);
+          keyClass, valClass, true, codec, metadata);
       
       initializeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
     }
     
+    /** Create the named file with write-progress reporter. */
+    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass, CompressionCodec codec,
+        Progressable progress)
+    throws IOException {
+      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
+    }
+    
     /** Write to an arbitrary stream using a specified buffer size. */
     private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
-                   Class keyClass, Class valClass, CompressionCodec codec)
+                   Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
       throws IOException {
-      super.init(null, conf, out, keyClass, valClass, true, codec);
+      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
       
       initializeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
       
     }
-
+    
     boolean isCompressed() { return true; }
     boolean isBlockCompressed() { return false; }
 
@@ -752,7 +926,7 @@
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec) 
     throws IOException {
-      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec);
+      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata());
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
       initializeFileHeader();
@@ -763,10 +937,10 @@
     /** Create the named file with write-progress reporter. */
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec,
-        Progressable progress)
+        Progressable progress, Metadata metadata)
     throws IOException {
       super.init(name, conf, fs.create(name, progress), keyClass, valClass, 
-          true, codec);
+          true, codec, metadata);
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
       initializeFileHeader();
@@ -774,18 +948,26 @@
       finalizeFileHeader();
     }
     
+    /** Create the named file with write-progress reporter. */
+    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
+        Class keyClass, Class valClass, CompressionCodec codec,
+        Progressable progress)
+    throws IOException {
+      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
+    }
+    
     /** Write to an arbitrary stream using a specified buffer size. */
     private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
-                   Class keyClass, Class valClass, CompressionCodec codec)
+                   Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
       throws IOException {
-      super.init(null, conf, out, keyClass, valClass, true, codec);
+      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
       init(1000000);
       
       initializeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
     }
-
+    
     boolean isCompressed() { return true; }
     boolean isBlockCompressed() { return true; }
 
@@ -928,6 +1110,7 @@
     private Class valClass;
 
     private CompressionCodec codec = null;
+    private Metadata metadata = null;
     
     private byte[] sync = new byte[SYNC_HASH_SIZE];
     private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
@@ -1046,6 +1229,11 @@
         }
       }
       
+      this.metadata = new Metadata();
+      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
+        this.metadata.readFields(in);
+      }
+      
       if (version > 1) {                          // if version > 1
         in.readFully(sync);                       // read sync bytes
       }
@@ -1095,6 +1283,11 @@
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
 
+    /** Returns the metadata object of the file */
+    public Metadata getMetadata() {
+      return this.metadata;
+    }
+    
     /** Returns the configuration used for this file. */
     Configuration getConf() { return conf; }
     

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java?view=diff&rev=500378&r1=500377&r2=500378
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java Fri Jan 26 14:11:42 2007
@@ -317,7 +317,91 @@
     return sorter;
   }
 
+  /** Unit tests for SequenceFile metadata. */
+  public void testSequenceFileMetadata() throws Exception {
+    LOG.info("Testing SequenceFile with metadata");
+    int count = 1024 * 10;
+    int megabytes = 1;
+    int factor = 5;
+    CompressionCodec codec = new DefaultCodec();
+    Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq.metadata");
+    Path recordCompressedFile = 
+      new Path(System.getProperty("test.build.data",".")+"/test.rc.seq.metadata");
+    Path blockCompressedFile = 
+      new Path(System.getProperty("test.build.data",".")+"/test.bc.seq.metadata");
+ 
+    FileSystem fs = FileSystem.getLocal(conf);
+    SequenceFile.Metadata theMetadata = new SequenceFile.Metadata();
+    theMetadata.set(new Text("name_1"), new Text("value_1"));
+    theMetadata.set(new Text("name_2"), new Text("value_2"));
+    theMetadata.set(new Text("name_3"), new Text("value_3"));
+    theMetadata.set(new Text("name_4"), new Text("value_4"));
+    
+    int seed = new Random().nextInt();
+    
+    try {
+      // SequenceFile.Writer
+      writeMetadataTest(fs, count, seed, file, CompressionType.NONE, null, theMetadata);
+      SequenceFile.Metadata aMetadata = readMetadata(fs, file);
+      if (!theMetadata.equals(aMetadata)) {
+        LOG.info("The original metadata:\n" + theMetadata.toString());
+        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+        throw new RuntimeException("metadata not match:  " + 1);
+      }
+      // SequenceFile.RecordCompressWriter
+      writeMetadataTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD, 
+          codec, theMetadata);
+      aMetadata = readMetadata(fs, recordCompressedFile);
+      if (!theMetadata.equals(aMetadata)) {
+        LOG.info("The original metadata:\n" + theMetadata.toString());
+        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+        throw new RuntimeException("metadata not match:  " + 2);
+      }
+      // SequenceFile.BlockCompressWriter
+      writeMetadataTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,
+          codec, theMetadata);
+      aMetadata =readMetadata(fs, blockCompressedFile);
+      if (!theMetadata.equals(aMetadata)) {
+        LOG.info("The original metadata:\n" + theMetadata.toString());
+        LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+        throw new RuntimeException("metadata not match:  " + 3);
+      }
+    } finally {
+      fs.close();
+    }
+    LOG.info("Successfully tested SequenceFile with metadata");
+  }
+  
+  
+  private static SequenceFile.Metadata readMetadata(FileSystem fs, Path file)
+  throws IOException {
+  LOG.info("reading file: " + file.toString() + "\n");
+  SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
+  SequenceFile.Metadata meta = reader.getMetadata(); 
+  reader.close();
+  return meta;
+  }
 
+  private static void writeMetadataTest(FileSystem fs, int count, int seed, Path file, 
+      CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
+    throws IOException {
+    fs.delete(file);
+    LOG.info("creating " + count + " records with metadata and with" + compressionType +
+              " compression");
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(fs, conf, file, 
+          RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    for (int i = 0; i < count; i++) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      writer.append(key, value);
+    }
+    writer.close();
+  }
+  
   /** For debugging and testing. */
   public static void main(String[] args) throws Exception {
     int count = 1024 * 1024;



Mime
View raw message