flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1308168 [1/2] - in /incubator/flume/trunk: ./ flume-ng-channels/ flume-ng-channels/flume-file-channel/ flume-ng-channels/flume-recoverable-memory-channel/ flume-ng-channels/flume-recoverable-memory-channel/src/ flume-ng-channels/flume-reco...
Date Sun, 01 Apr 2012 18:17:32 GMT
Author: arvind
Date: Sun Apr  1 18:17:31 2012
New Revision: 1308168

URL: http://svn.apache.org/viewvc?rev=1308168&view=rev
Log:
FLUME-896. Implement a Durable Memory Channel.

(Brock Noland via Arvind Prabhakar)

Added:
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/pom.xml
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java   (with props)
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties
Modified:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml
    incubator/flume/trunk/flume-ng-channels/pom.xml
    incubator/flume/trunk/flume-ng-dist/pom.xml
    incubator/flume/trunk/pom.xml

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml?rev=1308168&r1=1308167&r2=1308168&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml Sun Apr  1 18:17:31 2012
@@ -69,6 +69,45 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>${hadoop.common.artifact.id}</artifactId>
+    </dependency>
+
   </dependencies>
 
+  <profiles>
+
+    <profile>
+      <id>hadoop-0.23</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>23</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jcl</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>commons-configuration</groupId>
+          <artifactId>commons-configuration</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+
+  </profiles>
+
+
+
 </project>

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sun Apr  1 18:17:31 2012
@@ -0,0 +1,4 @@
+.classpath
+.project
+.settings
+target

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/pom.xml?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/pom.xml (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/pom.xml Sun Apr  1 18:17:31 2012
@@ -0,0 +1,113 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>flume-ng-channels</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-channels</groupId>
+  <artifactId>flume-recoverable-memory-channel</artifactId>
+  <name>Flume NG file backed Memory channel</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>${hadoop.common.artifact.id}</artifactId>
+    </dependency>
+
+  </dependencies>
+
+  <profiles>
+
+    <profile>
+      <id>hadoop-0.23</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>23</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jcl</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>commons-configuration</groupId>
+          <artifactId>commons-configuration</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+
+  </profiles>
+
+
+
+</project>

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel.recoverable.memory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicChannelSemantics;
+import org.apache.flume.channel.BasicTransactionSemantics;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.recoverable.memory.wal.WAL;
+import org.apache.flume.channel.recoverable.memory.wal.WALEntry;
+import org.apache.flume.channel.recoverable.memory.wal.WALReplayResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * <p>
+ * A durable {@link Channel} implementation that uses the local file system for
+ * its storage.
+ * </p>
+ */
+public class RecoverableMemoryChannel extends BasicChannelSemantics {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(RecoverableMemoryChannel.class);
+
+
+  public static final String WAL_DATA_DIR = "wal.dataDir";
+  public static final String WAL_ROLL_SIZE = "wal.rollSize";
+  public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
+  public static final String WAL_MIN_RENTENTION_PERIOD = "wal.minRententionPeriod";
+  public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";
+
+  private MemoryChannel memoryChannel = new MemoryChannel();
+  private AtomicLong seqidGenerator = new AtomicLong(0);
+  private WAL<RecoverableMemoryChannelEvent> wal;
+
+  @Override
+  public void configure(Context context) {
+    memoryChannel.configure(context);
+
+    String homePath = System.getProperty("user.home").replace('\\', '/');
+    String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
+    if(wal != null) {
+      try {
+        wal.close();
+      } catch (IOException e) {
+        LOG.error("Error closing existing wal during reconfigure", e);
+      }
+    }
+    long rollSize = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_ROLL_SIZE);
+    long maxLogsSize = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_MAX_LOGS_SIZE);
+    long minRententionPeriod = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_MIN_LOG_RENTENTION_PERIOD);
+    long workerInterval = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_WORKER_INTERVAL);
+    try {
+      wal = new WAL<RecoverableMemoryChannelEvent>(new File(dataDir),
+          RecoverableMemoryChannelEvent.class, rollSize, maxLogsSize,
+          minRententionPeriod, workerInterval);
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public synchronized void start() {
+    try {
+      WALReplayResult<RecoverableMemoryChannelEvent> results = wal.replay();
+      Preconditions.checkArgument(results.getSequenceID() >= 0);
+      LOG.info("Replay SequenceID " + results.getSequenceID());
+      seqidGenerator.set(results.getSequenceID());
+      Transaction transaction = memoryChannel.getTransaction();
+      transaction.begin();
+      LOG.info("Replay Events " + results.getResults().size());
+      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
+        memoryChannel.put(entry.getData());
+        seqidGenerator.set(Math.max(entry.getSequenceID(),seqidGenerator.get()));
+      }
+      transaction.commit();
+      transaction.close();
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    try {
+      close();
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+    super.stop();
+  }
+
+  @Override
+  protected BasicTransactionSemantics createTransaction() {
+    return new FileBackedTransaction(this, memoryChannel);
+  }
+
+  private void commitEvents(List<RecoverableMemoryChannelEvent> events)
+      throws IOException {
+    List<WALEntry<RecoverableMemoryChannelEvent>> entries = Lists.newArrayList();
+    for(RecoverableMemoryChannelEvent event : events) {
+      entries.add(new WALEntry<RecoverableMemoryChannelEvent>(event, event.sequenceId));
+    }
+    wal.writeEntries(entries);
+  }
+  private void commitSequenceID(List<Long> seqids)
+      throws IOException {
+    wal.writeSequenceIDs(seqids);
+  }
+
+  private long nextSequenceID() {
+    return seqidGenerator.incrementAndGet();
+  }
+
+  void close() throws IOException {
+    if(wal != null) {
+      wal.close();
+    }
+  }
+
+  /**
+   * <p>
+   * An implementation of {@link Transaction} for {@link RecoverableMemoryChannel}s.
+   * </p>
+   */
+  private static class FileBackedTransaction extends BasicTransactionSemantics {
+
+    private Transaction transaction;
+    private MemoryChannel memoryChannel;
+    private RecoverableMemoryChannel fileChannel;
+    private List<Long> sequenceIds = Lists.newArrayList();
+    private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
+    private FileBackedTransaction(RecoverableMemoryChannel fileChannel, MemoryChannel memoryChannel) {
+      this.fileChannel = fileChannel;
+      this.memoryChannel = memoryChannel;
+      this.transaction = this.memoryChannel.getTransaction();
+    }
+    @Override
+    protected void doBegin() throws InterruptedException {
+      transaction.begin();
+    }
+    @Override
+    protected void doPut(Event event) throws InterruptedException {
+      RecoverableMemoryChannelEvent sequencedEvent = new RecoverableMemoryChannelEvent(event, fileChannel.nextSequenceID());
+      memoryChannel.put(sequencedEvent);
+      events.add(sequencedEvent);
+    }
+
+    @Override
+    protected Event doTake() throws InterruptedException {
+      RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)memoryChannel.take();
+      if(event != null) {
+        sequenceIds.add(event.sequenceId);
+        return event.event;
+      }
+      return null;
+    }
+
+    @Override
+    protected void doCommit() throws InterruptedException {
+      if(sequenceIds.size() > 0) {
+        try {
+          fileChannel.commitSequenceID(sequenceIds);
+        } catch (IOException e) {
+          throw new ChannelException("Unable to commit", e);
+        }
+      }
+      if(!events.isEmpty()) {
+        try {
+          fileChannel.commitEvents(events);
+        } catch (IOException e) {
+          throw new ChannelException("Unable to commit", e);
+        }
+      }
+      transaction.commit();
+    }
+
+    @Override
+    protected void doRollback() throws InterruptedException {
+      sequenceIds.clear();
+      events.clear();
+      transaction.rollback();
+    }
+
+    @Override
+    protected void doClose() {
+      sequenceIds.clear();
+      events.clear();
+      transaction.close();
+    }
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.recoverable.memory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Maps;
+
+class RecoverableMemoryChannelEvent implements Event, Writable {
+  Event event;
+  long sequenceId;
+
+  // called via reflection
+  @SuppressWarnings("unused")
+  private RecoverableMemoryChannelEvent() {
+    this.event = new SimpleEvent();
+  }
+
+  RecoverableMemoryChannelEvent(Event event, long sequenceId) {
+    this.event = event;
+    this.sequenceId = sequenceId;
+  }
+  @Override
+  public Map<String, String> getHeaders() {
+    return event.getHeaders();
+  }
+  @Override
+  public void setHeaders(Map<String, String> headers) {
+    event.setHeaders(headers);
+  }
+  @Override
+  public byte[] getBody() {
+    return event.getBody();
+  }
+  @Override
+  public void setBody(byte[] body) {
+    event.setBody(body);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(sequenceId);
+    MapWritable map = toMapWritable(getHeaders());
+    map.write(out);
+    byte[] body = getBody();
+    if(body == null) {
+      out.writeInt(-1);
+    } else {
+      out.writeInt(body.length);
+      out.write(body);
+    }
+  }
+
+
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    sequenceId = in.readLong();
+    MapWritable map = new MapWritable();
+    map.readFields(in);
+    setHeaders(fromMapWritable(map));
+    byte[] body = null;
+    int bodyLength = in.readInt();
+    if(bodyLength != -1) {
+      body = new byte[bodyLength];
+      in.readFully(body);
+    }
+    setBody(body);
+  }
+  private MapWritable toMapWritable(Map<String, String> map) {
+    MapWritable result = new MapWritable();
+    if(map != null) {
+      for(Map.Entry<String, String> entry : map.entrySet()) {
+        result.put(new Text(entry.getKey()),new Text(entry.getValue()));
+      }
+    }
+    return result;
+  }
+  private Map<String, String> fromMapWritable(MapWritable map) {
+    Map<String, String> result = Maps.newHashMap();
+    if(map != null) {
+      for(Map.Entry<Writable, Writable> entry : map.entrySet()) {
+        result.put(entry.getKey().toString(),entry.getValue().toString());
+      }
+    }
+    return result;
+  }
+}
\ No newline at end of file

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannelEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.recoverable.memory.wal;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.List;
+
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+public class SequenceIDBuffer implements IndexedSortable {
+  private static final Logger LOG = LoggerFactory.getLogger(SequenceIDBuffer.class);
+  private LongBuffer buffer;
+  private ByteBuffer backingBuffer;
+
+  public SequenceIDBuffer(int size) {
+    long directMemorySize = getDirectMemorySize();
+    int bytesRequired = size * 8;
+    if((long)bytesRequired > directMemorySize) {
+      LOG.warn("DirectMemorySize is " + directMemorySize +
+          " and we require " + bytesRequired + ", allocate will likely faily.");
+    }
+    try {
+      backingBuffer = ByteBuffer.allocateDirect(bytesRequired);
+    } catch(OutOfMemoryError e) {
+      LOG.error("DirectMemorySize is " + directMemorySize +
+          " and we required " + bytesRequired +
+          " via: -XX:MaxDirectMemorySize", e);
+      throw e;
+    }
+    buffer = backingBuffer.asLongBuffer();
+  }
+
+  @Override
+  public int compare(int leftIndex, int rightIndex) {
+    long left = get(leftIndex);
+    long right = get(rightIndex);
+    return (left < right ? -1 : (left == right ? 0 : 1));
+
+  }
+
+  public boolean exists(long value) {
+    return binarySearch(value) >= 0;
+  }
+
+  private int binarySearch(long value) {
+    int low = 0;
+    int high = size() - 1;
+
+    while (low <= high) {
+      int mid = (low + high) >>> 1;
+      long midVal = get(mid);
+
+      if (midVal < value) {
+        low = mid + 1;
+      } else if (midVal > value) {
+        high = mid - 1;
+      } else {
+        return mid; // key found
+      }
+    }
+    return -(low + 1); // key not found.
+  }
+
+  @Override
+  public void swap(int leftIndex, int rightIndex) {
+    long left = get(leftIndex);
+    long right = get(rightIndex);
+    put(leftIndex, right);
+    put(rightIndex, left);
+  }
+
+  public long get(int index) {
+    return buffer.get(index);
+  }
+
+  public void put(int index, long value) {
+    buffer.put(index, value);
+  }
+
+  public int size() {
+    return buffer.limit();
+  }
+
+  public void close() {
+    try {
+      Preconditions.checkArgument(backingBuffer.isDirect(),
+          "buffer isn't direct!");
+      Method cleanerMethod = backingBuffer.getClass().getMethod("cleaner");
+      cleanerMethod.setAccessible(true);
+      Object cleaner = cleanerMethod.invoke(backingBuffer);
+      Method cleanMethod = cleaner.getClass().getMethod("clean");
+      cleanMethod.setAccessible(true);
+      cleanMethod.invoke(cleaner);
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+  }
+
+  public void sort() {
+    QuickSort quickSort = new QuickSort();
+    quickSort.sort(this, 0, size());
+  }
+
+  /**
+   * @return the setting of -XX:MaxDirectMemorySize as a long. Returns 0 if
+   *         -XX:MaxDirectMemorySize is not set.
+   */
+
+  private static long getDirectMemorySize() {
+    RuntimeMXBean RuntimemxBean = ManagementFactory.getRuntimeMXBean();
+    List<String> arguments = RuntimemxBean.getInputArguments();
+    long multiplier = 1; //for the byte case.
+    for (String s : arguments) {
+      if (s.contains("-XX:MaxDirectMemorySize=")) {
+        String memSize = s.toLowerCase()
+            .replace("-xx:maxdirectmemorysize=", "").trim();
+
+        if (memSize.contains("k")) {
+          multiplier = 1024;
+        }
+
+        else if (memSize.contains("m")) {
+          multiplier = 1048576;
+        }
+
+        else if (memSize.contains("g")) {
+          multiplier = 1073741824;
+        }
+        memSize = memSize.replaceAll("[^\\d]", "");
+
+        long retValue = Long.parseLong(memSize);
+        return retValue * multiplier;
+      }
+
+    }
+    return 0;
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    try {
+      System.out.println("SequenceIDBuffer");
+      SequenceIDBuffer buffer = new SequenceIDBuffer(13107200);
+      buffer.close();
+      System.out.println("Array");
+      @SuppressWarnings("unused")
+      long[] array = new long[13107200];
+    } catch (Throwable t) {
+      t.printStackTrace();
+    } finally {
+      Thread.sleep(Long.MAX_VALUE);
+    }
+  }
+}
\ No newline at end of file

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.recoverable.memory.wal;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+
+/**
+ * Provides Write Ahead Log functionality for a generic Writable. All entries
+ * stored in the WAL must be assigned a unique increasing sequence id. WAL
+ * files will be removed when the following condition holds (defaults):
+ * 
+ * At least 512MB of WAL's exist, the file in question is greater than
+ * five minutes old and the largest committed sequence id is greater
+ * than the largest sequence id in the file.
+ * 
+ * <pre>
+ *  WAL wal = new WAL(path, Writable.class);
+ *  wal.writeEvent(event, 1);
+ *  wal.writeEvent(event, 2);
+ *  wal.writeSequenceID(1);
+ *  wal.writeEvent(event, 3);
+ * 
+ *  System crashes or shuts down...
+ * 
+ *  WAL wal = new WAL(path, Writable.class);
+ *  [Event 2, Event 3]  = wal.replay();
+ * </pre>
+ * 
+ * WAL files will be created in the specified data directory. They will be
+ * rolled at 64MB and deleted five minutes after they are no longer needed;
+ * that is, when the largest committed sequence id is greater than the
+ * greatest sequence id in the file.
+ * 
+ * The only synchronization this class does is around rolling log files. When
+ * a roll of the log file is required, the thread which discovers this
+ * will execute the roll. Any threads calling a write*() method during
+ * the roll will block until the roll is complete.
+ */
+public class WAL<T extends Writable> implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(WAL.class);
+
+  private File path;
+  private File dataPath;
+  private File sequenceIDPath;
+  private Class<T> clazz;
+  private WALDataFile.Writer<T> dataFileWALWriter;
+  private WALDataFile.Writer<NullWritable> sequenceIDWALWriter;
+  private Map<String, Long> fileLargestSequenceIDMap = Collections
+      .synchronizedMap(new HashMap<String, Long>());
+  private AtomicLong largestCommitedSequenceID = new AtomicLong(0);
+  private volatile boolean rollRequired;
+  private volatile boolean rollInProgress;
+  private long rollSize;
+  private long maxLogsSize;
+  private long minLogRentionPeriod;
+  private long workerInterval;
+  private int numReplaySequenceIDOverride;
+  private Worker backgroundWorker;
+
+  /**
+   * Number of bytes before we roll the file.
+   */
+  public static final long DEFAULT_ROLL_SIZE = 1024L * 1024L * 64L;
+  /**
+   * Number of bytes, to keep before we start pruning logs.
+   */
+  public static final long DEFAULT_MAX_LOGS_SIZE = 1024L * 1024L * 512L;
+  /**
+   * Minimum number of ms to keep a log file.
+   */
+  public static final long DEFAULT_MIN_LOG_RENTENTION_PERIOD = 5L * 60L * 1000L;
+  /**
+   * How often in ms the background worker runs
+   */
+  public static final long DEFAULT_WORKER_INTERVAL = 60L * 1000L;
+
+  /**
+   * Package-private convenience constructor using all default tuning values.
+   * Used for testing only.
+   */
+  WAL(File path, Class<T> clazz) throws IOException {
+    this(path, clazz, DEFAULT_ROLL_SIZE, DEFAULT_MAX_LOGS_SIZE,
+        DEFAULT_MIN_LOG_RENTENTION_PERIOD, DEFAULT_WORKER_INTERVAL);
+  }
+
+  /**
+   * Creates a wal object with no defaults, using the specified parameters in
+   * the constructor for operation.
+   * 
+   * @param path
+   * @param clazz
+   * @param rollSize
+   *          bytes - max size of a single file before we roll
+   * @param maxLogsSize
+   *          bytes - total amount of logs to keep excluding the current log
+   * @param minLogRentionPeriod
+   *          ms - minimum amount of time to keep a log
+   * @param workerInterval
+   *          ms - how often the background cleanup worker runs
+   * @throws IOException
+   */
+  public WAL(File path, Class<T> clazz, long rollSize,
+      long maxLogsSize, long minLogRentionPeriod,
+      long workerInterval) throws IOException {
+    this.path = path;
+    this.rollSize = rollSize;
+    this.maxLogsSize = maxLogsSize;
+    this.minLogRentionPeriod = minLogRentionPeriod;
+    this.workerInterval = workerInterval;
+
+    // StringBuilder: local-only buffer, no synchronization needed (was
+    // StringBuffer).
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("path = ").append(path).append(", ");
+    buffer.append("rollSize = ").append(rollSize).append(", ");
+    buffer.append("maxLogsSize = ").append(maxLogsSize).append(", ");
+    buffer.append("minLogRentionPeriod = ").append(minLogRentionPeriod).append(", ");
+    buffer.append("workerInterval = ").append(workerInterval).append("\n");
+    LOG.info("WAL Parameters: " + buffer);
+
+    // A WAL directory is bound to a single Writable type: refuse to open a
+    // directory previously written with a different class.
+    File clazzNamePath = new File(path, "clazz");
+    createOrDie(path);
+    if (clazzNamePath.exists()) {
+      String clazzName = Files.readFirstLine(clazzNamePath, Charsets.UTF_8);
+      if (!clazzName.equals(clazz.getName())) {
+        throw new IOException("WAL is for " + clazzName
+            + " and you are passing " + clazz.getName());
+      }
+    } else {
+      Files.write(clazz.getName().getBytes(Charsets.UTF_8), clazzNamePath);
+    }
+
+    dataPath = new File(path, "data");
+    sequenceIDPath = new File(path, "seq");
+    createOrDie(dataPath);
+    createOrDie(sequenceIDPath);
+    this.clazz = clazz;
+
+    // Daemon thread that prunes old, fully-committed log files.
+    backgroundWorker = new Worker(this.workerInterval, this.maxLogsSize,
+        this.minLogRentionPeriod, largestCommitedSequenceID,
+        fileLargestSequenceIDMap);
+    backgroundWorker.setName("WAL-Worker-" + path.getAbsolutePath());
+    backgroundWorker.setDaemon(true);
+    backgroundWorker.start();
+
+    // Open the initial data and sequence-id files.
+    roll();
+  }
+
+  /**
+   * Closes the current data and sequence-id files, recording each closed
+   * file's largest sequence id for the pruning worker, then opens a fresh
+   * pair of files named by the current timestamp.
+   */
+  private void roll() throws IOException {
+    try {
+      // Writers observe this flag in waitWhileRolling() and block until the
+      // roll completes.
+      rollInProgress = true;
+      LOG.info("Rolling WAL " + this.path);
+      if (dataFileWALWriter != null) {
+        fileLargestSequenceIDMap.put(dataFileWALWriter.getPath()
+            .getAbsolutePath(), dataFileWALWriter.getLargestSequenceID());
+        dataFileWALWriter.close();
+      }
+      if (sequenceIDWALWriter != null) {
+        fileLargestSequenceIDMap.put(sequenceIDWALWriter.getPath()
+            .getAbsolutePath(), sequenceIDWALWriter.getLargestSequenceID());
+        sequenceIDWALWriter.close();
+      }
+      long ts = System.currentTimeMillis();
+      File dataWalFileName = new File(dataPath, Long.toString(ts));
+      File seqWalFileName = new File(sequenceIDPath, Long.toString(ts));
+      // Bump the timestamp until both names are unused, in case two rolls
+      // happen within the same millisecond.
+      while (dataWalFileName.exists() || seqWalFileName.exists()) {
+        ts++;
+        dataWalFileName = new File(dataPath, Long.toString(ts));
+        seqWalFileName = new File(sequenceIDPath, Long.toString(ts));
+      }
+
+      dataFileWALWriter = new WALDataFile.Writer<T>(dataWalFileName);
+      sequenceIDWALWriter = new WALDataFile.Writer<NullWritable>(seqWalFileName);
+      rollRequired = false;
+    } finally {
+      rollInProgress = false;
+      // Wake any writers blocked in waitWhileRolling(). The explicit
+      // synchronized block is required because roll() may be invoked
+      // without the monitor held (e.g. from the constructor).
+      synchronized (this) {
+        notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Rebuilds state from the on-disk logs: reads every sequence-id file to
+   * collect the set of committed sequence ids, then reads every data file
+   * and returns the entries whose sequence id was never committed, along
+   * with the largest sequence id observed across all files.
+   */
+  public WALReplayResult<T> replay() throws IOException {
+    final AtomicLong sequenceID = new AtomicLong(0);
+    final Map<String, Long> fileLargestSequenceIDMap = Maps.newHashMap();
+    final AtomicLong totalBytes = new AtomicLong(0);
+    // first get the total amount of data we have to read in
+    readFiles(sequenceIDPath, new Function<File, Void>() {
+      @Override
+      public Void apply(File input) {
+        totalBytes.addAndGet(input.length());
+        return null;
+      }
+    });
+
+    // then estimate the size of the array
+    // needed to hold all the sequence ids
+    int baseSize = WALEntry.getBaseSize();
+    // NOTE(review): dividing by the per-entry base size (which excludes the
+    // 4-byte record-type marker) over-estimates the entry count — the safe
+    // direction for sizing the buffer.
+    int numEntries = Math.max((int)((totalBytes.get() / baseSize) * 1.05f) + 1,
+        numReplaySequenceIDOverride);
+    LOG.info("Replay assumptions: baseSize = " + baseSize
+        + ", estimatedNumEntries " + numEntries);
+    final SequenceIDBuffer sequenceIDs = new SequenceIDBuffer(numEntries);
+
+    // read them all into ram
+    final AtomicInteger index = new AtomicInteger(0);
+    readFiles(sequenceIDPath, new Function<File, Void>() {
+      @Override
+      public Void apply(File input) {
+        WALDataFile.Reader<NullWritable> reader = null;
+        int localIndex = index.get();
+        try {
+          // item stored is a NullWritable so we only store the base WALEntry
+          reader = new WALDataFile.Reader<NullWritable>(input,
+              NullWritable.class);
+          List<WALEntry<NullWritable>> batch;
+          long largestForFile = Long.MIN_VALUE;
+          while ((batch = reader.nextBatch()) != null) {
+            for(WALEntry<NullWritable> entry : batch) {
+              long current = entry.getSequenceID();
+              sequenceIDs.put(localIndex++, current);
+              largestForFile = Math.max(largestForFile, current);
+            }
+          }
+          sequenceID.set(Math.max(largestForFile, sequenceID.get()));
+          fileLargestSequenceIDMap.put(input.getAbsolutePath(),
+              largestForFile);
+        } catch (IOException e) {
+          Throwables.propagate(e);
+        } finally {
+          index.set(localIndex);
+          if (reader != null) {
+            try {
+              reader.close();
+            } catch (IOException e) {
+              // best-effort close during replay; any real read failure has
+              // already been propagated above
+            }
+          }
+        }
+        return null;
+      }
+    });
+
+    sequenceIDs.sort();
+
+    // now read all edits storing items with a sequence id
+    // which is *not* in the sequenceIDs
+    final List<WALEntry<T>> entries = Lists.newArrayList();
+    final Class<T> dataClazz = clazz;
+    readFiles(dataPath, new Function<File, Void>() {
+      @Override
+      public Void apply(File input) {
+        WALDataFile.Reader<T> reader = null;
+        try {
+          reader = new WALDataFile.Reader<T>(input, dataClazz);
+          List<WALEntry<T>> batch = Lists.newArrayList();
+          long largestForFile = Long.MIN_VALUE;
+          while ((batch = reader.nextBatch()) != null) {
+            for(WALEntry<T> entry : batch) {
+              long current = entry.getSequenceID();
+              if (!sequenceIDs.exists(current)) {
+                entries.add(entry);
+              }
+              largestForFile = Math.max(largestForFile, current);
+            }
+          }
+          sequenceID.set(Math.max(largestForFile, sequenceID.get()));
+          fileLargestSequenceIDMap.put(input.getAbsolutePath(),
+              largestForFile);
+        } catch (IOException e) {
+          Throwables.propagate(e);
+        } finally {
+          if (reader != null) {
+            try {
+              reader.close();
+            } catch (IOException e) {
+              // best-effort close; see note on the sequence-id pass above
+            }
+          }
+        }
+        return null;
+      }
+    });
+    sequenceIDs.close();
+    // Replace the shared map contents with what was found on disk, under the
+    // map's own lock.
+    synchronized (this.fileLargestSequenceIDMap) {
+      this.fileLargestSequenceIDMap.clear();
+      this.fileLargestSequenceIDMap.putAll(fileLargestSequenceIDMap);
+    }
+    largestCommitedSequenceID.set(sequenceID.get());
+    return new WALReplayResult<T>(entries, largestCommitedSequenceID.get());
+  }
+
+  /**
+   * Appends the given entries to the data WAL, rolling the current file
+   * first if required. On any write error a roll is forced so the next
+   * write starts on a fresh file.
+   */
+  public void writeEntries(List<WALEntry<T>> entries) throws IOException {
+    Preconditions.checkNotNull(dataFileWALWriter,
+        "Write is null, close must have been called");
+    synchronized (this) {
+      if (isRollRequired()) {
+        roll();
+      }
+    }
+    waitWhileRolling();
+    boolean error = true;
+    try {
+      dataFileWALWriter.append(entries);
+      error = false;
+    } finally {
+      if (error) {
+        // The current file may contain a partial batch; force a roll before
+        // the next write.
+        rollRequired = true;
+      }
+    }
+  }
+
+  /**
+   * Writes a single entry by delegating to {@link #writeEntries(List)}.
+   */
+  public void writeEntry(WALEntry<T> entry) throws IOException {
+    writeEntries(Lists.newArrayList(entry));
+  }
+
+  /**
+   * Records a single committed sequence id; see
+   * {@link #writeSequenceIDs(List)}.
+   */
+  public void writeSequenceID(long sequenceID) throws IOException {
+    writeSequenceIDs(Lists.newArrayList(sequenceID));
+  }
+  /**
+   * Marks the given sequence ids as committed so the corresponding data
+   * entries will not be returned by a future {@link #replay()}.
+   */
+  public void writeSequenceIDs(List<Long> sequenceIDs) throws IOException {
+    Preconditions.checkNotNull(sequenceIDWALWriter,
+        "Write is null, close must have been called");
+    synchronized (this) {
+      if (isRollRequired()) {
+        roll();
+      }
+    }
+    waitWhileRolling();
+    boolean error = true;
+    try {
+      List<WALEntry<NullWritable>> entries = Lists.newArrayList();
+      for (Long sequenceID : sequenceIDs) {
+        largestCommitedSequenceID.set(Math.max(sequenceID,
+            largestCommitedSequenceID.get()));
+        entries.add(new WALEntry<NullWritable>(NullWritable.get(), sequenceID));
+      }
+      // BUGFIX: append the batch once, outside the loop. Previously the
+      // (growing) batch was appended on every iteration, writing each
+      // sequence id to disk multiple times.
+      sequenceIDWALWriter.append(entries);
+      error = false;
+    } finally {
+      if (error) {
+        // The current file may contain a partial batch; force a roll before
+        // the next write.
+        rollRequired = true;
+      }
+    }
+  }
+
+  /**
+   * Blocks the calling thread while a roll is in progress; roll() calls
+   * notifyAll() when it completes.
+   */
+  private void waitWhileRolling() {
+    synchronized (this) {
+      while (rollInProgress) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag and re-check the condition.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Stops the background worker and closes both writers. Subsequent write
+   * calls will fail their Preconditions null checks.
+   */
+  @Override
+  public void close() throws IOException {
+    if (backgroundWorker != null) {
+      backgroundWorker.shutdown();
+    }
+    if (sequenceIDWALWriter != null) {
+      sequenceIDWALWriter.close();
+      sequenceIDWALWriter = null;
+    }
+    if (dataFileWALWriter != null) {
+      dataFileWALWriter.close();
+      dataFileWALWriter = null;
+    }
+  }
+
+  /**
+   * True when an explicit roll was requested (after a write error) or when
+   * either active file has grown past rollSize.
+   */
+  private boolean isRollRequired() throws IOException {
+    return rollRequired
+        || Math.max(dataFileWALWriter.getSize(),
+            sequenceIDWALWriter.getSize()) > rollSize;
+  }
+
+  /**
+   * Applies function to every file directly under path, failing if any
+   * non-file entry (e.g. a subdirectory) is present.
+   */
+  private void readFiles(File path, Function<File, Void> function)
+      throws IOException {
+    File[] candidates = path.listFiles();
+    if (candidates == null) {
+      return;
+    }
+    // Validate everything first so nothing is applied when the directory
+    // contains an unexpected entry.
+    for (File candidate : candidates) {
+      if (!candidate.isFile()) {
+        throw new IOException("Not file " + candidate);
+      }
+    }
+    for (File candidate : candidates) {
+      function.apply(candidate);
+    }
+  }
+
+  /**
+   * Ensures path exists as a directory, creating it (and any missing
+   * parents) if needed.
+   */
+  private void createOrDie(File path) throws IOException {
+    if (path.isDirectory()) {
+      return;
+    }
+    if (!path.mkdirs()) {
+      throw new IOException("Unable to create " + path);
+    }
+  }
+
+  /**
+   * Background thread that periodically deletes WAL files which are old
+   * enough and fully committed, once the total log size reaches maxLogsSize.
+   */
+  private static class Worker extends Thread {
+    private long workerInterval;
+    private long maxLogsSize;
+    private long minLogRentionPeriod;
+    private AtomicLong largestSequenceID;
+    private Map<String, Long> fileLargestSequenceIDMap;
+    private volatile boolean run = true;
+
+    public Worker(long workerInterval, long maxLogsSize,
+        long minLogRentionPeriod, AtomicLong largestSequenceID,
+        Map<String, Long> fileLargestSequenceIDMap) {
+      this.workerInterval = workerInterval;
+      // BUGFIX: maxLogsSize and minLogRentionPeriod were previously never
+      // assigned, leaving both fields 0. That made the size threshold always
+      // trip and the retention period zero, so files could be deleted far
+      // earlier than intended.
+      this.maxLogsSize = maxLogsSize;
+      this.minLogRentionPeriod = minLogRentionPeriod;
+      this.largestSequenceID = largestSequenceID;
+      this.fileLargestSequenceIDMap = fileLargestSequenceIDMap;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Background worker reporting for duty");
+      while (run) {
+        try {
+          try {
+            Thread.sleep(workerInterval);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+          if (!run) {
+            continue;
+          }
+          List<String> filesToRemove = Lists.newArrayList();
+          long totalSize = 0;
+          synchronized (fileLargestSequenceIDMap) {
+            for (String key : fileLargestSequenceIDMap.keySet()) {
+              File file = new File(key);
+              totalSize += file.length();
+            }
+            if (totalSize >= maxLogsSize) {
+              for (String key : fileLargestSequenceIDMap.keySet()) {
+                File file = new File(key);
+                Long seqid = fileLargestSequenceIDMap.get(key);
+                long largestCommitedSeqID = largestSequenceID.get();
+                if (file.exists()
+                    // has not been modified in minLogRentionPeriod ms
+                    && System.currentTimeMillis() - file.lastModified() > minLogRentionPeriod
+                    // current seqid is greater than the largest seqid in the file
+                    && largestCommitedSeqID > seqid) {
+                  filesToRemove.add(key);
+                  LOG.info("Removing expired file " + key + ", seqid = "
+                      + seqid + ", result = " + file.delete());
+                }
+              }
+              for (String key : filesToRemove) {
+                fileLargestSequenceIDMap.remove(key);
+              }
+            }
+          }
+        } catch (Exception ex) {
+          LOG.error("Uncaught exception in background worker", ex);
+        }
+      }
+      LOG.warn(this.getClass().getSimpleName()
+          + " moving on due to stop request");
+    }
+
+    public void shutdown() {
+      run = false;
+      this.interrupt();
+    }
+  }
+
+  /**
+   * Reads in a WAL and writes out a new WAL. Used if for some reason a replay
+   * cannot occur due to the size of the WAL or assumptions about the number of
+   * sequenceids.
+   *
+   * Usage: input directory, output directory, classname,
+   * [numReplaySequenceIDOverride]
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static void main(String[] args) throws IOException,
+  ClassNotFoundException {
+    // BUGFIX: checkPositionIndex(i, size) accepts i == size, so the previous
+    // three checks passed even with too few arguments. Validate the argument
+    // count directly instead.
+    Preconditions.checkArgument(args.length >= 3,
+        "input directory, output directory and classname are required args");
+    String input = args[0];
+    String output = args[1];
+    Class clazz = Class.forName(args[2].trim());
+    WAL inputWAL = new WAL(new File(input), clazz);
+    if (args.length == 4) {
+      inputWAL.numReplaySequenceIDOverride = Integer.parseInt(args[3]);
+      System.out.println("Overriding numReplaySequenceIDOverride: "
+          + inputWAL.numReplaySequenceIDOverride);
+    }
+    WALReplayResult<?> result = inputWAL.replay();
+    inputWAL.close();
+    System.out.println("     SeqID: " + result.getSequenceID());
+    System.out.println("NumEntries: " + result.getResults().size());
+    WAL outputWAL = new WAL(new File(output), clazz);
+    outputWAL.writeEntries(result.getResults());
+    outputWAL.close();
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.recoverable.memory.wal;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+
+/**
+ * On-disk format helpers for the WAL: a Writer that appends batches of
+ * WALEntry records followed by a commit marker, and a Reader that returns
+ * those records one commit-delimited batch at a time.
+ */
+class WALDataFile<T extends Writable> {
+
+  // File format version, written as the first int of every file.
+  private static final int VERSION = 1;
+
+  // Record-type markers preceding each serialized item.
+  private static final int RECORD_TYPE_EVENT = 1;
+  private static final int RECORD_TYPE_COMMIT = 2;
+
+  static class Reader<T extends Writable> implements Closeable {
+    Class<T> clazz;
+    DataInputStream input;
+    // Used by ReflectionUtils to instantiate Writable payloads.
+    private Configuration conf = new Configuration();
+    Reader(File path, Class<T> clazz) throws IOException {
+      this.clazz = clazz;
+      input = new DataInputStream(new FileInputStream(path));
+      int version = input.readInt();
+      if(version != VERSION) {
+        throw new IOException("Expected " + VERSION + " and got " + version);
+      }
+    }
+
+    /**
+     * Returns the next commit-delimited batch of entries, or null at EOF
+     * (including a truncated trailing batch that was never committed).
+     */
+    List<WALEntry<T>> nextBatch() throws IOException {
+      List<WALEntry<T>> batch = Lists.newArrayList();
+      // read event records until the commit marker closing this batch is
+      // encountered
+      while(true) {
+        try {
+          int type = input.readInt();
+          if(type == RECORD_TYPE_EVENT) {
+            WALEntry<T> entry = newWALEntry(clazz, conf);
+            entry.readFields(input);
+            batch.add(entry);
+          } else if(type == RECORD_TYPE_COMMIT) {
+            // we only return what we have read if we find a commit entry
+            return batch;
+          } else {
+            throw new IOException("Unknown record type " + Integer.toHexString(type));
+          }
+        } catch(EOFException e) {
+          // in the EOF case, we crashed or shutdown while writing a batch
+          // and were unable to complete that batch. As such the client
+          // would have gotten an exception and retried or locally
+          // stored the batch for resending later
+          return null;
+        }
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (input != null) {
+        input.close();
+      }
+    }
+  }
+
+  /**
+   * Append and flush operations are synchronized as we are modifying
+   * a file in said methods.
+   */
+  static class Writer<T extends Writable> implements Closeable {
+    private FileOutputStream fileOutput;
+    private DataOutputStream dataOutput;
+    // Largest sequence id appended to this file; consulted when deciding
+    // whether the file can be pruned.
+    private AtomicLong largestSequenceID = new AtomicLong(0);
+    private File path;
+
+    Writer(File path) throws IOException {
+      this.path = path;
+      fileOutput = new FileOutputStream(path);
+      dataOutput = new DataOutputStream(fileOutput);
+      dataOutput.writeInt(VERSION);
+      flush();
+    }
+
+    // TODO group commit
+    synchronized void append(List<WALEntry<T>> entries) throws IOException {
+      for (WALEntry<T> entry : entries) {
+        largestSequenceID.set(Math.max(entry.getSequenceID(), largestSequenceID.get()));
+        dataOutput.writeInt(RECORD_TYPE_EVENT);
+        entry.write(dataOutput);
+      }
+      // if this is successful, the events have been
+      // successfully persisted and will be replayed
+      // in the case of a crash
+      dataOutput.writeInt(RECORD_TYPE_COMMIT);
+      flush(false);
+    }
+
+    synchronized void flush() throws IOException {
+      flush(true);
+    }
+    // force(metadata): fsync file contents, optionally also file metadata.
+    synchronized void flush(boolean metadata) throws IOException {
+      fileOutput.getChannel().force(metadata);
+    }
+
+    public long getLargestSequenceID() {
+      return largestSequenceID.get();
+    }
+    public File getPath() {
+      return path;
+    }
+
+    // Bytes written so far; used by WAL to decide when to roll.
+    public long getSize() {
+      return dataOutput.size();
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+      if (dataOutput != null) {
+        flush();
+        dataOutput.close();
+      }
+    }
+  }
+  private static <T extends Writable> WALEntry<T> newWALEntry(Class<T> clazz, Configuration conf) {
+    return new WALEntry<T>(ReflectionUtils.newInstance(clazz, conf));
+  }
+}
\ No newline at end of file

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.recoverable.memory.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wraps a {@link Writable} with a sequence id so that both can
+ * be written together to a file.
+ */
+/**
+ * Wraps a {@link Writable} payload together with its sequence id so both
+ * can be serialized as a single record in a WAL file.
+ */
+public class WALEntry<T extends Writable> implements Writable {
+  /**
+   * Magic value written before every entry; provides a minimum guarantee
+   * we are not reading complete junk.
+   */
+  private static final int MAGIC_HEADER = 0xdeadbeef;
+
+  private T data;
+  private long sequenceID;
+
+  /**
+   * Only to be used when reading a wal file from disk; the sequence id is
+   * populated by {@link #readFields(DataInput)}.
+   */
+  WALEntry(T data) {
+    this(data, -1);
+  }
+
+  /**
+   * Creates a WALEntry with the specified payload and sequence id.
+   *
+   * @param data payload to wrap
+   * @param sequenceID unique, increasing id assigned to this entry
+   */
+  public WALEntry(T data, long sequenceID) {
+    this.data = data;
+    this.sequenceID = sequenceID;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final int header = in.readInt();
+    if (header != MAGIC_HEADER) {
+      throw new IOException("Header is " + Integer.toHexString(header) +
+          " expected " + Integer.toHexString(MAGIC_HEADER));
+    }
+    sequenceID = in.readLong();
+    data.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(MAGIC_HEADER);
+    out.writeLong(sequenceID);
+    data.write(out);
+  }
+
+  public T getData() {
+    return data;
+  }
+
+  public long getSequenceID() {
+    return sequenceID;
+  }
+
+  /**
+   * Size in bytes of the fixed portion of a serialized entry.
+   */
+  static int getBaseSize() {
+    return 4 /* magic header of type int */ + 8 /* seqid of type long */;
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.recoverable.memory.wal;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Holds the outcome of a WAL replay: the recovered entries and the largest
+ * sequence id observed.
+ */
+public class WALReplayResult<T extends Writable> {
+
+  private final List<WALEntry<T>> results;
+  private final long sequenceID;
+
+  public WALReplayResult(List<WALEntry<T>> results, long sequenceID) {
+    this.results = results;
+    this.sequenceID = sequenceID;
+  }
+
+  /** Entries recovered by the replay. */
+  public List<WALEntry<T>> getResults() {
+    return results;
+  }
+
+  /** Largest sequence id observed during the replay. */
+  public long getSequenceID() {
+    return sequenceID;
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALReplayResult.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel.recoverable.memory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.NullSink;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * Tests for {@link RecoverableMemoryChannel}: put/take round trips,
+ * rollback semantics across a simulated crash/restart, and concurrent
+ * producer/consumer access.
+ */
+public class TestRecoverableMemoryChannel {
+
+  @SuppressWarnings("unused")
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestRecoverableMemoryChannel.class);
+
+  private RecoverableMemoryChannel channel;
+
+  private File dataDir;
+
+  @Before
+  public void setUp() {
+    dataDir = Files.createTempDir();
+    Assert.assertTrue(dataDir.isDirectory());
+    channel = createFileChannel();
+  }
+
+  /** Creates and starts a channel whose WAL is stored under {@link #dataDir}. */
+  private RecoverableMemoryChannel createFileChannel() {
+    RecoverableMemoryChannel channel = new RecoverableMemoryChannel();
+    Context context = new Context();
+    context.put(RecoverableMemoryChannel.WAL_DATA_DIR, dataDir.getAbsolutePath());
+    Configurables.configure(channel, context);
+    channel.start();
+    return channel;
+  }
+
+  @After
+  public void teardown() {
+    FileUtils.deleteQuietly(dataDir);
+  }
+
+  @Test
+  public void testRollbackWithSink() throws Exception {
+    final NullSink sink = new NullSink();
+    sink.setChannel(channel);
+    final int numItems = 99;
+    // drain events concurrently while we put and roll back below
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        int count = 0;
+        while(count++ < numItems) {
+          try {
+            sink.process();
+            Thread.sleep(1);
+          } catch(EventDeliveryException e) {
+            // channel empty / stopped: stop draining
+            break;
+          } catch (Exception e) {
+            Throwables.propagate(e);
+          }
+        }
+      }
+    };
+    t.setDaemon(true);
+    t.setName("NullSink");
+    t.start();
+
+    putEvents(channel, "rollback", 10, 100);
+
+    Transaction transaction;
+    // put an item we will rollback
+    transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody("this is going to be rolledback".getBytes(Charsets.UTF_8)));
+    transaction.rollback();
+    transaction.close();
+
+    while(t.isAlive()) {
+      Thread.sleep(1);
+    }
+
+    // simulate crash
+    channel.stop();
+    channel = createFileChannel();
+
+    // get the item which was not rolled back; the rolled-back event must
+    // not have survived the restart
+    transaction = channel.getTransaction();
+    transaction.begin();
+    Event event = channel.take();
+    transaction.commit();
+    transaction.close();
+    Assert.assertNotNull(event);
+    Assert.assertEquals("rollback-90-9", new String(event.getBody(), Charsets.UTF_8));
+  }
+
+  @Test
+  public void testRollback() throws Exception {
+    // put an item and commit
+    putEvents(channel, "rollback", 1, 50);
+
+    Transaction transaction;
+    // put an item we will rollback
+    transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody("this is going to be rolledback".getBytes(Charsets.UTF_8)));
+    transaction.rollback();
+    transaction.close();
+
+    // simulate crash
+    channel.stop();
+    channel = createFileChannel();
+
+    // get the item which was not rolled back
+    transaction = channel.getTransaction();
+    transaction.begin();
+    Event event = channel.take();
+    transaction.commit();
+    transaction.close();
+    Assert.assertNotNull(event);
+    Assert.assertEquals("rollback-0-0", new String(event.getBody(), Charsets.UTF_8));
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    // should find no items
+    int found = takeEvents(channel, "unbatched-gets", 1, 5).size();
+    Assert.assertEquals(0, found);
+    putEvents(channel, "unbatched", 1, 5);
+    putEvents(channel, "batched", 5, 5);
+  }
+
+  @Test
+  public void testThreaded() throws IOException, InterruptedException {
+    int numThreads = 10;
+    // both latches count putters and takers (hence numThreads * 2)
+    final CountDownLatch startLatch = new CountDownLatch(numThreads * 2);
+    final CountDownLatch stopLatch = new CountDownLatch(numThreads * 2);
+    final List<Exception> errors = Collections
+        .synchronizedList(new ArrayList<Exception>());
+    final List<String> expected = Collections
+        .synchronizedList(new ArrayList<String>());
+    final List<String> actual = Collections
+        .synchronizedList(new ArrayList<String>());
+    for (int i = 0; i < numThreads; i++) {
+      final int id = i;
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            startLatch.countDown();
+            startLatch.await();
+            // half the putters batch, half do not
+            if (id % 2 == 0) {
+              expected.addAll(putEvents(channel, Integer.toString(id), 1, 5));
+            } else {
+              expected.addAll(putEvents(channel, Integer.toString(id), 5, 5));
+            }
+          } catch (Exception e) {
+            errors.add(e);
+          } finally {
+            stopLatch.countDown();
+          }
+        }
+      };
+      t.setDaemon(true);
+      t.start();
+    }
+    for (int i = 0; i < numThreads; i++) {
+      final int id = i;
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            startLatch.countDown();
+            startLatch.await();
+            String prefix = "take-thread-" + Integer.toString(id);
+            // half the takers batch, half do not
+            if (id % 2 == 0) {
+              actual.addAll(takeEvents(channel, prefix, 1, Integer.MAX_VALUE));
+            } else {
+              actual.addAll(takeEvents(channel, prefix, 5, Integer.MAX_VALUE));
+            }
+          } catch (Exception e) {
+            errors.add(e);
+          } finally {
+            stopLatch.countDown();
+          }
+        }
+      };
+      t.setDaemon(true);
+      t.start();
+    }
+    Assert.assertTrue(stopLatch.await(5, TimeUnit.SECONDS));
+    Assert.assertEquals(Collections.EMPTY_LIST, errors);
+    // thread interleaving is nondeterministic, so compare as multisets
+    Collections.sort(expected);
+    Collections.sort(actual);
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * Takes up to numEvents events from the channel, batchSize events per
+   * transaction, and returns their bodies. Returns early (committing the
+   * partial batch) as soon as the channel is empty.
+   */
+  private static List<String> takeEvents(Channel channel, String prefix, int batchSize,
+      int numEvents) throws Exception {
+    List<String> result = Lists.newArrayList();
+    for (int i = 0; i < numEvents; i += batchSize) {
+      // one transaction per batch; the original opened a transaction per
+      // event inside the inner loop, so batchSize had no effect
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      try {
+        for (int j = 0; j < batchSize; j++) {
+          Event event = channel.take();
+          if(event == null) {
+            transaction.commit();
+            return result;
+          }
+          result.add(new String(event.getBody(), Charsets.UTF_8));
+        }
+        transaction.commit();
+      } catch (Exception ex) {
+        transaction.rollback();
+        throw ex;
+      } finally {
+        transaction.close();
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Puts numEvents events named "prefix-i-j" into the channel, batchSize
+   * events per transaction, and returns the bodies that were put.
+   */
+  private static List<String> putEvents(Channel channel, String prefix, int batchSize,
+      int numEvents) throws Exception {
+    List<String> result = Lists.newArrayList();
+    for (int i = 0; i < numEvents; i += batchSize) {
+      // one transaction per batch (see takeEvents)
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      try {
+        for (int j = 0; j < batchSize; j++) {
+          String s = prefix + "-" + i +"-" + j;
+          Event event = EventBuilder.withBody(s.getBytes(Charsets.UTF_8));
+          result.add(s);
+          channel.put(event);
+        }
+        transaction.commit();
+      } catch (Exception ex) {
+        transaction.rollback();
+        throw ex;
+      } finally {
+        transaction.close();
+      }
+    }
+    return result;
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.recoverable.memory.wal;
+
+import java.util.Random;
+
+import org.apache.flume.channel.recoverable.memory.wal.SequenceIDBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for SequenceIDBuffer: binary search, sorting, and element swap. */
+public class TestSequenceIDBuffer {
+
+  @Test
+  public void testBinarySearch() {
+    int size = 100;
+    SequenceIDBuffer buffer = new SequenceIDBuffer(size);
+    Assert.assertEquals(size, buffer.size());
+    for (int i = 0; i < size; i++) {
+      buffer.put(i, i);
+    }
+    buffer.sort();
+    // values outside the stored range [0, size) must not be found
+    Assert.assertFalse(buffer.exists(-1));
+    Assert.assertFalse(buffer.exists(size));
+    Assert.assertFalse(buffer.exists(size + 1));
+    for (int i = 0; i < size; i++) {
+      Assert.assertTrue(buffer.exists(i));
+    }
+  }
+
+  @Test
+  public void testSortAndCompareTo() {
+    int size = 100;
+    SequenceIDBuffer buffer = new SequenceIDBuffer(size);
+    Assert.assertEquals(size, buffer.size());
+    Random random = new Random();
+    for (int i = 0; i < size; i++) {
+      // mask the sign bit rather than Math.abs(): Math.abs(Long.MIN_VALUE)
+      // is still negative
+      buffer.put(i, random.nextLong() & Long.MAX_VALUE);
+    }
+
+    buffer.sort();
+
+    // verify non-decreasing order; the original never advanced 'last',
+    // which made this assertion vacuous
+    long last = Long.MIN_VALUE;
+    for (int i = 0; i < size; i++) {
+      long current = buffer.get(i);
+      Assert.assertTrue(last <= current);
+      last = current;
+    }
+  }
+
+  @Test
+  public void testSwap() {
+    SequenceIDBuffer buffer = new SequenceIDBuffer(2);
+    buffer.put(0, Long.MAX_VALUE);
+    buffer.put(1, Long.MIN_VALUE);
+    buffer.swap(0, 1);
+    Assert.assertEquals(buffer.get(0), Long.MIN_VALUE);
+    Assert.assertEquals(buffer.get(1), Long.MAX_VALUE);
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestSequenceIDBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java Sun Apr  1 18:17:31 2012
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.recoverable.memory.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.recoverable.memory.wal.WAL;
+import org.apache.flume.channel.recoverable.memory.wal.WALEntry;
+import org.apache.flume.channel.recoverable.memory.wal.WALReplayResult;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * Tests for the write-ahead log: file rolling, replay (full, at an offset,
+ * and fully-acknowledged), concurrent appends, and close() behavior.
+ */
+public class TestWAL {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestWAL.class);
+
+  private File dataDir;
+  private WAL<Text> wal;
+
+  @Before
+  public void setup() throws IOException {
+    dataDir = Files.createTempDir();
+    Assert.assertTrue(dataDir.isDirectory());
+    wal = new WAL<Text>(dataDir, Text.class);
+  }
+  @After
+  public void teardown() throws IOException {
+    wal.close();
+    FileUtils.deleteQuietly(dataDir);
+  }
+
+  /**
+   * Create a whole bunch of files and ensure they are cleaned up
+   */
+  @Test
+  public void testRoll() throws IOException, InterruptedException {
+    wal.close();
+    // zero thresholds + 1ms roll interval force aggressive rolling
+    wal = new WAL<Text>(dataDir, Text.class, 0L, 0L, 0L, 1L);
+    long seqid = 0;
+    List<String> expected = strings(100);
+    for(String s : expected) {
+      wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
+      Thread.sleep(1);
+      wal.writeSequenceID(seqid);
+      Thread.sleep(1);
+    }
+    // acknowledge everything so all rolled files become collectible
+    wal.writeSequenceID(Long.MAX_VALUE);
+    Thread.sleep(1000L);
+    wal.close();
+    File seq = new File(dataDir, "seq");
+    File[] seqFiles = seq.listFiles();
+    Assert.assertNotNull(seqFiles);
+    Assert.assertTrue(seqFiles.length < 5);
+    File data = new File(dataDir, "data");
+    File[] dataFiles = data.listFiles();
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length < 5);
+  }
+
+  @Test
+  public void testBasicReplay() throws IOException {
+    long seqid = 0;
+    List<String> expected = strings(100);
+    for(String s : expected) {
+      wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
+    }
+    wal.close();
+    wal = new WAL<Text>(dataDir, Text.class);
+    WALReplayResult<Text> result = wal.replay();
+    Assert.assertEquals(100, result.getSequenceID());
+    List<String> actual = toStringList(result.getResults());
+    // sort both so the assertion does not depend on replay order
+    Collections.sort(expected);
+    Collections.sort(actual);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testReplayAtOffset() throws IOException {
+    long seqid = 0;
+    List<String> expected = strings(100);
+    for(String s : expected) {
+      wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
+    }
+    wal.writeSequenceID(50);
+    // seqid 50 belongs to expected.get(49): seqids start at 1, indexes
+    // at 0. The original removed index 50, which only passed because
+    // strings() used to return identical strings.
+    expected.remove(49);
+    wal.close();
+    wal = new WAL<Text>(dataDir, Text.class);
+    WALReplayResult<Text> result = wal.replay();
+    Assert.assertEquals(100, result.getSequenceID());
+    List<String> actual = toStringList(result.getResults());
+    Collections.sort(expected);
+    Collections.sort(actual);
+    Assert.assertEquals(99, actual.size());
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testReplayNone() throws IOException {
+    long seqid = 0;
+    List<String> expected = strings(100);
+    for(String s : expected) {
+      wal.writeEntry(new WALEntry<Text>(new Text(s), ++seqid));
+      // acknowledge each entry immediately, so nothing should replay
+      wal.writeSequenceID(seqid);
+    }
+    wal.close();
+    wal = new WAL<Text>(dataDir, Text.class);
+    WALReplayResult<Text> result = wal.replay();
+    Assert.assertEquals(expected.size(), result.getSequenceID());
+    List<String> actual = toStringList(result.getResults());
+    Assert.assertEquals(Collections.EMPTY_LIST, actual);
+  }
+
+  @Test
+  public void testThreadedAppend() throws IOException, InterruptedException {
+    int numThreads = 20;
+    final CountDownLatch startLatch = new CountDownLatch(numThreads);
+    final CountDownLatch stopLatch = new CountDownLatch(numThreads);
+    final AtomicLong seqid = new AtomicLong(0);
+    final List<String> globalExpected = Collections.synchronizedList(new ArrayList<String>());
+    final List<Exception> errors = Collections.synchronizedList(new ArrayList<Exception>());
+    for (int i = 0; i < numThreads; i++) {
+      final int id = i;
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            List<String> expected = strings(100);
+            globalExpected.addAll(expected);
+            startLatch.countDown();
+            startLatch.await();
+            // half batch, half do not
+            if(id % 2 == 0) {
+              for(String s : expected) {
+                wal.writeEntry(new WALEntry<Text>(new Text(s), seqid.incrementAndGet()));
+              }
+            } else {
+              List<WALEntry<Text>> batch = Lists.newArrayList();
+              for(String s : expected) {
+                batch.add(new WALEntry<Text>(new Text(s), seqid.incrementAndGet()));
+              }
+              wal.writeEntries(batch);
+            }
+          } catch (Exception e) {
+            logger.warn("Error doing appends", e);
+            errors.add(e);
+          } finally {
+            stopLatch.countDown();
+          }
+        }
+      };
+      t.setDaemon(true);
+      t.start();
+    }
+    Assert.assertTrue(stopLatch.await(5, TimeUnit.SECONDS));
+    Assert.assertEquals(Collections.EMPTY_LIST, errors);
+    wal.close();
+    wal = new WAL<Text>(dataDir, Text.class);
+    WALReplayResult<Text> result = wal.replay();
+    Assert.assertEquals(2000, result.getSequenceID());
+    List<String> actual = toStringList(result.getResults());
+    // we don't know what order the items threads will be able to
+    // append to the wal, so sort to the lists to make then sensible
+    Collections.sort(actual);
+    Collections.sort(globalExpected);
+    Assert.assertEquals(globalExpected, actual);
+  }
+
+  @Test(expected=IOException.class)
+  public void testInvalidReadClass() throws IOException {
+    wal.writeEntry(new WALEntry<Text>(new Text(""), 1));
+    wal.close();
+    // opening with a different Writable class must be rejected
+    new WAL<IntWritable>(dataDir, IntWritable.class);
+  }
+
+  @Test(expected=NullPointerException.class)
+  public void testCloseSingle() throws IOException {
+    wal.close();
+    wal.writeEntry(new WALEntry<Text>(new Text(""), 1));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(expected=NullPointerException.class)
+  public void testCloseList() throws IOException {
+    wal.close();
+    wal.writeEntries(Lists.newArrayList(new WALEntry<Text>(new Text(""), 1)));
+  }
+
+  @Test(expected=NullPointerException.class)
+  public void testCloseSequenceID() throws IOException {
+    wal.close();
+    wal.writeSequenceID(1L);
+  }
+
+  /** Returns {@code num} distinct strings: "0" through "num-1". */
+  private static List<String> strings(int num) {
+    List<String> result = Lists.newArrayList();
+    for (int i = 0; i < num; i++) {
+      // was Integer.toString(num), which produced num identical strings
+      // and let identity/off-by-one bugs slip through the replay asserts
+      result.add(Integer.toString(i));
+    }
+    return result;
+  }
+  private static List<String> toStringList(List<WALEntry<Text>> list) {
+    List<String> result = Lists.newArrayList();
+    for(WALEntry<Text> entry : list) {
+      result.add(entry.getData().toString());
+    }
+    return result;
+  }
+
+  /**
+   * Micro-benchmark: appends batches of fixed-size events forever and
+   * prints throughput every 10 seconds. Args: eventSize batchSize.
+   */
+  public static void main(String[] args) throws IOException {
+    // The original checkPositionIndex(0, args.length) could never fail,
+    // and with exactly one arg the checks passed yet args[1] threw an
+    // ArrayIndexOutOfBoundsException; validate the arg count explicitly.
+    Preconditions.checkArgument(args.length >= 2,
+        "usage: TestWAL <eventSize> <batchSize>");
+
+    int size = Integer.parseInt(args[0]);
+    int batchSize = Integer.parseInt(args[1]);
+
+    byte[] buffer = new byte[size];
+    for (int i = 0; i < buffer.length; i++) {
+      buffer[i] = (byte)'A';
+    }
+    BytesWritable bytes = new BytesWritable(buffer);
+    List<WALEntry<BytesWritable>> batch = Lists.newArrayList();
+    long seqid = 0;
+    long numBytes = 0;
+    long count = 0;
+    long start = System.currentTimeMillis();
+    File dataDir = Files.createTempDir();
+    try {
+      WAL<BytesWritable> wal = new WAL<BytesWritable>(dataDir, BytesWritable.class);
+      while(true) {
+        batch.clear();
+        for (int i = 0; i < batchSize; i++) {
+          batch.add(new WALEntry<BytesWritable>(bytes, seqid++));
+        }
+        wal.writeEntries(batch);
+        count += batchSize;
+        numBytes += buffer.length * batchSize;
+
+        long expired = System.currentTimeMillis() - start;
+        if(expired > 10000L) {
+          start = System.currentTimeMillis();
+          System.out.println(String.format("Events/s %d, MB/s %4.2f", (count/10),
+              (double)(numBytes/1024L/1024L)/(double)(expired/1000L)));
+          numBytes = count = 0;
+        }
+      }
+    } finally {
+      FileUtils.deleteQuietly(dataDir);
+    }
+  }
+}

Propchange: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/wal/TestWAL.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties?rev=1308168&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties (added)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/resources/log4j.properties Sun Apr  1 18:17:31 2012
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = DEBUG



Mime
View raw message