flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [2/2] git commit: FLUME-1382: Flume adopt message from existing local Scribe
Date Fri, 24 Aug 2012 17:03:05 GMT
FLUME-1382: Flume adopt message from existing local Scribe

Implementation of ScribeSource allowing flume to receive messages from scribe, or any other agent using the thrift/scribe protocol
(Danny Ye via Juhani Connolly)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3cac43e5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3cac43e5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3cac43e5

Branch: refs/heads/flume-1.3.0
Commit: 3cac43e59f693fd1cc46c3a9df3ea00ec9858232
Parents: e014030
Author: Juhani Connolly <juhanic@cyberagent.co.jp>
Authored: Wed Aug 15 15:44:14 2012 +0900
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Wed Aug 15 18:34:34 2012 -0700

----------------------------------------------------------------------
 flume-ng-dist/pom.xml                              |    4 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   27 +
 flume-ng-sources/flume-scribe-source/pom.xml       |  148 +++
 .../org/apache/flume/source/scribe/LogEntry.java   |  414 +++++++
 .../org/apache/flume/source/scribe/ResultCode.java |   53 +
 .../org/apache/flume/source/scribe/Scribe.java     |  879 +++++++++++++++
 .../apache/flume/source/scribe/ScribeSource.java   |  179 +++
 flume-ng-sources/pom.xml                           |   47 +
 pom.xml                                            |    7 +
 9 files changed, 1758 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 0e52bc5..3f5dc8c 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -105,6 +105,10 @@
       <artifactId>flume-ng-hbase-sink</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sources</groupId>
+      <artifactId>flume-scribe-source</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
       <artifactId>flume-avro-source</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index be594d6..be6d597 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -894,6 +894,33 @@ Example for agent named **agent_foo**:
   agent_foo.channels = memoryChannel-1
   agent_foo.sources.legacysource-1.type = your.namespace.YourClass
   agent_foo.sources.legacysource-1.channels = memoryChannel-1
+  
+Scribe Source
+~~~~~~~~~~~~~
+
+Scribe is another type of ingest system. To adopt an existing Scribe ingest system,
+Flume should use ScribeSource, which is based on Thrift and uses a compatible transfer protocol.
+To deploy Scribe itself, please follow the guide from Facebook.
+Required properties are in **bold**.
+
+==============  ===========  ==============================================
+Property Name   Default      Description
+==============  ===========  ==============================================
+**type**        --           The component type name, needs to be ``org.apache.flume.source.scribe.ScribeSource``
+port            1499         Port that Scribe should connect to
+workerThreads   5            Number of handler threads in Thrift
+==============  ===========  ==============================================
+
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+  agent_foo.sources = scribesource-1
+  agent_foo.channels = memoryChannel-1
+  agent_foo.sources.scribesource-1.type = org.apache.flume.source.scribe.ScribeSource
+  agent_foo.sources.scribesource-1.port = 1463
+  agent_foo.sources.scribesource-1.workerThreads = 5
+  agent_foo.sources.scribesource-1.channels = memoryChannel-1
 
 Flume Sinks
 -----------

http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/flume-ng-sources/flume-scribe-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/pom.xml b/flume-ng-sources/flume-scribe-source/pom.xml
new file mode 100644
index 0000000..00c4106
--- /dev/null
+++ b/flume-ng-sources/flume-scribe-source/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-sources</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sources</groupId>
+  <artifactId>flume-scribe-source</artifactId>
+  <name>Flume Scribe Source</name>
+
+  <profiles>
+    <profile>
+      <id>compileThrift</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <thrift.executable>${env.THRIFT_HOME}/bin/thrift</thrift.executable>
+      </properties>
+      <build>
+       <plugins>
+         <plugin>
+           <groupId>org.apache.thrift.tools</groupId>
+           <artifactId>maven-thrift-plugin</artifactId>
+           <version>0.1.10</version>
+           <configuration>
+             <thriftExecutable>${thrift.executable}</thriftExecutable>
+           </configuration>
+           <executions>
+             <execution>
+               <id>thrift-sources</id>
+               <phase>generate-sources</phase>
+               <goals>
+                 <goal>compile</goal>
+               </goals>
+             </execution>
+           </executions>
+         </plugin>
+         <plugin>
+           <groupId>org.apache.maven.plugins</groupId>
+           <artifactId>maven-compiler-plugin</artifactId>
+           <version>2.3.2</version>
+           <configuration>
+             <source>1.6</source>
+             <target>1.6</target>
+             <excludes>
+               <exclude>**/com/**</exclude>
+             </excludes>
+           </configuration>
+         </plugin>
+       </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>nonThrift</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <pluginManagement>
+          <plugins>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-compiler-plugin</artifactId>
+              <version>2.3.2</version>
+              <configuration>
+                <source>1.6</source>
+                <target>1.6</target>
+                <excludes>
+                  <exclude>**/generated-sources/**</exclude>
+                </excludes>
+              </configuration>
+            </plugin>
+          </plugins>
+        </pluginManagement>
+      </build>
+    </profile>
+  </profiles>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>servlet-api</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/LogEntry.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/LogEntry.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/LogEntry.java
new file mode 100644
index 0000000..401a7e4
--- /dev/null
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/LogEntry.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.scribe;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.nio.ByteBuffer;
+
+public class LogEntry implements org.apache.thrift.TBase<LogEntry, LogEntry._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LogEntry");
+
+  private static final org.apache.thrift.protocol.TField CATEGORY_FIELD_DESC = new org.apache.thrift.protocol.TField("category", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+  public String category;
+  public ByteBuffer message;
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    CATEGORY((short)1, "category"),
+    MESSAGE((short)2, "message");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Looks up the _Fields constant whose Thrift field id equals fieldId.
+     * @return the matching constant, or null if no field has that id.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      if (fieldId == 1) {
+        return CATEGORY;
+      }
+      if (fieldId == 2) {
+        return MESSAGE;
+      }
+      return null;
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it is not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.CATEGORY, new org.apache.thrift.meta_data.FieldMetaData("category", org.apache.thrift.TFieldRequirementType.DEFAULT,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("message", org.apache.thrift.TFieldRequirementType.DEFAULT,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LogEntry.class, metaDataMap);
+  }
+
+  public LogEntry() {
+  }
+
+  public LogEntry(
+    String category,
+    ByteBuffer message)
+  {
+    this();
+    this.category = category;
+    this.message = message;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>; the message buffer is copied so the two entries do not share it.
+   */
+  public LogEntry(LogEntry other) {
+    if (other.isSetCategory()) {
+      this.category = other.category;
+    }
+    if (other.isSetMessage()) {
+      this.message = org.apache.thrift.TBaseHelper.copyBinary(other.message);
+    }
+  }
+
+  public LogEntry deepCopy() {
+    return new LogEntry(this);
+  }
+
+  @Override
+  public void clear() {
+    this.category = null;
+    this.message = null;
+  }
+
+  public String getCategory() {
+    return this.category;
+  }
+
+  public LogEntry setCategory(String category) {
+    this.category = category;
+    return this;
+  }
+
+  public void unsetCategory() {
+    this.category = null;
+  }
+
+  /** Returns true if field category is set (has been assigned a value) and false otherwise */
+  public boolean isSetCategory() {
+    return this.category != null;
+  }
+
+  public void setCategoryIsSet(boolean value) {
+    if (!value) {
+      this.category = null;
+    }
+  }
+
+  public ByteBuffer getMessage() {
+    return this.message;
+  }
+
+  public LogEntry setMessage(ByteBuffer message) {
+    this.message = message;
+    return this;
+  }
+
+  public void unsetMessage() {
+    this.message = null;
+  }
+
+  /** Returns true if field message is set (has been assigned a value) and false otherwise */
+  public boolean isSetMessage() {
+    return this.message != null;
+  }
+
+  public void setMessageIsSet(boolean value) {
+    if (!value) {
+      this.message = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case CATEGORY:
+      if (value == null) {
+        unsetCategory();
+      } else {
+        setCategory((String)value);
+      }
+      break;
+
+    case MESSAGE:
+      if (value == null) {
+        unsetMessage();
+      } else {
+        setMessage((ByteBuffer)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case CATEGORY:
+      return getCategory();
+
+    case MESSAGE:
+      return getMessage();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case CATEGORY:
+      return isSetCategory();
+    case MESSAGE:
+      return isSetMessage();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof LogEntry)
+      return this.equals((LogEntry)that);
+    return false;
+  }
+
+  /**
+   * Field-by-field equality: each of category and message must be either unset
+   * on both entries, or set on both with equal values.
+   */
+  public boolean equals(LogEntry that) {
+    if (that == null) {
+      return false;
+    }
+    if (this.isSetCategory() != that.isSetCategory()) {
+      return false;
+    }
+    if (this.isSetCategory() && !this.category.equals(that.category)) {
+      return false;
+    }
+
+    if (this.isSetMessage() != that.isSetMessage()) {
+      return false;
+    }
+    if (this.isSetMessage() && !this.message.equals(that.message)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() { // hashes the same two fields equals() compares, so equal entries hash alike
+    return 31 * (category == null ? 0 : category.hashCode()) + (message == null ? 0 : message.hashCode());
+  }
+
+  public int compareTo(LogEntry other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    LogEntry typedOther = (LogEntry)other;
+
+    lastComparison = Boolean.valueOf(isSetCategory()).compareTo(typedOther.isSetCategory());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetCategory()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.category, typedOther.category);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetMessage()).compareTo(typedOther.isSetMessage());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetMessage()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.message, typedOther.message);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) {
+        break;
+      }
+      switch (field.id) {
+        case 1: // CATEGORY
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.category = iprot.readString();
+          } else {
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2: // MESSAGE
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.message = iprot.readBinary();
+          } else {
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+
+    // check for required fields of primitive type, which can't be checked in the validate method
+    validate();
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    if (this.category != null) {
+      oprot.writeFieldBegin(CATEGORY_FIELD_DESC);
+      oprot.writeString(this.category);
+      oprot.writeFieldEnd();
+    }
+    if (this.message != null) {
+      oprot.writeFieldBegin(MESSAGE_FIELD_DESC);
+      oprot.writeBinary(this.message);
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  /**
+   * Renders this entry as {@code LogEntry(category:..., message:...)}; an unset field prints as "null".
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("LogEntry(");
+    sb.append("category:");
+    if (this.category == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.category);
+    }
+    sb.append(", ");
+    sb.append("message:");
+    if (this.message == null) {
+      sb.append("null");
+    } else {
+      // Use Thrift's helper to print the buffer contents instead of ByteBuffer's pos/lim/cap summary.
+      org.apache.thrift.TBaseHelper.toString(this.message, sb);
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ResultCode.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ResultCode.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ResultCode.java
new file mode 100644
index 0000000..6bfa84c
--- /dev/null
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ResultCode.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.scribe;
+
+/** Outcome of a Scribe {@code Log} call; each constant carries its Thrift IDL integer value. */
+public enum ResultCode implements org.apache.thrift.TEnum {
+  OK(0),
+  TRY_LATER(1);
+
+  /** Integer value assigned to this constant in the Thrift IDL. */
+  private final int value;
+
+  private ResultCode(int idlValue) {
+    value = idlValue;
+  }
+
+  /**
+   * Returns the integer value of this constant, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return this.value;
+  }
+
+  /**
+   * Looks up the constant whose Thrift IDL integer value equals {@code value}.
+   * @return the matching constant, or null if the value is not found.
+   */
+  public static ResultCode findByValue(int value) {
+    for (ResultCode candidate : values()) {
+      if (candidate.value == value) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
new file mode 100644
index 0000000..ccde51b
--- /dev/null
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
@@ -0,0 +1,879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.scribe;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.Collections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Scribe {
+
+  public interface Iface {
+    /** Synchronous form of the "Log" call: takes a batch of entries and returns the resulting {@link ResultCode}. */
+    ResultCode Log(List<LogEntry> messages) throws org.apache.thrift.TException;
+
+  }
+
+  public interface AsyncIface {
+    /** Callback form of the "Log" call: returns nothing and takes an AsyncMethodCallback typed on {@link AsyncClient.Log_call}. */
+    void Log(List<LogEntry> messages, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.Log_call> resultHandler) throws org.apache.thrift.TException;
+
+  }
+
+  public static class Client implements org.apache.thrift.TServiceClient, Iface {
+    /** Builds {@link Client} instances from one shared protocol or from a separate input/output pair. */
+    public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
+      public Factory() {}
+      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
+        return new Client(prot);
+      }
+      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+        return new Client(iprot, oprot);
+      }
+    }
+
+    protected org.apache.thrift.protocol.TProtocol iprot_;
+    protected org.apache.thrift.protocol.TProtocol oprot_;
+    // Sequence id of the latest call: incremented by send_Log, compared against the reply in recv_Log.
+    protected int seqid_;
+
+    /** Uses {@code prot} for both reading and writing. */
+    public Client(org.apache.thrift.protocol.TProtocol prot) {
+      this(prot, prot);
+    }
+
+    /** Replies are read from {@code iprot}; calls are written to {@code oprot}. */
+    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+      this.iprot_ = iprot;
+      this.oprot_ = oprot;
+    }
+
+    public org.apache.thrift.protocol.TProtocol getInputProtocol() {
+      return iprot_;
+    }
+
+    public org.apache.thrift.protocol.TProtocol getOutputProtocol() {
+      return oprot_;
+    }
+
+    /** Sends {@code messages} via {@link #send_Log} and returns the value read back by {@link #recv_Log}. */
+    public ResultCode Log(List<LogEntry> messages) throws org.apache.thrift.TException {
+      send_Log(messages);
+      return recv_Log();
+    }
+
+    /** Writes a "Log" CALL message holding {@code messages} under the next sequence id, then flushes the output transport. */
+    public void send_Log(List<LogEntry> messages) throws org.apache.thrift.TException {
+      seqid_ += 1;
+      org.apache.thrift.protocol.TMessage header = new org.apache.thrift.protocol.TMessage("Log", org.apache.thrift.protocol.TMessageType.CALL, seqid_);
+      oprot_.writeMessageBegin(header);
+      Log_args callArgs = new Log_args().setMessages(messages);
+      callArgs.write(oprot_);
+      oprot_.writeMessageEnd();
+      oprot_.getTransport().flush();
+    }
+
+    /** Reads the reply to "Log": rethrows an EXCEPTION reply, rejects a sequence id other than {@code seqid_}, and fails if no success value is set. */
+    public ResultCode recv_Log() throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TMessage reply = iprot_.readMessageBegin();
+      if (reply.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
+        org.apache.thrift.TApplicationException remoteError = org.apache.thrift.TApplicationException.read(iprot_);
+        iprot_.readMessageEnd();
+        throw remoteError;
+      }
+      if (reply.seqid != seqid_) {
+        throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "Log failed: out of sequence response");
+      }
+      Log_result outcome = new Log_result();
+      outcome.read(iprot_);
+      iprot_.readMessageEnd();
+      if (!outcome.isSetSuccess()) {
+        throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "Log failed: unknown result");
+      }
+      return outcome.success;
+    }
+
+  }
+  public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
+    public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
+      private org.apache.thrift.async.TAsyncClientManager clientManager; // passed to every AsyncClient this factory creates
+      private org.apache.thrift.protocol.TProtocolFactory protocolFactory; // passed to every AsyncClient this factory creates
+      public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+        this.clientManager = clientManager;
+        this.protocolFactory = protocolFactory;
+      }
+      public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
+        return new AsyncClient(protocolFactory, clientManager, transport);
+      }
+    }
+    /** Forwards all three arguments unchanged to the TAsyncClient superclass constructor. */
+    public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
+      super(protocolFactory, clientManager, transport);
+    }
+    /** After checkReady(), builds a Log_call for {@code messages}, records it as currentMethod and hands it to the manager. */
+    public void Log(List<LogEntry> messages, org.apache.thrift.async.AsyncMethodCallback<Log_call> resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      Log_call method_call = new Log_call(messages, resultHandler, this, protocolFactory, transport); // protocolFactory/transport are not declared here; presumably inherited from TAsyncClient
+      this.currentMethod = method_call;
+      manager.call(method_call);
+    }
+    /** One "Log" invocation: stores the messages to send and decodes the response when getResult() is called. */
+    public static class Log_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private List<LogEntry> messages;
+      public Log_call(List<LogEntry> messages, org.apache.thrift.async.AsyncMethodCallback<Log_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false); // NOTE(review): trailing false presumably marks the call as not one-way; confirm against TAsyncMethodCall
+        this.messages = messages;
+      }
+      /** Writes the "Log" CALL message, always with sequence id 0, followed by the stored messages. */
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("Log", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        Log_args args = new Log_args();
+        args.setMessages(messages);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+      /** Decodes the buffered response with Client.recv_Log; throws IllegalStateException unless the call state is RESPONSE_READ. */
+      public ResultCode getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_Log(); // a fresh Client has seqid_ 0, which matches the 0 written by write_args
+      }
+    }
+
+  }
+
+  public static class Processor implements org.apache.thrift.TProcessor {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
+    /** Handler for one RPC method; receives the sequence id of the incoming message and both protocols. */
+    protected static interface ProcessFunction {
+      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException;
+    }
+
+    private Iface iface_;
+    protected final HashMap<String,ProcessFunction> processMap_ = new HashMap<String,ProcessFunction>();
+
+    /** Wraps {@code iface} and registers the handler for the "Log" method. */
+    public Processor(Iface iface) {
+      this.iface_ = iface;
+      this.processMap_.put("Log", new Log());
+    }
+
+    /** Reads one message and dispatches it by name; an unknown name is answered with an UNKNOWN_METHOD exception. Returns true on both paths. */
+    public boolean process(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TMessage request = iprot.readMessageBegin();
+      ProcessFunction handler = processMap_.get(request.name);
+      if (handler != null) {
+        handler.process(request.seqid, iprot, oprot);
+        return true;
+      }
+      // No handler: consume the unread argument struct, then report the failure to the caller.
+      org.apache.thrift.protocol.TProtocolUtil.skip(iprot, org.apache.thrift.protocol.TType.STRUCT);
+      iprot.readMessageEnd();
+      org.apache.thrift.TApplicationException unknown = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+request.name+"'");
+      oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(request.name, org.apache.thrift.protocol.TMessageType.EXCEPTION, request.seqid));
+      unknown.write(oprot);
+      oprot.writeMessageEnd();
+      oprot.getTransport().flush();
+      return true;
+    }
+
+    /** Decodes {@code Log_args}, calls {@code iface_.Log} and writes the reply; a TProtocolException while decoding becomes a PROTOCOL_ERROR reply. */
+    private class Log implements ProcessFunction {
+      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+        Log_args request = new Log_args();
+        try {
+          request.read(iprot);
+        } catch (org.apache.thrift.protocol.TProtocolException e) {
+          iprot.readMessageEnd();
+          org.apache.thrift.TApplicationException failure = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.PROTOCOL_ERROR, e.getMessage());
+          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("Log", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
+          failure.write(oprot);
+          oprot.writeMessageEnd();
+          oprot.getTransport().flush();
+          return;
+        }
+        iprot.readMessageEnd();
+        Log_result reply = new Log_result();
+        reply.success = iface_.Log(request.messages);
+        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("Log", org.apache.thrift.protocol.TMessageType.REPLY, seqid));
+        reply.write(oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+      }
+    }
+  }
+
+  public static class Log_args implements org.apache.thrift.TBase<Log_args, Log_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Log_args");
+
+    private static final org.apache.thrift.protocol.TField MESSAGES_FIELD_DESC = new org.apache.thrift.protocol.TField("messages", org.apache.thrift.protocol.TType.LIST, (short)1);
+
+    public List<LogEntry> messages;
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      MESSAGES((short)1, "messages");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it is not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        for (_Fields candidate : values()) {
+          if (candidate._thriftId == fieldId) {
+            return candidate;
+          }
+        }
+        return null;
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant whose field name equals name, or null if byName has no entry for it.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.MESSAGES, new org.apache.thrift.meta_data.FieldMetaData("messages", org.apache.thrift.TFieldRequirementType.DEFAULT,
+          new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+              new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, LogEntry.class))));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Log_args.class, metaDataMap);
+    }
+
+    public Log_args() {
+    }
+
+    public Log_args(
+      List<LogEntry> messages)
+    {
+      this();
+      this.messages = messages;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public Log_args(Log_args other) {
+      if (other.isSetMessages()) {
+        List<LogEntry> __this__messages = new ArrayList<LogEntry>();
+        for (LogEntry other_element : other.messages) {
+          __this__messages.add(new LogEntry(other_element));
+        }
+        this.messages = __this__messages;
+      }
+    }
+
+    public Log_args deepCopy() {
+      return new Log_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.messages = null;
+    }
+
+    public int getMessagesSize() {
+      return (this.messages == null) ? 0 : this.messages.size();
+    }
+
+    public java.util.Iterator<LogEntry> getMessagesIterator() {
+      return (this.messages == null) ? null : this.messages.iterator();
+    }
+
+    public void addToMessages(LogEntry elem) {
+      if (this.messages == null) {
+        this.messages = new ArrayList<LogEntry>();
+      }
+      this.messages.add(elem);
+    }
+
+    public List<LogEntry> getMessages() {
+      return this.messages;
+    }
+
+    public Log_args setMessages(List<LogEntry> messages) {
+      this.messages = messages;
+      return this;
+    }
+
+    public void unsetMessages() {
+      this.messages = null;
+    }
+
+    /** Returns true if field messages is set (has been assigned a value) and false otherwise */
+    public boolean isSetMessages() {
+      return this.messages != null;
+    }
+
+    public void setMessagesIsSet(boolean value) {
+      if (!value) {
+        this.messages = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case MESSAGES:
+        if (value == null) {
+          unsetMessages();
+        } else {
+          setMessages((List<LogEntry>)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case MESSAGES:
+        return getMessages();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case MESSAGES:
+        return isSetMessages();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof Log_args)
+        return this.equals((Log_args)that);
+      return false;
+    }
+
+    /** Field-wise equality: true when {@code messages} is unset on both sides, or set on both and equal. */
+    public boolean equals(Log_args that) {
+      if (that == null) {
+        return false;
+      }
+      boolean mineSet = this.isSetMessages();
+      boolean theirsSet = that.isSetMessages();
+      if (!mineSet && !theirsSet) {
+        return true;
+      }
+      if (mineSet != theirsSet) {
+        return false;
+      }
+      return this.messages.equals(that.messages);
+    }
+
+    @Override
+    public int hashCode() { // depends only on messages, the one field equals() compares; assumes LogEntry.hashCode() agrees with its equals() (confirm)
+      return isSetMessages() ? this.messages.hashCode() : 0;
+    }
+
+    public int compareTo(Log_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      Log_args typedOther = (Log_args)other;
+
+      lastComparison = Boolean.valueOf(isSetMessages()).compareTo(typedOther.isSetMessages());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetMessages()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.messages, typedOther.messages);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    /**
+     * Populates this struct from {@code iprot}, skipping unknown or mistyped fields, then calls {@link #validate()}.
+     * The list size is supplied by the peer, so a negative value is rejected and the value is only
+     * used as a bounded capacity hint; the list still grows to the announced number of entries.
+     *
+     * @throws org.apache.thrift.protocol.TProtocolException if the announced list size is negative
+     */
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField field;
+      iprot.readStructBegin();
+      while (true) {
+        field = iprot.readFieldBegin();
+        if (field.type == org.apache.thrift.protocol.TType.STOP) {
+          break;
+        }
+        if (field.id == 1 && field.type == org.apache.thrift.protocol.TType.LIST) { // MESSAGES
+          org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
+          if (_list0.size < 0) {
+            throw new org.apache.thrift.protocol.TProtocolException(org.apache.thrift.protocol.TProtocolException.NEGATIVE_SIZE, "Negative list size: " + _list0.size);
+          }
+          // Cap the pre-allocation: a bogus size must not reserve memory before any element has been read.
+          this.messages = new ArrayList<LogEntry>(Math.min(_list0.size, 1024));
+          for (int _i1 = 0; _i1 < _list0.size; ++_i1) {
+            LogEntry _elem2 = new LogEntry();
+            _elem2.read(iprot);
+            this.messages.add(_elem2);
+          }
+          iprot.readListEnd();
+        } else {
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (this.messages != null) {
+        oprot.writeFieldBegin(MESSAGES_FIELD_DESC);
+        {
+          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.messages.size()));
+          for (LogEntry _iter3 : this.messages)
+          {
+            _iter3.write(oprot);
+          }
+          oprot.writeListEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    /**
+     * Renders this struct as {@code Log_args(messages:<list>)}; an unset list is printed as {@code null}.
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("Log_args(");
+      sb.append("messages:");
+      if (this.messages == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.messages);
+      }
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+  }
+
+  public static class Log_result implements org.apache.thrift.TBase<Log_result, Log_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Log_result");
+
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.I32, (short)0);
+
+    /**
+     *
+     * @see ResultCode
+     */
+    public ResultCode success;
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      /**
+       *
+       * @see ResultCode
+       */
+      SUCCESS((short)0, "success");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it is not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        for (_Fields candidate : values()) {
+          if (candidate._thriftId == fieldId) {
+            return candidate;
+          }
+        }
+        return null;
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant whose field name equals name, or null if byName has no entry for it.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT,
+          new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, ResultCode.class)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Log_result.class, metaDataMap);
+    }
+
+    public Log_result() {
+    }
+
+    public Log_result(
+      ResultCode success)
+    {
+      this();
+      this.success = success;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public Log_result(Log_result other) {
+      if (other.isSetSuccess()) {
+        this.success = other.success;
+      }
+    }
+
+    public Log_result deepCopy() {
+      return new Log_result(this);
+    }
+
+    @Override
+    public void clear() {
+      this.success = null;
+    }
+
+    /**
+     *
+     * @see ResultCode
+     */
+    public ResultCode getSuccess() {
+      return this.success;
+    }
+
+    /**
+     *
+     * @see ResultCode
+     */
+    public Log_result setSuccess(ResultCode success) {
+      this.success = success;
+      return this;
+    }
+
+    public void unsetSuccess() {
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and false otherwise */
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((ResultCode)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return getSuccess();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case SUCCESS:
+        return isSetSuccess();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof Log_result)
+        return this.equals((Log_result)that);
+      return false;
+    }
+
+    public boolean equals(Log_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (!this.success.equals(that.success))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() { // depends only on success, the one field equals() compares; uses the IDL value rather than the enum's identity hash
+      return isSetSuccess() ? 31 + this.success.getValue() : 0;
+    }
+
+    public int compareTo(Log_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      Log_result typedOther = (Log_result)other;
+
+      lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetSuccess()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, typedOther.success);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField field;
+      iprot.readStructBegin();
+      while (true)
+      {
+        field = iprot.readFieldBegin();
+        if (field.type == org.apache.thrift.protocol.TType.STOP) {
+          break;
+        }
+        switch (field.id) {
+          case 0: // SUCCESS
+            if (field.type == org.apache.thrift.protocol.TType.I32) {
+              this.success = ResultCode.findByValue(iprot.readI32());
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      oprot.writeStructBegin(STRUCT_DESC);
+
+      if (this.isSetSuccess()) {
+        oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+        oprot.writeI32(this.success.getValue());
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+    /**
+     * Renders this struct as {@code Log_result(success:<code>)}; an unset code is printed as {@code null}.
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("Log_result(");
+      sb.append("success:");
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
new file mode 100644
index 0000000..6190b11
--- /dev/null
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source.scribe;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractSource;
+import org.apache.flume.source.scribe.Scribe.Iface;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flume source that accepts Scribe {@code LogEntry} batches over Thrift
+ * (framed transport, binary protocol), so that a Flume agent can take the
+ * place of the central Scribe server that local Scribe daemons forward to.
+ *
+ * <p>
+ * Message bytes are copied into the event body without being decoded; the
+ * original author reports roughly a 2X throughput gain from doing so.
+ */
+public class ScribeSource extends AbstractSource implements
+      EventDrivenSource, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScribeSource.class);
+
+  public static final String SCRIBE_CATEGORY = "category";
+
+  private static final int DEFAULT_WORKERS = 5;
+
+  // Assigned on the Startup thread but read by start() and stop(); volatile
+  // makes that assignment visible to the reading threads.
+  private volatile TServer server;
+  private int port = 1499;
+  private int workers;
+
+  private SourceCounter sourceCounter;
+
+  /**
+   * Reads {@code port} (keeps the current value, initially 1499, when unset)
+   * and {@code workerThreads} (non-positive values fall back to the default).
+   */
+  @Override
+  public void configure(Context context) {
+    port = context.getInteger("port", port);
+
+    workers = context.getInteger("workerThreads", DEFAULT_WORKERS);
+    if (workers <= 0) {
+      workers = DEFAULT_WORKERS;
+    }
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
+  }
+
+  /** Builds the Thrift server and runs {@code serve()} on its own thread. */
+  private class Startup extends Thread {
+
+    public void run() {
+      try {
+        Scribe.Processor processor = new Scribe.Processor(new Receiver());
+        TNonblockingServerTransport transport = new TNonblockingServerSocket(port);
+        THsHaServer.Args args = new THsHaServer.Args(transport);
+
+        args.workerThreads(workers);
+        args.processor(processor);
+        args.transportFactory(new TFramedTransport.Factory());
+        args.protocolFactory(new TBinaryProtocol.Factory(false, false));
+
+        server = new THsHaServer(args);
+
+        LOG.info("Starting Scribe Source on port " + port);
+
+        server.serve();
+      } catch (Exception e) {
+        LOG.warn("Scribe failed", e);
+      }
+    }
+
+  }
+
+  /**
+   * Launches the server thread, waits three seconds, then requires the server
+   * to be serving before starting the counter and the source lifecycle.
+   *
+   * @throws IllegalStateException if the server is not serving by then
+   */
+  @Override
+  public void start() {
+    Startup startupThread = new Startup();
+    startupThread.start();
+
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+
+    // server is still null if Startup failed before assigning it; report
+    // that as a failed start rather than as a NullPointerException.
+    TServer started = server;
+    if (started == null || !started.isServing()) {
+      throw new IllegalStateException("Failed initialization of ScribeSource");
+    }
+
+    sourceCounter.start();
+    super.start();
+  }
+
+  /** Stops the Thrift server if one was created, then the counter and source. */
+  @Override
+  public void stop() {
+    LOG.info("Scribe source stopping");
+
+    if (server != null) {
+      server.stop();
+    }
+
+    sourceCounter.stop();
+    super.stop();
+
+    LOG.info("Scribe source stopped. Metrics:{}", sourceCounter);
+  }
+
+  /** Thrift handler that turns each Scribe batch into a Flume event batch. */
+  class Receiver implements Iface {
+
+    /**
+     * Converts the batch to events and hands them to the channel processor.
+     *
+     * @return {@code OK} once the batch has been accepted (an empty batch
+     *         needs no work and is also {@code OK}); {@code TRY_LATER} for a
+     *         null batch or when processing fails
+     */
+    public ResultCode Log(List<LogEntry> list) throws TException {
+      if (list == null) {
+        return ResultCode.TRY_LATER;
+      }
+      sourceCounter.addToEventReceivedCount(list.size());
+
+      try {
+        List<Event> events = new ArrayList<Event>(list.size());
+        for (LogEntry entry : list) {
+          Map<String, String> headers = new HashMap<String, String>(1, 1);
+          String category = entry.getCategory();
+          if (category != null) {
+            headers.put(SCRIBE_CATEGORY, category);
+          }
+
+          // Read through a duplicate: this honors arrayOffset(), works for
+          // buffers without a backing array, and leaves the entry untouched.
+          ByteBuffer buffer = entry.getMessage();
+          byte[] buf = new byte[buffer.remaining()];
+          buffer.duplicate().get(buf);
+          events.add(EventBuilder.withBody(buf, headers));
+        }
+
+        if (!events.isEmpty()) {
+          getChannelProcessor().processEventBatch(events);
+        }
+        sourceCounter.addToEventAcceptedCount(list.size());
+        return ResultCode.OK;
+      } catch (Exception e) {
+        LOG.warn("Scribe source handling failure", e);
+        return ResultCode.TRY_LATER;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/flume-ng-sources/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/pom.xml b/flume-ng-sources/pom.xml
new file mode 100644
index 0000000..5bb7ebf
--- /dev/null
+++ b/flume-ng-sources/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.flume</groupId>
+    <artifactId>flume-parent</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+
+  <!-- Aggregator only: packaging is "pom", so the child modules carry the code. -->
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-ng-sources</artifactId>
+  <packaging>pom</packaging>
+  <name>Flume Sources</name>
+
+  <!-- Source implementations built as part of this aggregator. -->
+  <modules>
+    <module>flume-scribe-source</module>
+  </modules>
+
+  <build>
+    <plugins>
+      <!-- No version or configuration given here; presumably inherited from flume-parent. -->
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3cac43e5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9142e3f..a277585 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,7 @@ limitations under the License.
     <module>flume-ng-core</module>
     <module>flume-ng-configuration</module>
     <module>flume-ng-sinks</module>
+    <module>flume-ng-sources</module>
     <module>flume-ng-node</module>
     <module>flume-ng-dist</module>
     <module>flume-ng-channels</module>
@@ -814,6 +815,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.flume.flume-ng-sources</groupId>
+        <artifactId>flume-scribe-source</artifactId>
+        <version>1.3.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
         <artifactId>flume-thrift-source</artifactId>
         <version>1.3.0-SNAPSHOT</version>


Mime
View raw message