flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1220620 - in /incubator/flume/branches/flume-897: ./ wal/ wal/wal-avro/ wal/wal-avro/src/ wal/wal-avro/src/main/ wal/wal-avro/src/main/avro/ wal/wal-avro/src/main/java/ wal/wal-avro/src/main/java/org/ wal/wal-avro/src/main/java/org/apache/...
Date Mon, 19 Dec 2011 06:37:00 GMT
Author: esammer
Date: Mon Dec 19 06:36:58 2011
New Revision: 1220620

URL: http://svn.apache.org/viewvc?rev=1220620&view=rev
Log:
- Initial import of wal library.
- Updated the parent pom to include wal module.
- Bumped avro to version 1.6.1.

Added:
    incubator/flume/branches/flume-897/wal/
    incubator/flume/branches/flume-897/wal/pom.xml
    incubator/flume/branches/flume-897/wal/wal-avro/
    incubator/flume/branches/flume-897/wal/wal-avro/pom.xml
    incubator/flume/branches/flume-897/wal/wal-avro/src/
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALEntryAdapter.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/log4j.properties
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/test-wal
    incubator/flume/branches/flume-897/wal/wal-core/
    incubator/flume/branches/flume-897/wal/wal-core/pom.xml
    incubator/flume/branches/flume-897/wal/wal-core/src/
    incubator/flume/branches/flume-897/wal/wal-core/src/main/
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WAL.java
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALEntry.java
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALWriter.java
    incubator/flume/branches/flume-897/wal/wal-core/src/test/
    incubator/flume/branches/flume-897/wal/wal-core/src/test/java/
    incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/
    incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/
    incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/
    incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java
    incubator/flume/branches/flume-897/wal/wal-core/src/test/resources/
    incubator/flume/branches/flume-897/wal/wal-core/src/test/resources/log4j.properties
Modified:
    incubator/flume/branches/flume-897/pom.xml

Modified: incubator/flume/branches/flume-897/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/pom.xml?rev=1220620&r1=1220619&r2=1220620&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/pom.xml (original)
+++ incubator/flume/branches/flume-897/pom.xml Mon Dec 19 06:36:58 2011
@@ -51,6 +51,7 @@ limitations under the License.
         <module>flume-ng-node</module>
         <module>flume-ng-dist</module>
         <module>flume-ng-channels</module>
+        <module>wal</module>
       </modules>
     </profile>
 
@@ -320,6 +321,14 @@ limitations under the License.
       <plugins>
 
         <plugin>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro-maven-plugin</artifactId>
+          <version>1.6.1</version>
+          <configuration>
+          </configuration>
+        </plugin>
+
+        <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
           <version>2.3.2</version>
@@ -536,19 +545,19 @@ limitations under the License.
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
-        <version>1.5.1</version>
+        <version>1.6.1</version>
       </dependency>
 
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-compiler</artifactId>
-        <version>1.5.1</version>
+        <version>1.6.1</version>
       </dependency>
 
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-ipc</artifactId>
-        <version>1.5.1</version>
+        <version>1.6.1</version>
       </dependency>
 
       <!-- FIXME: This should be removed when we migrate the IRC source / 
@@ -613,11 +622,11 @@ limitations under the License.
         <version>1.4</version>
       </dependency>
 
-     <dependency>
-       <groupId>org.apache.derby</groupId>
-       <artifactId>derby</artifactId>
-       <version>10.8.1.2</version>
-     </dependency>
+      <dependency>
+        <groupId>org.apache.derby</groupId>
+        <artifactId>derby</artifactId>
+        <version>10.8.1.2</version>
+      </dependency>
 
     </dependencies>
   </dependencyManagement>

Added: incubator/flume/branches/flume-897/wal/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/pom.xml?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/pom.xml (added)
+++ incubator/flume/branches/flume-897/wal/pom.xml Mon Dec 19 06:36:58 2011
@@ -0,0 +1,23 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.flume</groupId>
+    <artifactId>flume-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.wal</groupId>
+  <artifactId>wal</artifactId>
+  <packaging>pom</packaging>
+  <name>Write Ahead Log Library</name>
+  <description>A general purpose WAL framework and implementation</description>
+
+  <modules>
+    <module>wal-core</module>
+    <module>wal-avro</module>
+  </modules>
+
+</project>

Added: incubator/flume/branches/flume-897/wal/wal-avro/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/pom.xml?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/pom.xml (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/pom.xml Mon Dec 19 06:36:58 2011
@@ -0,0 +1,76 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>wal</artifactId>
+    <groupId>org.apache.wal</groupId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>wal-avro</artifactId>
+  <name>Avro file based implementation of a WAL</name>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id></id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.wal</groupId>
+      <artifactId>wal-core</artifactId>
+      <version>1.0.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-compiler</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>
\ No newline at end of file

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl Mon Dec 19 06:36:58 2011
@@ -0,0 +1,9 @@
+@namespace("org.apache.wal.avro")
+
+protocol AvroWALProtocol {
+
+  record AvroWALEntry {
+    long timeStamp;
+  }
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,65 @@
+package org.apache.wal.avro;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+
+import org.apache.wal.WAL;
+import org.apache.wal.WALReader;
+import org.apache.wal.WALWriter;
+
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+
+public class AvroWAL implements WAL {
+
+  private File directory;
+
+  private RandomAccessFile walRAF;
+
+  private MappedByteBuffer indexBuffer;
+
+  @Override
+  public void configure(Map<String, String> properties) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void open() {
+    try {
+      indexBuffer = (MappedByteBuffer) Files.map(new File(directory, "index"),
+          FileChannel.MapMode.READ_WRITE, 8);
+
+      walRAF = new RandomAccessFile(new File(directory, "data.wal"), "rwd");
+    } catch (FileNotFoundException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void close() {
+    Closeables.closeQuietly(walRAF);
+  }
+
+  @Override
+  public WALReader getReader() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public WALWriter getWriter() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALEntryAdapter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALEntryAdapter.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALEntryAdapter.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALEntryAdapter.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,22 @@
+package org.apache.wal.avro;
+
+import org.apache.wal.WALEntry;
+
+public class AvroWALEntryAdapter implements WALEntry {
+
+  private AvroWALEntry entry;
+
+  public AvroWALEntryAdapter(AvroWALEntry entry) {
+    this.entry = entry;
+  }
+
+  @Override
+  public String toString() {
+    return "AvroWALEntryAdapter { entry:" + entry + " }";
+  }
+
+  public AvroWALEntry getEntry() {
+    return entry;
+  }
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,120 @@
+package org.apache.wal.avro;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.wal.WALEntry;
+import org.apache.wal.WALReader;
+import org.apache.wal.avro.AvroWALEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AvroWALReader implements WALReader {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(AvroWALReader.class);
+
+  private FileInputStream walInputStream;
+  private MappedByteBuffer indexBuffer;
+  private Decoder decoder;
+  private SpecificDatumReader<AvroWALEntry> avroReader;
+
+  private long currentPosition;
+  private FileChannel outputChannel;
+
+  public void open() {
+    outputChannel = walInputStream.getChannel();
+
+    try {
+      outputChannel.position(indexBuffer.getLong(0));
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public WALEntry next() {
+    WALEntry entry = null;
+
+    try {
+      entry = new AvroWALEntryAdapter(avroReader.read(null, decoder));
+      currentPosition = walInputStream.getChannel().position();
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("Wrote entry:{} markPosition:{} currentPosition:{}",
+            new Object[] { entry, indexBuffer.getLong(0), currentPosition });
+      }
+    } catch (IOException e) {
+      logger.error("Failed to read WAL entry. Exception follows.", e);
+    }
+
+    return entry;
+  }
+
+  @Override
+  public void mark() {
+    logger.debug("Updating currentPosition to:{}", currentPosition);
+
+    indexBuffer.putLong(0, currentPosition);
+    indexBuffer.force();
+  }
+
+  @Override
+  public void reset() {
+    logger.debug("Rewinding last successful read position");
+
+    try {
+      walInputStream.getChannel().position(indexBuffer.getLong(0));
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  public long getMarkPosition() {
+    return indexBuffer.getLong(0);
+  }
+
+  public FileInputStream getWalInputStream() {
+    return walInputStream;
+  }
+
+  public void setWalInputStream(FileInputStream walInputStream) {
+    this.walInputStream = walInputStream;
+  }
+
+  public MappedByteBuffer getIndexBuffer() {
+    return indexBuffer;
+  }
+
+  public void setIndexBuffer(MappedByteBuffer indexBuffer) {
+    this.indexBuffer = indexBuffer;
+  }
+
+  public Decoder getDecoder() {
+    return decoder;
+  }
+
+  public void setDecoder(Decoder decoder) {
+    this.decoder = decoder;
+  }
+
+  public SpecificDatumReader<AvroWALEntry> getAvroReader() {
+    return avroReader;
+  }
+
+  public void setAvroReader(SpecificDatumReader<AvroWALEntry> avroReader) {
+    this.avroReader = avroReader;
+  }
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,129 @@
+package org.apache.wal.avro;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.wal.WALEntry;
+import org.apache.wal.WALWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class AvroWALWriter implements WALWriter {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(AvroWALWriter.class);
+
+  private FileOutputStream walOutputStream;
+  private MappedByteBuffer indexBuffer;
+  private Encoder encoder;
+  private SpecificDatumWriter<AvroWALEntry> writer;
+
+  private long currentPosition;
+  private FileChannel outputChannel;
+
+  @Override
+  public void open() {
+    outputChannel = walOutputStream.getChannel();
+
+    try {
+      indexBuffer.putLong(0, outputChannel.position());
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+    indexBuffer.force();
+  }
+
+  @Override
+  public void write(WALEntry entry) {
+    Preconditions.checkArgument(entry instanceof AvroWALEntryAdapter,
+        "WAL entry must be an instance of AvroWALEntryAdapter");
+
+    try {
+      writer.write(((AvroWALEntryAdapter) entry).getEntry(), encoder);
+      encoder.flush();
+      outputChannel.force(false);
+      currentPosition = outputChannel.position();
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("Wrote entry:{} markPosition:{} currentPosition:{}",
+            new Object[] { entry, indexBuffer.getLong(0), currentPosition });
+      }
+    } catch (IOException e) {
+      logger.error("Failed to write WAL entry. Exception follows.", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void mark() {
+    indexBuffer.putLong(0, currentPosition);
+    indexBuffer.force();
+  }
+
+  @Override
+  public void reset() {
+    try {
+      outputChannel.truncate(indexBuffer.getLong(0));
+      /*
+       * Changes to the file size affect the metadata so we need to force that
+       * out as well and pay the price of the second IO.
+       */
+      outputChannel.force(true);
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  public long getMarkPosition() {
+    return indexBuffer.getLong(0);
+  }
+
+  public FileOutputStream getWalOutputStream() {
+    return walOutputStream;
+  }
+
+  public void setWalOutputStream(FileOutputStream walOutputStream) {
+    this.walOutputStream = walOutputStream;
+  }
+
+  public MappedByteBuffer getIndexBuffer() {
+    return indexBuffer;
+  }
+
+  public void setIndexBuffer(MappedByteBuffer indexBuffer) {
+    this.indexBuffer = indexBuffer;
+  }
+
+  public Encoder getEncoder() {
+    return encoder;
+  }
+
+  public void setEncoder(Encoder encoder) {
+    this.encoder = encoder;
+  }
+
+  public SpecificDatumWriter<AvroWALEntry> getWriter() {
+    return writer;
+  }
+
+  public void setWriter(SpecificDatumWriter<AvroWALEntry> writer) {
+    this.writer = writer;
+  }
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALReader.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,77 @@
+package org.apache.wal.avro;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.wal.WALEntry;
+import org.apache.wal.avro.AvroWALEntry;
+import org.apache.wal.avro.AvroWALEntryAdapter;
+import org.apache.wal.avro.AvroWALReader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+
+public class TestAvroWALReader {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestAvroWALReader.class);
+
+  private File testDirectory;
+  private AvroWALReader reader;
+
+  @Before
+  public void setUp() throws IOException {
+    testDirectory = new File("/tmp/wal-avro-" + System.currentTimeMillis());
+
+    testDirectory.mkdirs();
+
+    reader = new AvroWALReader();
+
+    FileInputStream walInputStream = new FileInputStream(new File(Resources
+        .getResource("test-wal").getFile()));
+
+    MappedByteBuffer indexBuffer = Files.map(new File(testDirectory, "index"),
+        FileChannel.MapMode.READ_WRITE, 8);
+    indexBuffer.putLong(0, 0);
+
+    reader.setDecoder(DecoderFactory.get().directBinaryDecoder(walInputStream,
+        null));
+    reader.setAvroReader(new SpecificDatumReader<AvroWALEntry>(
+        AvroWALEntry.SCHEMA$));
+    reader.setWalInputStream(walInputStream);
+    reader.setIndexBuffer(indexBuffer);
+  }
+
+  @SuppressWarnings("deprecation")
+  @After
+  public void tearDown() throws IOException {
+    Files.deleteRecursively(testDirectory.getCanonicalFile());
+  }
+
+  @Test
+  public void testNext() {
+    for (int i = 0; i < 184; i++) {
+      WALEntry entry = reader.next();
+
+      logger.debug("Read:{}", entry);
+
+      Assert.assertNotNull(entry);
+      Assert.assertTrue(entry instanceof AvroWALEntryAdapter);
+      Assert.assertNotNull(((AvroWALEntryAdapter) entry).getEntry());
+
+      reader.mark();
+    }
+  }
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,101 @@
+package org.apache.wal.avro;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import junit.framework.Assert;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.wal.avro.AvroWALEntry;
+import org.apache.wal.avro.AvroWALEntryAdapter;
+import org.apache.wal.avro.AvroWALWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+public class TestAvroWALWriter {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestAvroWALWriter.class);
+
+  private File testDirectory;
+  private AvroWALWriter writer;
+
+  @Before
+  public void setUp() throws IOException {
+    testDirectory = new File("/tmp/wal-avro-" + System.currentTimeMillis());
+
+    testDirectory.mkdirs();
+
+    MappedByteBuffer indexBuffer = Files.map(new File(testDirectory,
+        "write-index"), FileChannel.MapMode.READ_WRITE, 8);
+    FileOutputStream walOutputStream = new FileOutputStream(new File(
+        testDirectory, "wal"), true);
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(walOutputStream,
+        null);
+    SpecificDatumWriter<AvroWALEntry> avroWriter = new SpecificDatumWriter<AvroWALEntry>(
+        AvroWALEntry.class);
+
+    writer = new AvroWALWriter();
+
+    writer.setEncoder(encoder);
+    writer.setIndexBuffer(indexBuffer);
+    writer.setWalOutputStream(walOutputStream);
+    writer.setWriter(avroWriter);
+  }
+
+  @SuppressWarnings("deprecation")
+  @After
+  public void tearDown() throws IOException {
+    Files.deleteRecursively(testDirectory.getCanonicalFile());
+  }
+
+  @Test
+  public void testWrite() throws IOException {
+    BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(
+        new FileInputStream(new File(testDirectory, "wal")), null);
+    SpecificDatumReader<AvroWALEntry> reader = new SpecificDatumReader<AvroWALEntry>(
+        AvroWALEntry.class);
+
+    writer.open();
+
+    for (int i = 0; i < 205; i++) {
+      AvroWALEntryAdapter entry = new AvroWALEntryAdapter(new AvroWALEntry());
+
+      entry.getEntry().timeStamp = System.currentTimeMillis();
+
+      writer.write(entry);
+
+      if (i % 10 == 0) {
+        writer.reset();
+      } else {
+        writer.mark();
+      }
+
+      try {
+        AvroWALEntry lastEntry = reader.read(null, decoder);
+        logger.debug("read:{}", lastEntry);
+      } catch (EOFException e) {
+        Assert.assertTrue(i % 10 == 0);
+      }
+
+    }
+
+    writer.mark();
+    writer.close();
+  }
+}

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/log4j.properties?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/log4j.properties (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/log4j.properties Mon Dec 19 06:36:58 2011
@@ -0,0 +1,7 @@
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.com.cloudera = DEBUG

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/test-wal
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/test-wal?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/test-wal (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/resources/test-wal Mon Dec 19 06:36:58 2011
@@ -0,0 +1 @@
+Þ㻩‰Mä㻩‰Mæ㻩‰Mê㻩‰Mî㻩‰Mô㻩‰Mú㻩‰Mþ㻩‰M‚仩‰MŒä»©‰M’仩‰M–仩‰Mšä»©‰Mžä»©‰Mªä»©‰M¬ä»©‰M²ä»©‰M´ä»©‰M¼ä»©‰MÀ仩‰MÈ仩‰MÌ仩‰MÐ仩‰MÔ仩‰MÚ仩‰Mà仩‰Mä仩‰Mô仩‰Mú仩‰Mˆå»©‰M”廩‰Mšå»©‰Mžå»©‰M¤å»©‰M¦å»©‰Mªå»©‰M´å»©‰M¸å»©‰M¼å»©‰MÀ廩‰Mâ廩‰Mô廩‰Mú廩‰M€æ»©‰Mˆæ»©‰M’滩‰Mšæ»©‰M¢æ»©‰M¨æ»©‰MÆ滩‰MÒ滩‰MÖ滩‰MÜ滩‰Mâ滩‰Mê滩‰Mð滩‰Mö滩‰Mü滩‰M‚绩‰Mˆç»©‰MŒç»©‰M’绩‰M˜ç»©‰Mžç»©‰M¤ç»©‰M¨ç»©‰M®ç»©‰M°ç»©‰M´ç»©‰M¸ç»©‰M¼ç»©‰MÂ绩‰MÆ绩‰MÌ绩‰MÎ绩‰MÒ绩‰MÔ绩‰MØ绩‰MÜ绩‰Mà绩‰Mä绩‰Mê绩‰Mî绩‰Mò绩‰Mö绩‰Mú绩‰Mþ绩‰M„軩‰Mˆè»©‰MŒè»©‰
 M”軩‰M˜è»©‰Mœè»©‰M è»©‰M¤è»©‰M¬è»©‰M°è»©‰M´è»©‰M¸è»©‰M¾è»©‰MÂ軩‰MÈ軩‰MÌ軩‰MÐ軩‰MÔ軩‰MØ軩‰Mà軩‰Mä軩‰Mê軩‰Mô軩‰Mø軩‰Mþ軩‰M‚黩‰Mˆé»©‰MŠé»©‰MŽé»©‰M¨é»©‰M²é»©‰M¼é»©‰MÆ黩‰M¤ë»©‰M°ë»©‰M¶ë»©‰Mºë»©‰MÄ뻩‰MÈ뻩‰MÐ뻩‰Mæ뻩‰Mê뻩‰Mò뻩‰Mö뻩‰M€ì»©‰M„컩‰Mˆì»©‰MŒì»©‰M”컩‰Mšì»©‰Mžì»©‰M¢ì»©‰M¦ì»©‰M®ì»©‰M²ì»©‰M¸ì»©‰M¾ì»©‰MÄ컩‰MÊ컩‰MÎ컩‰MÔ컩‰MÜ컩‰Mî컩‰Mþ컩‰M†í»©‰MŠí»©‰M”í»©‰M˜í»©‰Mží»©‰M¦í»©‰Mªí»©‰M®í»©‰M²í»©‰M´í»©‰Mºí»©‰M¾í»©‰MÄí»©‰MÆí»©‰MÊí»©‰MÐí»©‰MÔí»©‰MÖí»©‰MÚí»©‰MÜí»©‰Mâí»©‰Mæí»©‰Mèí»©‰Mìí»©‰Mðí»©‰Môí»©‰Múí»©‰Müí»©‰M‚
 Mˆî»©‰MŠî»©‰MŽî»©‰M’‰M
\ No newline at end of file

Added: incubator/flume/branches/flume-897/wal/wal-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/pom.xml?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/pom.xml (added)
+++ incubator/flume/branches/flume-897/wal/wal-core/pom.xml Mon Dec 19 06:36:58 2011
@@ -0,0 +1,36 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>wal</artifactId>
+    <groupId>org.apache.wal</groupId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.wal</groupId>
+  <artifactId>wal-core</artifactId>
+  <name>Write ahead log core</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>
\ No newline at end of file

Added: incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WAL.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WAL.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WAL.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,17 @@
+package org.apache.wal;
+
+import java.util.Map;
+/** A write ahead log that can be configured, opened and closed, and hands out a reader and a writer. */
+public interface WAL {
+  /** Passes string key/value settings to the log; this interface itself defines no keys. */
+  void configure(Map<String, String> properties);
+  /** Opens the log. */
+  void open();
+  /** Closes the log. */
+  void close();
+  /** Returns a {@link WALReader} for this log. */
+  WALReader getReader();
+  /** Returns a {@link WALWriter} for this log. */
+  WALWriter getWriter();
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALEntry.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALEntry.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALEntry.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALEntry.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,5 @@
+package org.apache.wal;
+/** A single entry of a {@link WAL}; the interface itself declares no members. */
+public interface WALEntry {
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,11 @@
+package org.apache.wal;
+/** Reads {@link WALEntry} instances; obtained from {@link WAL#getReader()}. */
+public interface WALReader {
+  /** Returns the next entry. NOTE(review): end-of-log behaviour is not specified here. */
+  WALEntry next();
+  /** NOTE(review): presumably remembers the current read position - contract not stated. */
+  void mark();
+  /** NOTE(review): presumably rewinds to the last {@link #mark()} - confirm with implementers. */
+  void reset();
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALWriter.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALWriter.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALWriter.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,15 @@
+package org.apache.wal;
+/** Writes {@link WALEntry} instances; obtained from {@link WAL#getWriter()}. */
+public interface WALWriter {
+  /** Opens the writer. */
+  void open();
+  /** Writes one entry. */
+  void write(WALEntry entry);
+  /** Closes the writer. */
+  void close();
+  /** NOTE(review): presumably commits what was written so far - contract not stated here. */
+  void mark();
+  /** NOTE(review): presumably discards writes since the last {@link #mark()} - confirm. */
+  void reset();
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java Mon Dec 19 06:36:58 2011
@@ -0,0 +1,178 @@
+package org.apache.wal;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.wal.TestWAL.MockWAL.MockWALEntry;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Exercises the WAL API lifecycle against a purely in-memory mock.
+ */
+public class TestWAL {
+
+  private static final Logger logger = LoggerFactory.getLogger(TestWAL.class);
+
+  /** Log under test; a fresh {@link MockWAL} is installed before every test. */
+  private WAL wal;
+
+  @Before
+  public void setUp() {
+    this.wal = new MockWAL();
+  }
+
+  /**
+   * Writes one entry and checks that the reader sees it only after the writer has
+   * called {@code mark()}, and that nothing further can be read afterwards.
+   */
+  @Test
+  public void testLifecycle() {
+    wal.configure(new HashMap<String, String>());
+    wal.open();
+
+    WALReader in = wal.getReader();
+    WALWriter out = wal.getWriter();
+    Assert.assertNotNull(in);
+    Assert.assertNotNull(out);
+
+    WALEntry written = new MockWALEntry();
+    out.open();
+    out.write(written);
+    // Not marked yet, so the entry must still be invisible to the reader.
+    Assert.assertNull(in.next());
+
+    out.mark();
+    out.close();
+
+    // mark() published the entry; a second read finds nothing more.
+    WALEntry readBack = in.next();
+    Assert.assertEquals(written, readBack);
+    Assert.assertNull(in.next());
+
+    wal.close();
+  }
+
+  /**
+   * In-memory {@link WAL} backed by a linked list of published entries.
+   */
+  public static class MockWAL implements WAL {
+
+    /** Entries published by writers and not yet consumed by a reader. */
+    private LinkedList<WALEntry> queue = new LinkedList<WALEntry>();
+
+    @Override
+    public void configure(Map<String, String> properties) {
+      logger.debug("configure wal - properties:{}", properties);
+    }
+
+    @Override
+    public void open() {
+      logger.debug("open wal");
+    }
+
+    @Override
+    public void close() {
+      logger.debug("close wal");
+    }
+
+    /** Creates a reader bound to this log. */
+    @Override
+    public WALReader getReader() {
+      MockWALReader boundReader = new MockWALReader();
+      boundReader.wal = this;
+      return boundReader;
+    }
+
+    /** Creates a writer bound to this log. */
+    @Override
+    public WALWriter getWriter() {
+      MockWALWriter boundWriter = new MockWALWriter();
+      boundWriter.wal = this;
+      return boundWriter;
+    }
+
+    /** Appends every given entry to the tail of the published queue. */
+    public void write(Collection<WALEntry> entries) {
+      this.queue.addAll(entries);
+    }
+
+    /** Removes and returns the oldest published entry, or {@code null} when there is none. */
+    public WALEntry next() {
+      return queue.isEmpty() ? null : queue.pop();
+    }
+
+    /** Reader that pulls straight from the owning {@link MockWAL}; mark/reset do nothing. */
+    public static class MockWALReader implements WALReader {
+
+      private MockWAL wal;
+
+      @Override
+      public WALEntry next() {
+        return wal.next();
+      }
+
+      @Override
+      public void mark() {
+        // No-op in this mock.
+      }
+
+      @Override
+      public void reset() {
+        // No-op in this mock.
+      }
+
+    }
+
+    /**
+     * Writer that buffers entries locally and hands them to the log on {@code mark()}.
+     */
+    public static class MockWALWriter implements WALWriter {
+
+      /** Entries written since the last {@code mark()}. */
+      private LinkedList<WALEntry> queue = new LinkedList<WALEntry>();
+      private MockWAL wal;
+
+      @Override
+      public void open() {
+        logger.debug("opening writer");
+      }
+
+      @Override
+      public void write(WALEntry entry) {
+        logger.debug("writing entry:{}", entry);
+        queue.add(entry);
+      }
+
+      @Override
+      public void mark() {
+        logger.debug("synchronizing writer - {} elements", queue.size());
+        // MockWAL.write copies the buffered entries, so clearing afterwards is safe.
+        wal.write(queue);
+        queue.clear();
+      }
+
+      @Override
+      public void reset() {
+        // No-op in this mock.
+      }
+
+      @Override
+      public void close() {
+        logger.debug("closing writer");
+      }
+
+    }
+
+    /** Entry with no payload; it inherits identity-based equals from Object. */
+    public static class MockWALEntry implements WALEntry {
+
+    }
+
+  }
+
+}

Added: incubator/flume/branches/flume-897/wal/wal-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/test/resources/log4j.properties?rev=1220620&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/test/resources/log4j.properties (added)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/test/resources/log4j.properties Mon Dec 19 06:36:58 2011
@@ -0,0 +1,7 @@
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.wal = DEBUG



Mime
View raw message