flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1225129 - in /incubator/flume/branches/flume-897/wal/wal-avro/src: main/java/org/apache/wal/avro/ test/java/org/apache/wal/avro/
Date Wed, 28 Dec 2011 07:30:35 GMT
Author: esammer
Date: Wed Dec 28 07:30:34 2011
New Revision: 1225129

URL: http://svn.apache.org/viewvc?rev=1225129&view=rev
Log:
- Updated WALIndex to support a read and write index.
- Rewrote AvroWALReader to use WALIndex, support multiple WAL files as produced by AvroWALWriter
  and to deal with various corner cases. Works.
- Updated tests for AvroWAL{Reader,Writer}.

Modified:
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java?rev=1225129&r1=1225128&r2=1225129&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
(original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
Wed Dec 28 07:30:34 2011
@@ -1,57 +1,201 @@
 package org.apache.wal.avro;
 
+import java.io.EOFException;
+import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 
 import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.wal.WALEntry;
+import org.apache.wal.WALException;
 import org.apache.wal.WALReader;
-import org.apache.wal.avro.AvroWALEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+import com.google.common.io.Closeables;
 
 public class AvroWALReader implements WALReader {
 
   private static final Logger logger = LoggerFactory
       .getLogger(AvroWALReader.class);
 
+  private File directory;
+
   private FileInputStream walInputStream;
-  private MappedByteBuffer indexBuffer;
+  private FileChannel inputChannel;
   private Decoder decoder;
-  private SpecificDatumReader<AvroWALEntry> avroReader;
+  private SpecificDatumReader<AvroWALEntry> reader;
+
+  private WALIndex index;
 
+  private File currentFile;
   private long currentPosition;
-  private FileChannel outputChannel;
+
+  public AvroWALReader() {
+    reader = new SpecificDatumReader<AvroWALEntry>(AvroWALEntry.class);
+  }
 
   public void open() {
-    outputChannel = walInputStream.getChannel();
+    logger.info("Opening write ahead log reader for directory:{}", directory);
+
+    if (index.getReadFile() != null) {
+      openWALFile(new File(index.getReadFile()), index.getReadPosition());
+    } else {
+      File file = findNextFile();
+
+      if (file != null) {
+        openWALFile(file, 0);
+      }
+    }
+
+    logger.debug("Opened write ahead log reader:{}", this);
+  }
+
+  private boolean ensureWALFile() {
+    if (decoder == null) {
+      File file = findNextFile();
+
+      if (file != null) {
+        openWALFile(file, 0);
+      }
+    }
+
+    return decoder != null;
+  }
+
+  private void openWALFile(File file, long position) {
+    currentFile = file;
 
     try {
-      outputChannel.position(indexBuffer.getLong(0));
+      walInputStream = new FileInputStream(currentFile);
+      inputChannel = walInputStream.getChannel();
+      decoder = DecoderFactory.get().directBinaryDecoder(walInputStream, null);
+
+      inputChannel.position(position);
+    } catch (FileNotFoundException e) {
+      throw new WALException("Unable to open write ahead log file:" + file
+          + " position:" + position, e);
     } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      throw new WALException("Unable to open write ahead log file:" + file
+          + " position:" + position, e);
+    }
+
+    index.updateReadIndex(currentFile.getPath(), position);
+  }
+
+  private void closeWALFile() {
+    Closeables.closeQuietly(walInputStream);
+    Closeables.closeQuietly(inputChannel);
+
+    decoder = null;
+  }
+
+  private File findNextFile() {
+    logger.debug("Attempting to find a WAL file to read");
+
+    File file = null;
+
+    String[] walFiles = directory.list(new FilenameFilter() {
+
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.endsWith(".wal");
+      }
+    });
+
+    if (walFiles != null && walFiles.length > 0) {
+      List<String> walFilesList = Arrays.asList(walFiles);
+
+      /*
+       * Sort the WAL files by timestamp and select the first in an attempt to
+       * preserve ordering.
+       */
+      Collections.sort(walFilesList, new Comparator<String>() {
+
+        @Override
+        public int compare(String o1, String o2) {
+          Long o1L = Long.parseLong(o1.substring(0, o1.indexOf(".wal")));
+          Long o2L = Long.parseLong(o2.substring(0, o2.indexOf(".wal")));
+
+          return o1L.compareTo(o2L);
+        }
+
+      });
+
+      logger.debug("Sorted list of WAL files:{}", walFilesList);
+
+      if (index.getReadFile() != null) {
+        String readFileName = new File(index.getReadFile()).getName();
+        long currentFileTs = Long.parseLong(readFileName.substring(0,
+            readFileName.indexOf(".wal")));
+
+        logger.debug("Current WAL file timestamp:{}", currentFileTs);
+
+        for (String walFile : walFilesList) {
+          long ts = Long
+              .parseLong(walFile.substring(0, walFile.indexOf(".wal")));
+
+          if (ts > currentFileTs) {
+            file = new File(walFile);
+            logger.debug("WAL file ts:{} > currentFileTs:{} - selected!", ts,
+                currentFileTs);
+            break;
+          }
+        }
+
+      } else {
+        file = new File(directory, walFilesList.get(0));
+      }
     }
+
+    logger.debug("Selected write ahead log {} for reading", file);
+
+    return file;
   }
 
   @Override
   public WALEntry next() {
     WALEntry entry = null;
 
-    try {
-      entry = new AvroWALEntryAdapter(avroReader.read(null, decoder));
-      currentPosition = walInputStream.getChannel().position();
-
-      if (logger.isDebugEnabled()) {
-        logger.debug("Wrote entry:{} markPosition:{} currentPosition:{}",
-            new Object[] { entry, indexBuffer.getLong(0), currentPosition });
+    if (ensureWALFile()) {
+      while (entry == null) {
+        try {
+          entry = new AvroWALEntryAdapter(reader.read(null, decoder));
+          currentPosition = inputChannel.position();
+
+          if (logger.isDebugEnabled()) {
+            logger
+                .debug("Read entry:{} markPosition:{} currentPosition:{}",
+                    new Object[] { entry, index.getReadPosition(),
+                        currentPosition });
+          }
+        } catch (EOFException e) {
+          /*
+           * Hitting EOF means we finished the WAL. There may be another WAL
+           * waiting for us (i.e. the writer is generating new data) so we
+           * attempt to locate a new file and open it. If we find a new file, we
+           * retry the read, otherwise, we break out (i.e. no new data).
+           */
+          closeWALFile();
+
+          if (!ensureWALFile()) {
+            break;
+          }
+        } catch (IOException e) {
+          throw new WALException("Failed to read WAL entry - file:"
+              + currentFile + " position:" + currentPosition, e);
+        }
       }
-    } catch (IOException e) {
-      logger.error("Failed to read WAL entry. Exception follows.", e);
     }
 
     return entry;
@@ -61,19 +205,21 @@ public class AvroWALReader implements WA
   public void mark() {
     logger.debug("Updating currentPosition to:{}", currentPosition);
 
-    indexBuffer.putLong(0, currentPosition);
-    indexBuffer.force();
+    index.updateReadIndex(currentFile.getPath(), currentPosition);
   }
 
   @Override
   public void reset() {
     logger.debug("Rewinding last successful read position");
 
-    try {
-      walInputStream.getChannel().position(indexBuffer.getLong(0));
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+    /*
+     * It's possible we need to reset to a different file. Figure out if this is
+     * a simple pointer move or a file close / open op.
+     */
+    if (currentFile.getPath().equals(index.getReadFile())) {
+      currentPosition = index.getReadPosition();
+    } else {
+      openWALFile(new File(index.getReadFile()), index.getReadPosition());
     }
   }
 
@@ -82,39 +228,30 @@ public class AvroWALReader implements WA
   }
 
   public long getMarkPosition() {
-    return indexBuffer.getLong(0);
-  }
-
-  public FileInputStream getWalInputStream() {
-    return walInputStream;
+    return index.getReadPosition();
   }
 
-  public void setWalInputStream(FileInputStream walInputStream) {
-    this.walInputStream = walInputStream;
+  public WALIndex getIndex() {
+    return index;
   }
 
-  public MappedByteBuffer getIndexBuffer() {
-    return indexBuffer;
+  public void setIndex(WALIndex index) {
+    this.index = index;
   }
 
-  public void setIndexBuffer(MappedByteBuffer indexBuffer) {
-    this.indexBuffer = indexBuffer;
+  public File getDirectory() {
+    return directory;
   }
 
-  public Decoder getDecoder() {
-    return decoder;
+  public void setDirectory(File directory) {
+    this.directory = directory;
   }
 
-  public void setDecoder(Decoder decoder) {
-    this.decoder = decoder;
-  }
-
-  public SpecificDatumReader<AvroWALEntry> getAvroReader() {
-    return avroReader;
-  }
-
-  public void setAvroReader(SpecificDatumReader<AvroWALEntry> avroReader) {
-    this.avroReader = avroReader;
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(getClass()).add("currentFile", currentFile)
+        .add("currentPosition", currentPosition).add("directory", directory)
+        .add("index", index).toString();
   }
 
 }

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java?rev=1225129&r1=1225128&r2=1225129&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
(original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
Wed Dec 28 07:30:34 2011
@@ -50,30 +50,11 @@ public class AvroWALWriter implements WA
   public void open() {
     logger.info("Opening write ahead log in:{}", directory);
 
-    openIndex();
     openWALFile();
 
     writer = new SpecificDatumWriter<AvroWALEntry>(AvroWALEntry.class);
   }
 
-  private void openIndex() {
-    logger.info("Opening write ahead log index in directory:{}", directory);
-
-    index = new WALIndex();
-
-    index.setDirectory(directory);
-
-    try {
-      index.open();
-    } catch (FileNotFoundException e) {
-      throw new WALException("Failed to open WAL index. Exception follows.", e);
-    } catch (IOException e) {
-      throw new WALException("Failed to open WAL index. Exception follows.", e);
-    }
-
-    logger.debug("Opened write ahead log index:{}", index);
-  }
-
   private void openWALFile() {
     logger.info("Opening WAL file");
 
@@ -85,7 +66,7 @@ public class AvroWALWriter implements WA
       currentPosition = outputChannel.position();
       encoder = EncoderFactory.get().directBinaryEncoder(walOutputStream, null);
 
-      index.updateIndex(currentFile.getPath(), 0);
+      index.updateWriteIndex(currentFile.getPath(), 0);
     } catch (FileNotFoundException e) {
       throw new WALException(
           "Failed to open WAL (missing parent directory?). Exception follows.",
@@ -117,7 +98,7 @@ public class AvroWALWriter implements WA
 
       if (logger.isDebugEnabled()) {
         logger.debug("Wrote entry:{} markPosition:{} currentPosition:{}",
-            new Object[] { entry, index.getPosition(), currentPosition });
+            new Object[] { entry, index.getWritePosition(), currentPosition });
       }
     } catch (IOException e) {
       throw new WALException("Failed to write WAL entry. Exception follows.", e);
@@ -128,7 +109,7 @@ public class AvroWALWriter implements WA
   public void close() {
     logger.info("Closing write ahead log at:{}", currentFile);
 
-    Closeables.closeQuietly(outputChannel);
+    closeWALFile();
   }
 
   @Override
@@ -136,7 +117,7 @@ public class AvroWALWriter implements WA
     logger.debug("Marking currentFile:{} currentPosition:{}", currentFile,
         currentPosition);
 
-    index.updateIndex(currentFile.getPath(), currentPosition);
+    index.updateWriteIndex(currentFile.getPath(), currentPosition);
 
     eventCount += eventBatchCount;
     eventBatchCount = 0;
@@ -162,10 +143,10 @@ public class AvroWALWriter implements WA
   @Override
   public void reset() {
     logger.debug("Resetting WAL position from:{} to:{}", currentPosition,
-        index.getPosition());
+        index.getWritePosition());
 
     try {
-      outputChannel.truncate(index.getPosition());
+      outputChannel.truncate(index.getWritePosition());
       /*
        * Changes to the file size affect the metadata so we need to force that
        * out as well and pay the price of the second IO.
@@ -195,7 +176,15 @@ public class AvroWALWriter implements WA
   }
 
   public long getMarkPosition() {
-    return index.getPosition();
+    return index.getWritePosition();
+  }
+
+  public WALIndex getIndex() {
+    return index;
+  }
+
+  public void setIndex(WALIndex index) {
+    this.index = index;
   }
 
   @Override

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java?rev=1225129&r1=1225128&r2=1225129&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
(original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
Wed Dec 28 07:30:34 2011
@@ -14,47 +14,80 @@ import com.google.common.io.Files;
 
 public class WALIndex {
 
-  private static final String indexFileName = "wal-idx";
+  private static final String writeIndexFileName = "write.idx";
+  private static final String readIndexFileName = "read.idx";
+
   private static final Logger logger = LoggerFactory.getLogger(WALIndex.class);
 
   private File directory;
 
-  private String file;
-  private long position;
+  private File writeIndexFile;
+  private MappedByteBuffer writeIndexBuffer;
+  private File readIndexFile;
+  private MappedByteBuffer readIndexBuffer;
+
+  private String writeFile;
+  private long writePosition;
+  private String readFile;
+  private long readPosition;
 
-  private File indexFile;
-  private MappedByteBuffer indexBuffer;
+  public WALIndex() {
+  }
 
   public synchronized void open() throws FileNotFoundException, IOException {
-    indexFile = new File(directory, indexFileName);
+    writeIndexFile = new File(directory, writeIndexFileName);
+    readIndexFile = new File(directory, readIndexFileName);
 
-    logger.info("Opening WAL index table file:{}", indexFile);
+    logger.info("Opening WAL index table writeFile:{}", writeIndexFile);
+
+    writeIndexBuffer = Files.map(writeIndexFile,
+        FileChannel.MapMode.READ_WRITE, 4 * 1024);
+    readIndexBuffer = Files.map(readIndexFile, FileChannel.MapMode.READ_WRITE,
+        4 * 1024);
+
+    writePosition = writeIndexBuffer.getLong();
+    int writeFileNameLength = writeIndexBuffer.getInt();
+
+    if (writeFileNameLength > 0) {
+      byte[] buffer = new byte[writeFileNameLength];
+      writeIndexBuffer.get(buffer);
+      writeFile = new String(buffer);
+    }
 
-    indexBuffer = Files.map(indexFile, FileChannel.MapMode.READ_WRITE,
-        16 * 1024);
+    writeIndexBuffer.position(0);
 
-    position = indexBuffer.getLong();
-    int fileNameLength = indexBuffer.getInt();
+    readPosition = readIndexBuffer.getLong();
+    int readFileNameLength = readIndexBuffer.getInt();
 
-    if (fileNameLength > 0) {
-      byte[] buffer = new byte[fileNameLength];
-      indexBuffer.get(buffer);
-      file = new String(buffer);
+    if (readFileNameLength > 0) {
+      byte[] buffer = new byte[readFileNameLength];
+      readIndexBuffer.get(buffer);
+      readFile = new String(buffer);
     }
 
-    logger.debug("Loaded position:{} fileNameLength:{} file:{}", new Object[] {
-        position, fileNameLength, file });
+    readIndexBuffer.position(0);
+
+    logger.debug("Loaded index:{}", this);
+  }
+
+  public synchronized void updateWriteIndex(String file, long position) {
+    writeIndexBuffer.putLong(position).putInt(file.length())
+        .put(file.getBytes());
+    writeIndexBuffer.force();
+    writeIndexBuffer.position(0);
 
-    indexBuffer.position(0);
+    this.writeFile = file;
+    this.writePosition = position;
   }
 
-  public synchronized void updateIndex(String file, long position) {
-    indexBuffer.putLong(position).putInt(file.length()).put(file.getBytes());
-    indexBuffer.force();
-    indexBuffer.position(0);
+  public synchronized void updateReadIndex(String file, long position) {
+    readIndexBuffer.putLong(position).putInt(file.length())
+        .put(file.getBytes());
+    readIndexBuffer.force();
+    readIndexBuffer.position(0);
 
-    this.file = file;
-    this.position = position;
+    this.readFile = file;
+    this.readPosition = position;
   }
 
   public synchronized File getDirectory() {
@@ -65,30 +98,44 @@ public class WALIndex {
     this.directory = directory;
   }
 
-  public synchronized File getIndexFile() {
-    return indexFile;
+  public String getWriteFile() {
+    return writeFile;
+  }
+
+  public void setWriteFile(String file) {
+    this.writeFile = file;
+  }
+
+  public long getWritePosition() {
+    return writePosition;
+  }
+
+  public void setWritePosition(long position) {
+    this.writePosition = position;
   }
 
-  public String getFile() {
-    return file;
+  public String getReadFile() {
+    return readFile;
   }
 
-  public void setFile(String file) {
-    this.file = file;
+  public void setReadFile(String readFile) {
+    this.readFile = readFile;
   }
 
-  public long getPosition() {
-    return position;
+  public long getReadPosition() {
+    return readPosition;
   }
 
-  public void setPosition(long position) {
-    this.position = position;
+  public void setReadPosition(long readPosition) {
+    this.readPosition = readPosition;
   }
 
   @Override
   public String toString() {
-    return Objects.toStringHelper(getClass()).add("file", file)
-        .add("position", position).add("directory", directory)
-        .add("indexFile", indexFile).toString();
+    return Objects.toStringHelper(getClass()).add("writeFile", writeFile)
+        .add("writePosition", writePosition).add("readFile", readFile)
+        .add("readPosition", readPosition).add("directory", directory)
+        .add("writeIndexFile", writeIndexFile)
+        .add("readIndexFile", readIndexFile).toString();
   }
 }

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java?rev=1225129&r1=1225128&r2=1225129&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java
(original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java
Wed Dec 28 07:30:34 2011
@@ -1,17 +1,9 @@
 package org.apache.wal.avro;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
 
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.wal.WALEntry;
-import org.apache.wal.avro.AvroWALEntry;
-import org.apache.wal.avro.AvroWALEntryAdapter;
-import org.apache.wal.avro.AvroWALReader;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -36,21 +28,18 @@ public class TestAvroWALReader {
 
     testDirectory.mkdirs();
 
+    Files.copy(new File(Resources.getResource("test-wal").getPath()), new File(
+        testDirectory, "1.wal"));
+
     reader = new AvroWALReader();
 
-    FileInputStream walInputStream = new FileInputStream(new File(Resources
-        .getResource("test-wal").getFile()));
+    WALIndex index = new WALIndex();
+
+    index.setDirectory(testDirectory);
+    index.open();
 
-    MappedByteBuffer indexBuffer = Files.map(new File(testDirectory, "index"),
-        FileChannel.MapMode.READ_WRITE, 8);
-    indexBuffer.putLong(0, 0);
-
-    reader.setDecoder(DecoderFactory.get().directBinaryDecoder(walInputStream,
-        null));
-    reader.setAvroReader(new SpecificDatumReader<AvroWALEntry>(
-        AvroWALEntry.SCHEMA$));
-    reader.setWalInputStream(walInputStream);
-    reader.setIndexBuffer(indexBuffer);
+    reader.setDirectory(testDirectory);
+    reader.setIndex(index);
   }
 
   @SuppressWarnings("deprecation")
@@ -61,6 +50,8 @@ public class TestAvroWALReader {
 
   @Test
   public void testNext() {
+    reader.open();
+
     for (int i = 0; i < 184; i++) {
       WALEntry entry = reader.next();
 
@@ -72,6 +63,12 @@ public class TestAvroWALReader {
 
       reader.mark();
     }
+
+    WALEntry e1 = reader.next();
+    Assert.assertNull(e1);
+
+    e1 = reader.next();
+    Assert.assertNull(e1);
   }
 
 }

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java?rev=1225129&r1=1225128&r2=1225129&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
(original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
Wed Dec 28 07:30:34 2011
@@ -32,6 +32,13 @@ public class TestAvroWALWriter {
 
     writer = new AvroWALWriter();
     writer.setDirectory(testDirectory);
+
+    WALIndex index = new WALIndex();
+
+    index.setDirectory(testDirectory);
+    index.open();
+
+    writer.setIndex(index);
   }
 
   @SuppressWarnings("deprecation")

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java?rev=1225129&r1=1225128&r2=1225129&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
(original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
Wed Dec 28 07:30:34 2011
@@ -35,41 +35,71 @@ public class TestWALIndex {
   }
 
   @Test
-  public void testUpdate() throws FileNotFoundException, IOException {
+  public void testUpdateWrite() throws FileNotFoundException, IOException {
     index.open();
-    Assert.assertNull(index.getFile());
-    Assert.assertEquals(0, index.getPosition());
+    Assert.assertNull(index.getWriteFile());
+    Assert.assertEquals(0, index.getWritePosition());
 
-    index.updateIndex("foo", 0);
-    Assert.assertEquals("foo", index.getFile());
-    Assert.assertEquals(0, index.getPosition());
-
-    index.updateIndex("foo", 1);
-    Assert.assertEquals("foo", index.getFile());
-    Assert.assertEquals(1, index.getPosition());
-
-    index.updateIndex("foo", 2);
-    Assert.assertEquals("foo", index.getFile());
-    Assert.assertEquals(2, index.getPosition());
-
-    index.updateIndex("bar", 0);
-    Assert.assertEquals("bar", index.getFile());
-    Assert.assertEquals(0, index.getPosition());
+    index.updateWriteIndex("foo", 0);
+    Assert.assertEquals("foo", index.getWriteFile());
+    Assert.assertEquals(0, index.getWritePosition());
+
+    index.updateWriteIndex("foo", 1);
+    Assert.assertEquals("foo", index.getWriteFile());
+    Assert.assertEquals(1, index.getWritePosition());
+
+    index.updateWriteIndex("foo", 2);
+    Assert.assertEquals("foo", index.getWriteFile());
+    Assert.assertEquals(2, index.getWritePosition());
+
+    index.updateWriteIndex("bar", 0);
+    Assert.assertEquals("bar", index.getWriteFile());
+    Assert.assertEquals(0, index.getWritePosition());
   }
 
   @Test
-  public void testExistingIndex() throws FileNotFoundException, IOException {
+  public void testUpdateRead() throws FileNotFoundException, IOException {
     index.open();
-    Assert.assertNull(index.getFile());
-    Assert.assertEquals(0, index.getPosition());
+    Assert.assertNull(index.getReadFile());
+    Assert.assertEquals(0, index.getReadPosition());
+
+    index.updateReadIndex("foo", 0);
+    Assert.assertEquals("foo", index.getReadFile());
+    Assert.assertEquals(0, index.getReadPosition());
+
+    index.updateReadIndex("foo", 1);
+    Assert.assertEquals("foo", index.getReadFile());
+    Assert.assertEquals(1, index.getReadPosition());
+
+    index.updateReadIndex("foo", 2);
+    Assert.assertEquals("foo", index.getReadFile());
+    Assert.assertEquals(2, index.getReadPosition());
+
+    index.updateReadIndex("bar", 0);
+    Assert.assertEquals("bar", index.getReadFile());
+    Assert.assertEquals(0, index.getReadPosition());
+  }
 
-    index.updateIndex("test", 128);
-    Assert.assertEquals("test", index.getFile());
-    Assert.assertEquals(128, index.getPosition());
+  @Test
+  public void testExistingIndex() throws FileNotFoundException, IOException {
+    index.open();
+    Assert.assertNull(index.getWriteFile());
+    Assert.assertEquals(0, index.getWritePosition());
+    Assert.assertNull(index.getReadFile());
+    Assert.assertEquals(0, index.getReadPosition());
+
+    index.updateWriteIndex("test1", 1);
+    index.updateReadIndex("test2", 2);
+    Assert.assertEquals("test1", index.getWriteFile());
+    Assert.assertEquals(1, index.getWritePosition());
+    Assert.assertEquals("test2", index.getReadFile());
+    Assert.assertEquals(2, index.getReadPosition());
 
     index.open();
-    Assert.assertEquals("test", index.getFile());
-    Assert.assertEquals(128, index.getPosition());
+    Assert.assertEquals("test1", index.getWriteFile());
+    Assert.assertEquals(1, index.getWritePosition());
+    Assert.assertEquals("test2", index.getReadFile());
+    Assert.assertEquals(2, index.getReadPosition());
   }
 
 }



Mime
View raw message