flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1180255 - in /incubator/flume/branches/flume-728/flume-ng-core/src/main: avro/flume.avdl java/org/apache/flume/client/avro/AvroCLIClient.java java/org/apache/flume/source/AvroSource.java
Date Fri, 07 Oct 2011 21:39:35 GMT
Author: esammer
Date: Fri Oct  7 21:39:35 2011
New Revision: 1180255

URL: http://svn.apache.org/viewvc?rev=1180255&view=rev
Log:
FLUME-783: Add a batch event RPC call to AvroSource

- Also fixed a bug where I broke the command line opt parsing ofr
  the Avro client tool. Oops.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl?rev=1180255&r1=1180254&r2=1180255&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/avro/flume.avdl Fri Oct  7 21:39:35
2011
@@ -31,4 +31,6 @@ protocol AvroSourceProtocol {
 
   Status append( AvroFlumeEvent event );
 
+  Status appendBatch( array<AvroFlumeEvent> events );
+
 }
\ No newline at end of file

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java?rev=1180255&r1=1180254&r2=1180255&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
Fri Oct  7 21:39:35 2011
@@ -7,7 +7,9 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.Transceiver;
@@ -67,7 +69,7 @@ public class AvroCLIClient {
 
     port = Integer.parseInt(commandLine.getOptionValue("port"));
 
-    if (!commandLine.hasOption("")) {
+    if (!commandLine.hasOption("host")) {
       throw new ParseException(
           "You must specify a hostname to connet to with --host");
     }
@@ -83,6 +85,7 @@ public class AvroCLIClient {
     AvroSourceProtocol client = SpecificRequestor.getClient(
         AvroSourceProtocol.class, transceiver);
     BufferedReader reader = null;
+    List<AvroFlumeEvent> eventBuffer = new ArrayList<AvroFlumeEvent>();
 
     if (fileName != null) {
       reader = new BufferedReader(new FileReader(new File(fileName)));
@@ -97,27 +100,44 @@ public class AvroCLIClient {
     while ((line = reader.readLine()) != null) {
       // logger.debug("read:{}", line);
 
+      if (eventBuffer.size() >= 1000) {
+        Status status = client.appendBatch(eventBuffer);
+
+        if (!status.equals(Status.OK)) {
+          logger.error("Unable to send batch size:{} status:{}",
+              eventBuffer.size(), status);
+        }
+
+        eventBuffer.clear();
+      }
+
       AvroFlumeEvent avroEvent = new AvroFlumeEvent();
 
       avroEvent.headers = new HashMap<CharSequence, CharSequence>();
       avroEvent.body = ByteBuffer.wrap(line.getBytes());
-      sentBytes += avroEvent.body.capacity();
 
-      Status status = client.append(avroEvent);
-      sent++;
+      eventBuffer.add(avroEvent);
 
-      if (!status.equals(Status.OK)) {
-        logger.error("Unable to send event:{} status:{}", avroEvent, status);
-      }
+      sentBytes += avroEvent.body.capacity();
+      sent++;
 
       long now = System.currentTimeMillis();
 
       if (now >= lastCheck + 5000) {
-        logger.debug("Sent {} bytes, {} events", sentBytes, sent);
+        logger.debug("Packed {} bytes, {} events", sentBytes, sent);
         lastCheck = now;
       }
+    }
+
+    if (eventBuffer.size() > 0) {
+      Status status = client.appendBatch(eventBuffer);
+
+      if (!status.equals(Status.OK)) {
+        logger.error("Unable to send batch size:{} status:{}",
+            eventBuffer.size(), status);
+      }
 
-      // logger.debug("Sent:{} Status:{}", ++sent, status);
+      eventBuffer.clear();
     }
 
     logger.debug("Finished");

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1180255&r1=1180254&r2=1180255&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
Fri Oct  7 21:39:35 2011
@@ -2,6 +2,7 @@ package org.apache.flume.source;
 
 import java.net.InetSocketAddress;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -121,4 +122,41 @@ public class AvroSource extends Abstract
 
     return Status.OK;
   }
+
+  @Override
+  public Status appendBatch(List<AvroFlumeEvent> events) {
+    counterGroup.incrementAndGet("rpc.received.batch");
+
+    Channel channel = getChannel();
+    Transaction transaction = channel.getTransaction();
+
+    try {
+      transaction.begin();
+
+      for (AvroFlumeEvent avroEvent : events) {
+        Map<String, String> headers = new HashMap<String, String>();
+
+        for (Entry<CharSequence, CharSequence> entry : avroEvent.headers
+            .entrySet()) {
+
+          headers.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+
+        Event event = EventBuilder.withBody(avroEvent.body.array(), headers);
+        channel.put(event);
+        counterGroup.incrementAndGet("rpc.events");
+      }
+
+      transaction.commit();
+    } catch (ChannelException e) {
+      transaction.rollback();
+      return Status.FAILED;
+    } finally {
+      transaction.close();
+    }
+
+    counterGroup.incrementAndGet("rpc.successful");
+
+    return Status.OK;
+  }
 }



Mime
View raw message