flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1166417 - in /incubator/flume/branches/flume-728: ./ flume-ng-channels/ flume-ng-channels/flume-jdbc-channel/ flume-ng-channels/flume-jdbc-channel/src/ flume-ng-channels/flume-jdbc-channel/src/main/ flume-ng-channels/flume-jdbc-channel/src...
Date Wed, 07 Sep 2011 21:43:25 GMT
Author: arvind
Date: Wed Sep  7 21:43:24 2011
New Revision: 1166417

URL: http://svn.apache.org/viewvc?rev=1166417&view=rev
Log:
FLUME-760. First patch.

This patch tweaks the Channel and Transaction interface to establish
the transaction use idiom. The implementation of NullSink, LoggerSink,
RollingFileSink as well as that of MemoryChannel has been updated 
according to these changes.

Also introduced in this commit is a new module for JDBC channel 
implementation which is currently a stub that will get populated with
future commits.

Added:
    incubator/flume/branches/flume-728/flume-ng-channels/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
    incubator/flume/branches/flume-728/flume-ng-channels/pom.xml
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java
Modified:
    incubator/flume/branches/flume-728/.gitignore
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Transaction.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
    incubator/flume/branches/flume-728/pom.xml

Modified: incubator/flume/branches/flume-728/.gitignore
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/.gitignore?rev=1166417&r1=1166416&r2=1166417&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/.gitignore (original)
+++ incubator/flume/branches/flume-728/.gitignore Wed Sep  7 21:43:24 2011
@@ -11,4 +11,4 @@ bin/.settings
 .eclipse
 pmd_report.html
 */bin
-*/target
+target

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml?rev=1166417&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/pom.xml Wed Sep  7 21:43:24 2011
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-channels</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>0.9.5-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-channels</groupId>
+  <artifactId>flume-jdbc-channel</artifactId>
+  <version>0.9.5-SNAPSHOT</version>
+  <name>Flume NG JDBC channel</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>0.9.5-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

Added: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java?rev=1166417&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java Wed Sep  7 21:43:24 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel.jdbc;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+
+/**
+ * <p>A JDBC based channel implementation.</p>
+ */
+public class JdbcChannel implements Channel {
+
+  @Override
+  public void put(Event event) throws ChannelException {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public Event take() throws ChannelException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public Transaction getTransaction() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-channels/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/pom.xml?rev=1166417&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-channels/pom.xml Wed Sep  7 21:43:24 2011
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>0.9.5-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-ng-channels</artifactId>
+  <version>0.9.5-SNAPSHOT</version>
+  <name>Flume NG Channels</name>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>flume-jdbc-channel</module>
+  </modules>
+</project>

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java?rev=1166417&r1=1166416&r2=1166417&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Channel.java Wed Sep  7 21:43:24 2011
@@ -1,14 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume;
 
+/**
+ * <p>
+ * A channel connects a <tt>Source</tt> to a <tt>Sink</tt>. The source
+ * acts as producer while the sink acts as a consumer of events. The channel
+ * itself is the buffer between the two.
+ * </p>
+ * <p>
+ * A channel exposes a <tt>Transaction</tt> interface that can be used by
+ * its clients to ensure atomic <tt>put</tt> or <tt>remove</tt>
+ * semantics. This is necessary to guarantee single hop reliability in a
+ * logical node. For instance, a source will produce an event successfully
+ * if and only if it can be committed to the channel. Similarly, a sink will
+ * consume an event if and only if its end point can accept the event. The
+ * extent of transaction support varies for different channel implementations
+ * ranging from strong to best-effort semantics.
+ * </p>
+ *
+ * @see org.apache.flume.EventSource
+ * @see org.apache.flume.EventSink
+ * @see org.apache.flume.Transaction
+ */
 public interface Channel {
 
-  public void put(Event event) throws InterruptedException,
-      EventDeliveryException;
-
-  public Event take() throws InterruptedException;
-
-  public void release(Event event);
-
+  /**
+   * <p>Puts the given event in the channel.</p>
+   * <p><strong>Note</strong>: This method must be invoked within an active
+   * <tt>Transaction</tt> boundary. Failure to do so can lead to unpredictable
+   * results.</p>
+   * @param event the event to transport.
+   * @throws ChannelException in case this operation fails.
+   * @see org.apache.flume.Transaction#begin()
+   */
+  public void put(Event event) throws ChannelException;
+
+  /**
+   * <p>Returns the next event from the channel if available. If the channel
+   * does not have any events available, this method would return <tt>null</tt>.
+   * </p>
+   * <p><strong>Note</strong>: This method must be invoked within an active
+   * <tt>Transaction</tt> boundary. Failure to do so can lead to unpredictable
+   * results.</p>
+   * @return the next available event or <tt>null</tt> if no events are
+   * available.
+   * @throws ChannelException in case this operation fails.
+   * @see org.apache.flume.Transaction#begin()
+   */
+  public Event take() throws ChannelException;
+
+  /**
+   * @return the transaction instance associated with this channel.
+   */
   public Transaction getTransaction();
-
 }

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java?rev=1166417&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java Wed Sep  7 21:43:24 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume;
+
+/**
+ * <p>A channel exception is raised whenever a channel operation fails.</p>
+ */
+public class ChannelException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message the exception message
+   */
+  public ChannelException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param ex the causal exception
+   */
+  public ChannelException(Exception ex) {
+    super(ex);
+  }
+
+  /**
+   * @param message the exception message
+   * @param ex the causal exception
+   */
+  public ChannelException(String message, Exception ex) {
+    super(message, ex);
+  }
+
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Transaction.java?rev=1166417&r1=1166416&r2=1166417&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Transaction.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Transaction.java Wed Sep  7 21:43:24 2011
@@ -1,11 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume;
 
+/**
+ * <p>Provides the transaction boundary while accessing a channel.</p>
+ * <p>A <tt>Transaction</tt> instance is used to encompass channel access
+ * via the following idiom:</p>
+ * <pre><code>
+ * Channel ch = ...
+ * Transaction tx = ch.getTransaction();
+ * try {
+ *   tx.begin();
+ *   ...
+ *   // ch.put(event) or ch.take()
+ *   ...
+ *   tx.commit();
+ * } catch (ChannelException ex) {
+ *   tx.rollback();
+ *   ...
+ * } finally {
+ *   tx.close();
+ * }
+ * </code></pre>
+ * <p>Depending upon the implementation of the channel, the transaction
+ * semantics may be strong, or best-effort only.</p>
+ *
+ * @see org.apache.flume.Channel
+ */
 public interface Transaction {
 
+  /**
+   * <p>Starts a transaction boundary for the current channel operation. If a
+   * transaction is already in progress, this method will join that transaction
+   * using reference counting.</p>
+   * <p><strong>Note</strong>: For every invocation of this method there must
+   * be a corresponding invocation of {@linkplain #close()} method. Failure
+   * to ensure this can lead to dangling transactions and unpredictable results.
+   * </p>
+   */
   public void begin();
 
+  /**
+   * Indicates that the transaction can be successfully committed. It is
+   * required that a transaction be in progress when this method is invoked.
+   */
   public void commit();
 
+  /**
+   * Indicates that the transaction must be aborted. It is
+   * required that a transaction be in progress when this method is invoked.
+   */
   public void rollback();
 
+  /**
+   * <p>Ends a transaction boundary for the current channel operation. If a
+   * transaction is already in progress, this method will join that transaction
+   * using reference counting. The transaction is completed only if there
+   * are no more references left for this transaction.</p>
+   * <p><strong>Note</strong>: For every invocation of this method there must
+   * be a corresponding invocation of {@linkplain #begin()} method. Failure
+   * to ensure this can lead to dangling transactions and unpredictable results.
+   * </p>
+   */
+  public void close();
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1166417&r1=1166416&r2=1166417&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Wed Sep  7 21:43:24 2011
@@ -1,11 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume.channel;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 
 public class MemoryChannel implements Channel {
@@ -17,20 +34,21 @@ public class MemoryChannel implements Ch
   }
 
   @Override
-  public void put(Event event) throws InterruptedException,
-      EventDeliveryException {
-
-    queue.put(event);
-  }
-
-  @Override
-  public Event take() throws InterruptedException {
-    return queue.take();
+  public void put(Event event) {
+    try {
+      queue.put(event);
+    } catch (InterruptedException ex) {
+      throw new ChannelException("Failed to put(" + event + ")", ex);
+    }
   }
 
   @Override
-  public void release(Event event) {
-    /* Release is a no-op on an in memory channel. */
+  public Event take() {
+    try {
+      return queue.take();
+    } catch (InterruptedException ex) {
+      throw new ChannelException("Failed to take()", ex);
+    }
   }
 
   @Override
@@ -67,6 +85,8 @@ public class MemoryChannel implements Ch
     public void rollback() {
     }
 
+    @Override
+    public void close() {
+    }
   }
-
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1166417&r1=1166416&r2=1166417&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Wed Sep  7 21:43:24 2011
@@ -1,7 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume.sink;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSink;
 import org.apache.flume.Transaction;
 import org.slf4j.Logger;
@@ -13,18 +31,21 @@ public class LoggerSink extends Abstract
       .getLogger(LoggerSink.class);
 
   @Override
-  public void process() {
+  public void process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
+    Event event = null;
 
     try {
       transaction.begin();
-      Event event = channel.take();
-      logger.info(event.toString());
-      channel.release(event);
+      event = channel.take();
+      logger.info("Event: " + event);
       transaction.commit();
-    } catch (Exception e) {
+    } catch (Exception ex) {
       transaction.rollback();
+      throw new EventDeliveryException("Failed to log event: " + event, ex);
+    } finally {
+      transaction.close();
     }
   }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1166417&r1=1166416&r2=1166417&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java Wed Sep  7 21:43:24 2011
@@ -1,8 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume.sink;
 
 import org.apache.flume.Channel;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSink;
 import org.apache.flume.Transaction;
 import org.slf4j.Logger;
@@ -19,25 +37,25 @@ public class NullSink extends AbstractSi
   }
 
   @Override
-  public void process() {
+  public void process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
+    Event event = null;
 
     try {
       transaction.begin();
-
-      Event event = channel.take();
-
-      channel.release(event);
+      event = channel.take();
+      logger.debug("Consumed the event: " + event);
       transaction.commit();
-
-      counterGroup.incrementAndGet("events.successful");
-    } catch (Exception e) {
-      counterGroup.incrementAndGet("events.failed");
-      logger.error("Failed to deliver event. Exception follows.", e);
+    } catch (Exception ex) {
       transaction.rollback();
+      counterGroup.incrementAndGet("events.failed");
+      logger.error("Failed to deliver event. Exception follows.", ex);
+      throw new EventDeliveryException("Failed to deliver event: " + event, ex);
+    } finally {
+      transaction.close();
     }
-
+    counterGroup.incrementAndGet("events.successful");
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1166417&r1=1166416&r2=1166417&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Wed Sep  7 21:43:24 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flume.sink;
 
 import java.io.BufferedOutputStream;
@@ -143,24 +160,26 @@ public class RollingFileSink extends Abs
 
       byte[] bytes = formatter.format(event);
 
+      outputStream.write(bytes);
+
       /*
        * FIXME: Feature: Rotate on size and time by checking bytes written and
        * setting shouldRotate = true if we're past a threshold.
        */
       counterGroup.addAndGet("sink.bytesWritten", (long) bytes.length);
 
-      outputStream.write(bytes);
-
       /*
        * FIXME: Feature: Control flush interval based on time or number of
        * events. For now, we're super-conservative and flush on each write.
        */
       outputStream.flush();
 
-      channel.release(event);
       transaction.commit();
-    } catch (Exception e) {
+    } catch (Exception ex) {
       transaction.rollback();
+      throw new EventDeliveryException("Failed to process event: " + event, ex);
+    } finally {
+      transaction.close();
     }
   }
 

Modified: incubator/flume/branches/flume-728/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/pom.xml?rev=1166417&r1=1166416&r2=1166417&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/pom.xml (original)
+++ incubator/flume/branches/flume-728/pom.xml Wed Sep  7 21:43:24 2011
@@ -1,3 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
@@ -92,6 +109,7 @@
         <module>flume-ng-core</module>
         <module>flume-ng-node</module>
         <module>flume-ng-dist</module>
+        <module>flume-ng-channels</module>
       </modules>
     </profile>
 



Mime
View raw message