spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject git commit: [STREAMING] SPARK-1729. Make Flume pull data from source, rather than the current pu...
Date Tue, 29 Jul 2014 18:11:34 GMT
Repository: spark
Updated Branches:
  refs/heads/master fc4d05700 -> 800ecff4b


[STREAMING] SPARK-1729. Make Flume pull data from source, rather than the current pu...

...sh model

Currently Spark uses Flume's internal Avro Protocol to ingest data from Flume. If the executor running the
receiver fails, it currently has to be restarted on the same node to be able to receive data.

This commit adds a new Sink which can be deployed to a Flume agent. This sink can be polled by a new
DStream that is also included in this commit. This model ensures that data can be pulled into Spark from
Flume even if the receiver is restarted on a new node. This also allows the receiver to receive data on
multiple threads for better performance.

Author: Hari Shreedharan <harishreedharan@gmail.com>
Author: Hari Shreedharan <hshreedharan@apache.org>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: harishreedharan <hshreedharan@cloudera.com>

Closes #807 from harishreedharan/master and squashes the following commits:

e7f70a3 [Hari Shreedharan] Merge remote-tracking branch 'asf-git/master'
96cfb6f [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
e48d785 [Hari Shreedharan] Documenting flume-sink being ignored for Mima checks.
5f212ce [Hari Shreedharan] Ignore Spark Sink from mima.
981bf62 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
7a1bc6e [Hari Shreedharan] Fix SparkBuild.scala
a082eb3 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
1f47364 [Hari Shreedharan] Minor fixes.
73d6f6d [Hari Shreedharan] Cleaned up tests a bit. Added some docs in multiple places.
65b76b4 [Hari Shreedharan] Fixing the unit test.
e59cc20 [Hari Shreedharan] Use SparkFlumeEvent instead of the new type. Also, Flume Polling Receiver now uses the store(ArrayBuffer) method.
f3c99d1 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
3572180 [Hari Shreedharan] Adding a license header, making Jenkins happy.
799509f [Hari Shreedharan] Fix a compile issue.
3c5194c [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
d248d22 [harishreedharan] Merge pull request #1 from tdas/flume-polling
10b6214 [Tathagata Das] Changed public API, changed sink package, and added java unit test to make sure Java API is callable from Java.
1edc806 [Hari Shreedharan] SPARK-1729. Update logging in Spark Sink.
8c00289 [Hari Shreedharan] More debug messages
393bd94 [Hari Shreedharan] SPARK-1729. Use LinkedBlockingQueue instead of ArrayBuffer to keep track of connections.
120e2a1 [Hari Shreedharan] SPARK-1729. Some test changes and changes to utils classes.
9fd0da7 [Hari Shreedharan] SPARK-1729. Use foreach instead of map for all Options.
8136aa6 [Hari Shreedharan] Adding TransactionProcessor to map on returning batch of data
86aa274 [Hari Shreedharan] Merge remote-tracking branch 'asf/master'
205034d [Hari Shreedharan] Merging master in
4b0c7fc [Hari Shreedharan] FLUME-1729. New Flume-Spark integration.
bda01fc [Hari Shreedharan] FLUME-1729. Flume-Spark integration.
0d69604 [Hari Shreedharan] FLUME-1729. Better Flume-Spark integration.
3c23c18 [Hari Shreedharan] SPARK-1729. New Spark-Flume integration.
70bcc2a [Hari Shreedharan] SPARK-1729. New Flume-Spark integration.
d6fa3aa [Hari Shreedharan] SPARK-1729. New Flume-Spark integration.
e7da512 [Hari Shreedharan] SPARK-1729. Fixing import order
9741683 [Hari Shreedharan] SPARK-1729. Fixes based on review.
c604a3c [Hari Shreedharan] SPARK-1729. Optimize imports.
0f10788 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
87775aa [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
8df37e4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
03d6c1c [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
08176ad [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
d24d9d4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
6d6776a [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/800ecff4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/800ecff4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/800ecff4

Branch: refs/heads/master
Commit: 800ecff4b1127d9042d5a8a746348fb4d45aa34b
Parents: fc4d057
Author: Hari Shreedharan <harishreedharan@gmail.com>
Authored: Tue Jul 29 11:11:29 2014 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Jul 29 11:11:29 2014 -0700

----------------------------------------------------------------------
 .../streaming/FlumePollingEventCount.scala      |  67 ++++++
 external/flume-sink/pom.xml                     | 100 ++++++++
 .../flume-sink/src/main/avro/sparkflume.avdl    |  40 ++++
 .../spark/streaming/flume/sink/Logging.scala    | 125 ++++++++++
 .../flume/sink/SparkAvroCallbackHandler.scala   | 131 +++++++++++
 .../spark/streaming/flume/sink/SparkSink.scala  | 154 +++++++++++++
 .../streaming/flume/sink/SparkSinkUtils.scala   |  28 +++
 .../flume/sink/TransactionProcessor.scala       | 228 +++++++++++++++++++
 external/flume/pom.xml                          |   5 +
 .../streaming/flume/EventTransformer.scala      |  72 ++++++
 .../streaming/flume/FlumeInputDStream.scala     |   3 -
 .../flume/FlumePollingInputDStream.scala        | 178 +++++++++++++++
 .../spark/streaming/flume/FlumeUtils.scala      | 144 +++++++++++-
 .../flume/JavaFlumePollingStreamSuite.java      |  44 ++++
 .../flume/FlumePollingStreamSuite.scala         | 195 ++++++++++++++++
 pom.xml                                         |   1 +
 project/SparkBuild.scala                        |  20 +-
 project/plugins.sbt                             |   2 +
 18 files changed, 1524 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
new file mode 100644
index 0000000..1cc8c8d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+import java.net.InetSocketAddress
+
+/**
+ *  Produces a count of events received from Flume.
+ *
+ *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
+ *  the Spark Streaming programming guide for more details.
+ *
+ *  Usage: FlumePollingEventCount <host> <port>
+ *    `host` is the host on which the Spark Sink is running.
+ *    `port` is the port at which the Spark Sink is listening.
+ *
+ *  To run this example:
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
+ */
+object FlumePollingEventCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: FlumePollingEventCount <host> <port>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(host, IntParam(port)) = args
+
+    val batchInterval = Milliseconds(2000)
+
+    // Create the context and set the batch size
+    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
+    val ssc = new StreamingContext(sparkConf, batchInterval)
+
+    // Create a flume stream that polls the Spark Sink running in a Flume agent
+    val stream = FlumeUtils.createPollingStream(ssc, host, port)
+
+    // Print out the count of events received from this server in each batch
+    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
new file mode 100644
index 0000000..d11129c
--- /dev/null
+++ b/external/flume-sink/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-streaming-flume-sink_2.10</artifactId>
+  <properties>
+    <sbt.project.name>streaming-flume-sink</sbt.project.name>
+  </properties>
+
+  <packaging>jar</packaging>
+  <name>Spark Project External Flume Sink</name>
+  <url>http://spark.apache.org/</url>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <version>1.4.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>1.4.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.10.4</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.7.3</version>
+        <configuration>
+          <!-- Generate the output in the same directory as the sbt-avro-plugin -->
+          <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume-sink/src/main/avro/sparkflume.avdl
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
new file mode 100644
index 0000000..8806e86
--- /dev/null
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.spark.streaming.flume.sink")
+
+protocol SparkFlumeProtocol {
+
+  record SparkSinkEvent {
+    map<string> headers;
+    bytes body;
+  }
+
+  record EventBatch {
+    string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
+    string sequenceNumber;
+    array<SparkSinkEvent> events;
+  }
+
+  EventBatch getEventBatch (int n);
+
+  void ack (string sequenceNumber);
+
+  void nack (string sequenceNumber);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
new file mode 100644
index 0000000..17cbc67
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Copy of the org.apache.spark.Logging for being used in the Spark Sink.
+ * The org.apache.spark.Logging is not used so that all of Spark is not brought
+ * in as a dependency.
+ */
+private[sink] trait Logging {
+  // Make the log field transient so that objects with Logging can
+  // be serialized and used on another machine
+  @transient private var log_ : Logger = null
+
+  // Method to get or create the logger for this object
+  protected def log: Logger = {
+    if (log_ == null) {
+      initializeIfNecessary()
+      var className = this.getClass.getName
+      // Ignore trailing $'s in the class names for Scala objects
+      if (className.endsWith("$")) {
+        className = className.substring(0, className.length - 1)
+      }
+      log_ = LoggerFactory.getLogger(className)
+    }
+    log_
+  }
+
+  // Log methods that take only a String
+  protected def logInfo(msg: => String) {
+    if (log.isInfoEnabled) log.info(msg)
+  }
+
+  protected def logDebug(msg: => String) {
+    if (log.isDebugEnabled) log.debug(msg)
+  }
+
+  protected def logTrace(msg: => String) {
+    if (log.isTraceEnabled) log.trace(msg)
+  }
+
+  protected def logWarning(msg: => String) {
+    if (log.isWarnEnabled) log.warn(msg)
+  }
+
+  protected def logError(msg: => String) {
+    if (log.isErrorEnabled) log.error(msg)
+  }
+
+  // Log methods that take Throwables (Exceptions/Errors) too
+  protected def logInfo(msg: => String, throwable: Throwable) {
+    if (log.isInfoEnabled) log.info(msg, throwable)
+  }
+
+  protected def logDebug(msg: => String, throwable: Throwable) {
+    if (log.isDebugEnabled) log.debug(msg, throwable)
+  }
+
+  protected def logTrace(msg: => String, throwable: Throwable) {
+    if (log.isTraceEnabled) log.trace(msg, throwable)
+  }
+
+  protected def logWarning(msg: => String, throwable: Throwable) {
+    if (log.isWarnEnabled) log.warn(msg, throwable)
+  }
+
+  protected def logError(msg: => String, throwable: Throwable) {
+    if (log.isErrorEnabled) log.error(msg, throwable)
+  }
+
+  protected def isTraceEnabled(): Boolean = {
+    log.isTraceEnabled
+  }
+
+  private def initializeIfNecessary() {
+    if (!Logging.initialized) {
+      Logging.initLock.synchronized {
+        if (!Logging.initialized) {
+          initializeLogging()
+        }
+      }
+    }
+  }
+
+  private def initializeLogging() {
+    Logging.initialized = true
+
+    // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
+    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+    log
+  }
+}
+
+private[sink] object Logging {
+  @volatile private var initialized = false
+  val initLock = new Object()
+  try {
+    // We use reflection here to handle the case where users remove the
+    // slf4j-to-jul bridge order to route their logs to JUL.
+    val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
+    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
+    if (!installed) {
+      bridgeClass.getMethod("install").invoke(null)
+    }
+  } catch {
+    case e: ClassNotFoundException => // can't log anything yet so just fail silently
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
new file mode 100644
index 0000000..7da8eb3
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.flume.Channel
+import org.apache.commons.lang.RandomStringUtils
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+/**
+ * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
+ * requests. Each getEvents, ack and nack call is forwarded to an instance of this class.
+ * @param threads Number of threads to use to process requests.
+ * @param channel The channel that the sink pulls events from
+ * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
+ *                           is rolled back.
+ */
+// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
+// rolled back from the thread it was originally created in. So each getEvents call from Spark
+// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
+// and events are pulled off the channel. Once the events are sent to spark,
+// that thread is blocked and the TransactionProcessor is saved in a map,
+// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
+// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
+// unblocked, at which point the transaction is committed or rolled back.
+
+private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
+  val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
+  val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
+    new ThreadFactoryBuilder().setDaemon(true)
+      .setNameFormat("Spark Sink Processor Thread - %d").build()))
+  private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+  // This sink will not persist sequence numbers and reuses them if it gets restarted.
+  // So it is possible to commit a transaction which may have been meant for the sink before the
+  // restart.
+  // Since the new txn may not have the same sequence number we must guard against accidentally
+  // committing a new transaction. To reduce the probability of that happening a random string is
+  // prepended to the sequence number. Does not change for life of sink
+  private val seqBase = RandomStringUtils.randomAlphanumeric(8)
+  private val seqCounter = new AtomicLong(0)
+
+  /**
+   * Returns a bunch of events to Spark over Avro RPC.
+   * @param n Maximum number of events to return in a batch
+   * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
+   */
+  override def getEventBatch(n: Int): EventBatch = {
+    logDebug("Got getEventBatch call from Spark.")
+    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
+    val processor = new TransactionProcessor(channel, sequenceNumber,
+      n, transactionTimeout, backOffInterval, this)
+    transactionExecutorOpt.foreach(executor => {
+      executor.submit(processor)
+    })
+    // Wait until a batch is available - will be an error if error message is non-empty
+    val batch = processor.getEventBatch
+    if (!SparkSinkUtils.isErrorBatch(batch)) {
+      processorMap.put(sequenceNumber.toString, processor)
+      logDebug("Sending event batch with sequence number: " + sequenceNumber)
+    }
+    batch
+  }
+
+  /**
+   * Called by Spark to indicate successful commit of a batch
+   * @param sequenceNumber The sequence number of the event batch that was successful
+   */
+  override def ack(sequenceNumber: CharSequence): Void = {
+    logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
+    completeTransaction(sequenceNumber, success = true)
+    null
+  }
+
+  /**
+   * Called by Spark to indicate failed commit of a batch
+   * @param sequenceNumber The sequence number of the event batch that failed
+   * @return
+   */
+  override def nack(sequenceNumber: CharSequence): Void = {
+    completeTransaction(sequenceNumber, success = false)
+    logInfo("Spark failed to commit transaction. Will reattempt events.")
+    null
+  }
+
+  /**
+   * Helper method to commit or rollback a transaction.
+   * @param sequenceNumber The sequence number of the batch that was completed
+   * @param success Whether the batch was successful or not.
+   */
+  private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
+    Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
+      processor.batchProcessed(success)
+    })
+  }
+
+  /**
+   * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
+   * @param sequenceNumber
+   * @return The transaction processor for the corresponding batch. Note that this instance is no
+   *         longer tracked and the caller is responsible for that txn processor.
+   */
+  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
+    processorMap.remove(sequenceNumber.toString) // The toString is required!
+  }
+
+  /**
+   * Shuts down the executor used to process transactions.
+   */
+  def shutdown() {
+    logInfo("Shutting down Spark Avro Callback Handler")
+    transactionExecutorOpt.foreach(executor => {
+      executor.shutdownNow()
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
new file mode 100644
index 0000000..7b73513
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.util.concurrent._
+
+import org.apache.avro.ipc.NettyServer
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.Context
+import org.apache.flume.Sink.Status
+import org.apache.flume.conf.{Configurable, ConfigurationException}
+import org.apache.flume.sink.AbstractSink
+
+/**
+ * A sink that uses Avro RPC to run a server that can be polled by Spark's
+ * FlumePollingInputDStream. This sink has the following configuration parameters:
+ *
+ * hostname - The hostname to bind to. Default: 0.0.0.0
+ * port - The port to bind to. (No default - mandatory)
+ * timeout - Time in seconds after which a transaction is rolled back,
+ * if an ACK is not received from Spark within that time
+ * threads - Number of threads to use to receive requests from Spark (Default: 10)
+ *
+ * This sink is unlike other Flume sinks in the sense that it does not push data,
+ * instead the process method in this sink simply blocks the SinkRunner the first time it is
+ * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
+ *
+ * Each time a getEventBatch call comes, creates a transaction and reads events
+ * from the channel. When enough events are read, the events are sent to the Spark receiver and
+ * the thread itself is blocked and a reference to it saved off.
+ *
+ * When the ack for that batch is received,
+ * the thread which created the transaction is is retrieved and it commits the transaction with the
+ * channel from the same thread it was originally created in (since Flume transactions are
+ * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
+ * is received within the specified timeout, the transaction is rolled back too. If an ack comes
+ * after that, it is simply ignored and the events get re-sent.
+ *
+ */
+
+private[flume]
+class SparkSink extends AbstractSink with Logging with Configurable {
+
+  // Size of the pool to use for holding transaction processors.
+  private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
+
+  // Timeout for each transaction. If spark does not respond in this much time,
+  // rollback the transaction
+  private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
+
+  // Address info to bind on
+  private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
+  private var port: Int = 0
+
+  private var backOffInterval: Int = 200
+
+  // Handle to the server
+  private var serverOpt: Option[NettyServer] = None
+
+  // The handler that handles the callback from Avro
+  private var handler: Option[SparkAvroCallbackHandler] = None
+
+  // Latch that blocks off the Flume framework from wasting 1 thread.
+  private val blockingLatch = new CountDownLatch(1)
+
+  override def start() {
+    logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
+      hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
+      transactionTimeout + ".")
+    handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
+      backOffInterval))
+    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
+    // Using the constructor that takes specific thread-pools requires bringing in netty
+    // dependencies which are being excluded in the build. In practice,
+    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
+    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
+    serverOpt.foreach(server => {
+      logInfo("Starting Avro server for sink: " + getName)
+      server.start()
+    })
+    super.start()
+  }
+
+  override def stop() {
+    logInfo("Stopping Spark Sink: " + getName)
+    handler.foreach(callbackHandler => {
+      callbackHandler.shutdown()
+    })
+    serverOpt.foreach(server => {
+      logInfo("Stopping Avro Server for sink: " + getName)
+      server.close()
+      server.join()
+    })
+    blockingLatch.countDown()
+    super.stop()
+  }
+
+  override def configure(ctx: Context) {
+    import SparkSinkConfig._
+    hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
+    port = Option(ctx.getInteger(CONF_PORT)).
+      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
+    poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
+    transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
+    backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
+    logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
+      "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
+      "backoffInterval: " + backOffInterval)
+  }
+
+  override def process(): Status = {
+    // This method is called in a loop by the Flume framework - block it until the sink is
+    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+    // being shut down.
+    logInfo("Blocking Sink Runner, sink will continue to run..")
+    blockingLatch.await()
+    Status.BACKOFF
+  }
+}
+
+/**
+ * Configuration parameters and their defaults.
+ */
+private[flume]
+object SparkSinkConfig {
+  val THREADS = "threads"
+  val DEFAULT_THREADS = 10
+
+  val CONF_TRANSACTION_TIMEOUT = "timeout"
+  val DEFAULT_TRANSACTION_TIMEOUT = 60
+
+  val CONF_HOSTNAME = "hostname"
+  val DEFAULT_HOSTNAME = "0.0.0.0"
+
+  val CONF_PORT = "port"
+
+  val CONF_BACKOFF_INTERVAL = "backoffInterval"
+  val DEFAULT_BACKOFF_INTERVAL = 200
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
new file mode 100644
index 0000000..47c0e29
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+private[flume] object SparkSinkUtils {
+  /**
+   * This method determines if this batch represents an error or not.
+   * @param batch - The batch to check
+   * @return - true if the batch represents an error
+   */
+  def isErrorBatch(batch: EventBatch): Boolean = {
+    !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
----------------------------------------------------------------------
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
new file mode 100644
index 0000000..b9e3c78
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
+
+import scala.util.control.Breaks
+
+import org.apache.flume.{Transaction, Channel}
+
+// Flume forces transactions to be thread-local (horrible, I know!)
+// So the sink basically spawns a new thread to pull the events out within a transaction.
+// The thread fills in the event batch object that is set before the thread is scheduled.
+// After filling it in, the thread waits on a condition - which is released only
+// when the success message comes back for the specific sequence number for that event batch.
+/**
+ * This class represents a transaction on the Flume channel. This class runs a separate thread
+ * which owns the transaction. The thread is blocked until the success call for that transaction
+ * comes back with an ACK or NACK.
+ * @param channel The channel from which to pull events
+ * @param seqNum The sequence number to use for the transaction. Must be unique
+ * @param maxBatchSize The maximum number of events to process per batch
+ * @param transactionTimeout Time in seconds after which a transaction must be rolled back
+ *                           without waiting for an ACK from Spark
+ * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
+ */
+private class TransactionProcessor(val channel: Channel, val seqNum: String,
+  var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
+  val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {
+
+  // If a real batch is not returned, we always have to return an error batch.
+  @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
+    util.Collections.emptyList())
+
+  // Synchronization primitives
+  val batchGeneratedLatch = new CountDownLatch(1)
+  val batchAckLatch = new CountDownLatch(1)
+
+  // Sanity check to ensure we don't loop like crazy
+  val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2
+
+  // OK to use volatile, since the change would only make this true (otherwise it will be
+  // changed to false - we never apply a negation operation to this) - which means the transaction
+  // succeeded.
+  @volatile private var batchSuccess = false
+
+  // The transaction that this processor would handle
+  var txOpt: Option[Transaction] = None
+
+  /**
+   * Get an event batch from the channel. This method will block until a batch of events is
+   * available from the channel. If no events are available after a large number of attempts of
+   * polling the channel, this method will return an [[EventBatch]] with a non-empty error message
+   *
+   * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
+   *         maximum of maxBatchSize events
+   */
+  def getEventBatch: EventBatch = {
+    batchGeneratedLatch.await()
+    eventBatch
+  }
+
+  /**
+   * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
+   * method is a no-op if it is called after transactionTimeout has expired since
+   * getEventBatch returned a batch of events.
+   * @param success True if an ACK was received and the transaction should be committed, else false.
+   */
+  def batchProcessed(success: Boolean) {
+    logDebug("Batch processed for sequence number: " + seqNum)
+    batchSuccess = success
+    batchAckLatch.countDown()
+  }
+
+  /**
+   * Populates events into the event batch. If the batch cannot be populated,
+   * this method will not set the events into the event batch, but it sets an error message.
+   */
+  private def populateEvents() {
+    try {
+      txOpt = Option(channel.getTransaction)
+      if(txOpt.isEmpty) {
+        eventBatch.setErrorMsg("Something went wrong. Channel was " +
+          "unable to create a transaction!")
+      }
+      txOpt.foreach(tx => {
+        tx.begin()
+        val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
+        val loop = new Breaks
+        var gotEventsInThisTxn = false
+        var loopCounter: Int = 0
+        loop.breakable {
+          while (events.size() < maxBatchSize
+            && loopCounter < totalAttemptsToRemoveFromChannel) {
+            loopCounter += 1
+            Option(channel.take()) match {
+              case Some(event) =>
+                events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+                  ByteBuffer.wrap(event.getBody)))
+                gotEventsInThisTxn = true
+              case None =>
+                if (!gotEventsInThisTxn) {
+                  logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
+                    " the current transaction")
+                  TimeUnit.MILLISECONDS.sleep(backOffInterval)
+                } else {
+                  loop.break()
+                }
+            }
+          }
+        }
+        if (!gotEventsInThisTxn) {
+          val msg = "Tried several times, " +
+            "but did not get any events from the channel!"
+          logWarning(msg)
+          eventBatch.setErrorMsg(msg)
+        } else {
+          // At this point, the events are available, so fill them into the event batch
+          eventBatch = new EventBatch("",seqNum, events)
+        }
+      })
+    } catch {
+      case e: Exception =>
+        logWarning("Error while processing transaction.", e)
+        eventBatch.setErrorMsg(e.getMessage)
+        try {
+          txOpt.foreach(tx => {
+            rollbackAndClose(tx, close = true)
+          })
+        } finally {
+          txOpt = None
+        }
+    } finally {
+      batchGeneratedLatch.countDown()
+    }
+  }
+
+  /**
+   * Waits for upto transactionTimeout seconds for an ACK. If an ACK comes in
+   * this method commits the transaction with the channel. If the ACK does not come in within
+   * that time or a NACK comes in, this method rolls back the transaction.
+   */
+  private def processAckOrNack() {
+    batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
+    txOpt.foreach(tx => {
+      if (batchSuccess) {
+        try {
+          logDebug("Committing transaction")
+          tx.commit()
+        } catch {
+          case e: Exception =>
+            logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
+              "back", e)
+            rollbackAndClose(tx, close = false) // tx will be closed later anyway
+        } finally {
+          tx.close()
+        }
+      } else {
+        logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
+        rollbackAndClose(tx, close = true)
+        // This might have been due to timeout or a NACK. Either way the following call does not
+        // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
+        parent.removeAndGetProcessor(seqNum)
+      }
+    })
+  }
+
+  /**
+   * Helper method to rollback and optionally close a transaction
+   * @param tx The transaction to rollback
+   * @param close Whether the transaction should be closed or not after rolling back
+   */
+  private def rollbackAndClose(tx: Transaction, close: Boolean) {
+    try {
+      logWarning("Spark was unable to successfully process the events. Transaction is being " +
+        "rolled back.")
+      tx.rollback()
+    } catch {
+      case e: Exception =>
+        logError("Error rolling back transaction. Rollback may have failed!", e)
+    } finally {
+      if (close) {
+        tx.close()
+      }
+    }
+  }
+
+  /**
+   * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence]
+   * @param inMap The map to be converted
+   * @return The converted map
+   */
+  private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
+    CharSequence] = {
+    val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
+    charSeqMap.putAll(inMap)
+    charSeqMap
+  }
+
+  /**
+   * When the thread is started it sets as many events as the batch size or less (if enough
+   * events aren't available) into the eventBatch and object and lets any threads waiting on the
+   * [[getEventBatch]] method to proceed. Then this thread waits for acks or nacks to come in,
+   * or for a specified timeout and commits or rolls back the transaction.
+   * @return
+   */
+  override def call(): Void = {
+    populateEvents()
+    processAckOrNack()
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 874b8a7..9f680b2 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -77,6 +77,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-flume-sink_2.10</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
new file mode 100644
index 0000000..dc629df
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.io.{ObjectOutput, ObjectInput}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+
+/**
+ * A simple object that provides the implementation of readExternal and writeExternal for both
+ * the wrapper classes for Flume-style Events.
+ */
+private[streaming] object EventTransformer extends Logging {
+  def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
+    Array[Byte]) = {
+    val bodyLength = in.readInt()
+    val bodyBuff = new Array[Byte](bodyLength)
+    in.readFully(bodyBuff)
+
+    val numHeaders = in.readInt()
+    val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+    for (i <- 0 until numHeaders) {
+      val keyLength = in.readInt()
+      val keyBuff = new Array[Byte](keyLength)
+      in.readFully(keyBuff)
+      val key: String = Utils.deserialize(keyBuff)
+
+      val valLength = in.readInt()
+      val valBuff = new Array[Byte](valLength)
+      in.readFully(valBuff)
+      val value: String = Utils.deserialize(valBuff)
+
+      headers.put(key, value)
+    }
+    (headers, bodyBuff)
+  }
+
+  def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
+    body: Array[Byte]) {
+    out.writeInt(body.length)
+    out.write(body)
+    val numHeaders = headers.size()
+    out.writeInt(numHeaders)
+    for ((k,v) <- headers) {
+      val keyBuff = Utils.serialize(k.toString)
+      out.writeInt(keyBuff.length)
+      out.write(keyBuff)
+      val valBuff = Utils.serialize(v.toString)
+      out.writeInt(valBuff.length)
+      out.write(valBuff)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 56d2886..4b2ea45 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -39,11 +39,8 @@ import org.apache.spark.streaming.receiver.Receiver
 
 import org.jboss.netty.channel.ChannelPipelineFactory
 import org.jboss.netty.channel.Channels
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.ChannelFactory
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
 import org.jboss.netty.handler.codec.compression._
-import org.jboss.netty.handler.execution.ExecutionHandler
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
new file mode 100644
index 0000000..148262b
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume
+
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.flume.sink._
+
+/**
+ * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
+ * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
+ * @param _ssc Streaming context that will execute this input stream
+ * @param addresses List of addresses at which SparkSinks are listening
+ * @param maxBatchSize Maximum size of a batch
+ * @param parallelism Number of parallel connections to open
+ * @param storageLevel The storage level to use.
+ * @tparam T Class type of the object of this stream
+ */
+private[streaming] class FlumePollingInputDStream[T: ClassTag](
+    @transient _ssc: StreamingContext,
+    val addresses: Seq[InetSocketAddress],
+    val maxBatchSize: Int,
+    val parallelism: Int,
+    storageLevel: StorageLevel
+  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
+
+  override def getReceiver(): Receiver[SparkFlumeEvent] = {
+    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
+  }
+}
+
+private[streaming] class FlumePollingReceiver(
+    addresses: Seq[InetSocketAddress],
+    maxBatchSize: Int,
+    parallelism: Int,
+    storageLevel: StorageLevel
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
+
+  lazy val channelFactoryExecutor =
+    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
+      setNameFormat("Flume Receiver Channel Thread - %d").build())
+
+  lazy val channelFactory =
+    new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+
+  lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
+
+  private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
+
+  override def onStart(): Unit = {
+    // Create the connections to each Flume agent.
+    addresses.foreach(host => {
+      val transceiver = new NettyTransceiver(host, channelFactory)
+      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+      connections.add(new FlumeConnection(transceiver, client))
+    })
+    for (i <- 0 until parallelism) {
+      logInfo("Starting Flume Polling Receiver worker threads starting..")
+      // Threads that pull data from Flume.
+      receiverExecutor.submit(new Runnable {
+        override def run(): Unit = {
+          while (true) {
+            val connection = connections.poll()
+            val client = connection.client
+            try {
+              val eventBatch = client.getEventBatch(maxBatchSize)
+              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+                // No error, proceed with processing data
+                val seq = eventBatch.getSequenceNumber
+                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+                logDebug(
+                  "Received batch of " + events.size() + " events with sequence number: " + seq)
+                try {
+                  // Convert each Flume event to a serializable SparkFlumeEvent
+                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
+                  var j = 0
+                  while (j < events.size()) {
+                    buffer += toSparkFlumeEvent(events(j))
+                    j += 1
+                  }
+                  store(buffer)
+                  logDebug("Sending ack for sequence number: " + seq)
+                  // Send an ack to Flume so that Flume discards the events from its channels.
+                  client.ack(seq)
+                  logDebug("Ack sent for sequence number: " + seq)
+                } catch {
+                  case e: Exception =>
+                    try {
+                      // Let Flume know that the events need to be pushed back into the channel.
+                      logDebug("Sending nack for sequence number: " + seq)
+                      client.nack(seq) // If the agent is down, even this could fail and throw
+                      logDebug("Nack sent for sequence number: " + seq)
+                    } catch {
+                      case e: Exception => logError(
+                        "Sending Nack also failed. A Flume agent is down.")
+                    }
+                    TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+                    logWarning("Error while attempting to store events", e)
+                }
+              } else {
+                logWarning("Did not receive events from Flume agent due to error on the Flume " +
+                  "agent: " + eventBatch.getErrorMsg)
+              }
+            } catch {
+              case e: Exception =>
+                logWarning("Error while reading data from Flume", e)
+            } finally {
+              connections.add(connection)
+            }
+          }
+        }
+      })
+    }
+  }
+
+  override def onStop(): Unit = {
+    logInfo("Shutting down Flume Polling Receiver")
+    receiverExecutor.shutdownNow()
+    connections.foreach(connection => {
+      connection.transceiver.close()
+    })
+    channelFactory.releaseExternalResources()
+  }
+
+  /**
+   * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
+   * @param event - Event to convert to SparkFlumeEvent
+   * @return - The SparkFlumeEvent generated from SparkSinkEvent
+   */
+  private def toSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
+    val sparkFlumeEvent = new SparkFlumeEvent()
+    sparkFlumeEvent.event.setBody(event.getBody)
+    sparkFlumeEvent.event.setHeaders(event.getHeaders)
+    sparkFlumeEvent
+  }
+}
+
+/**
+ * A wrapper around the transceiver and the Avro IPC API. 
+ * @param transceiver The transceiver to use for communication with Flume
+ * @param client The client that the callbacks are received on.
+ */
+private class FlumeConnection(val transceiver: NettyTransceiver,
+  val client: SparkFlumeProtocol.Callback)
+
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 716db9f..4b732c1 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -17,12 +17,19 @@
 
 package org.apache.spark.streaming.flume
 
+import java.net.InetSocketAddress
+
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
 
 object FlumeUtils {
+  private val DEFAULT_POLLING_PARALLELISM = 5
+  private val DEFAULT_POLLING_BATCH_SIZE = 1000
+
   /**
    * Create a input stream from a Flume source.
    * @param ssc      StreamingContext object
@@ -56,7 +63,7 @@ object FlumeUtils {
     ): ReceiverInputDStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream[SparkFlumeEvent](
         ssc, hostname, port, storageLevel, enableDecompression)
-        
+
     inputStream
   }
 
@@ -105,4 +112,135 @@ object FlumeUtils {
     ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param hostname Address of the host on which the Spark Sink is running
+   * @param port Port of the host at which the Spark Sink is listening
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  @Experimental
+  def createPollingStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  @Experimental
+  def createPollingStream(
+      ssc: StreamingContext,
+      addresses: Seq[InetSocketAddress],
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(ssc, addresses, storageLevel,
+      DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+   * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
+   *                     single RPC call
+   * @param parallelism Number of concurrent requests this stream should send to the sink. Note
+   *                    that having a higher number of requests concurrently being pulled will
+   *                    result in this stream using more threads
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  @Experimental
+  def createPollingStream(
+      ssc: StreamingContext,
+      addresses: Seq[InetSocketAddress],
+      storageLevel: StorageLevel,
+      maxBatchSize: Int,
+      parallelism: Int
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
+      parallelism, storageLevel)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param hostname Hostname of the host on which the Spark Sink is running
+   * @param port     Port of the host at which the Spark Sink is listening
+   */
+  @Experimental
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param hostname     Hostname of the host on which the Spark Sink is running
+   * @param port         Port of the host at which the Spark Sink is listening
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  @Experimental
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
+   * @param addresses    List of InetSocketAddresses on which the Spark Sink is running.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  @Experimental
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      addresses: Array[InetSocketAddress],
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(jssc, addresses, storageLevel,
+      DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * @param addresses    List of InetSocketAddresses on which the Spark Sink is running
+   * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
+   *                     single RPC call
+   * @param parallelism  Number of concurrent requests this stream should send to the sink. Note
+   *                     that having a higher number of requests concurrently being pulled will
+   *                     result in this stream using more threads
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  @Experimental
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      addresses: Array[InetSocketAddress],
+      storageLevel: StorageLevel,
+      maxBatchSize: Int,
+      parallelism: Int
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
new file mode 100644
index 0000000..79c5b91
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import java.net.InetSocketAddress;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.junit.Test;
+
+public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
+  @Test
+  public void testFlumeStream() {
+    // tests the API, does not actually test data receiving
+    InetSocketAddress[] addresses = new InetSocketAddress[] {
+        new InetSocketAddress("localhost", 12345)
+    };
+    JavaReceiverInputDStream<SparkFlumeEvent> test1 =
+        FlumeUtils.createPollingStream(ssc, "localhost", 12345);
+    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
+        ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
+        ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
+        ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
new file mode 100644
index 0000000..47071d0
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.streaming.flume
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+import org.apache.flume.event.EventBuilder
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
+import org.apache.spark.streaming.flume.sink._
+
+class FlumePollingStreamSuite extends TestSuiteBase {
+
+  val testPort = 9999
+  val batchCount = 5
+  val eventsPerBatch = 100
+  val totalEventsPerChannel = batchCount * eventsPerBatch
+  val channelCapacity = 5000
+
+  test("flume polling test") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
+        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
+
+    // Start the channel and sink.
+    val context = new Context()
+    context.put("capacity", channelCapacity.toString)
+    context.put("transactionCapacity", "1000")
+    context.put("keep-alive", "0")
+    val channel = new MemoryChannel()
+    Configurables.configure(channel, context)
+
+    val sink = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    Configurables.configure(sink, context)
+    sink.setChannel(channel)
+    sink.start()
+    ssc.start()
+
+    writeAndVerify(Seq(channel), ssc, outputBuffer)
+    assertChannelIsEmpty(channel)
+    sink.stop()
+    channel.stop()
+  }
+
+  test("flume polling test multiple hosts") {
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+        eventsPerBatch, 5)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    outputStream.register()
+
+    // Start the channel and sink.
+    val context = new Context()
+    context.put("capacity", channelCapacity.toString)
+    context.put("transactionCapacity", "1000")
+    context.put("keep-alive", "0")
+    val channel = new MemoryChannel()
+    Configurables.configure(channel, context)
+
+    val channel2 = new MemoryChannel()
+    Configurables.configure(channel2, context)
+
+    val sink = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort))
+    Configurables.configure(sink, context)
+    sink.setChannel(channel)
+    sink.start()
+
+    val sink2 = new SparkSink()
+    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort + 1))
+    Configurables.configure(sink2, context)
+    sink2.setChannel(channel2)
+    sink2.start()
+    ssc.start()
+    writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+    assertChannelIsEmpty(channel)
+    assertChannelIsEmpty(channel2)
+    sink.stop()
+    channel.stop()
+  }
+
+  def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
+    outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val executor = Executors.newCachedThreadPool()
+    val executorCompletion = new ExecutorCompletionService[Void](executor)
+    channels.map(channel => {
+      executorCompletion.submit(new TxnSubmitter(channel, clock))
+    })
+    for (i <- 0 until channels.size) {
+      executorCompletion.take()
+    }
+    val startTime = System.currentTimeMillis()
+    while (outputBuffer.size < batchCount * channels.size &&
+      System.currentTimeMillis() - startTime < 15000) {
+      logInfo("output.size = " + outputBuffer.size)
+      Thread.sleep(100)
+    }
+    val timeTaken = System.currentTimeMillis() - startTime
+    assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
+    logInfo("Stopping context")
+    ssc.stop()
+
+    val flattenedBuffer = outputBuffer.flatten
+    assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
+    var counter = 0
+    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+      val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+        String.valueOf(i)).getBytes("utf-8"),
+        Map[String, String]("test-" + i.toString -> "header"))
+      var found = false
+      var j = 0
+      while (j < flattenedBuffer.size && !found) {
+        val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
+        if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
+          eventToVerify.getHeaders.get("test-" + i.toString)
+            .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
+          found = true
+          counter += 1
+        }
+        j += 1
+      }
+    }
+    assert(counter === totalEventsPerChannel * channels.size)
+  }
+
+  def assertChannelIsEmpty(channel: MemoryChannel) = {
+    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining");
+    queueRemaining.setAccessible(true)
+    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+    assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
+  }
+
+  private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
+    override def call(): Void = {
+      var t = 0
+      for (i <- 0 until batchCount) {
+        val tx = channel.getTransaction
+        tx.begin()
+        for (j <- 0 until eventsPerBatch) {
+          channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
+            "utf-8"),
+            Map[String, String]("test-" + t.toString -> "header")))
+          t += 1
+        }
+        tx.commit()
+        tx.close()
+        Thread.sleep(500) // Allow some time for the events to reach
+        clock.addToTime(batchDuration.milliseconds)
+      }
+      null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 93ef3b9..8b1435c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,7 @@
     <module>external/twitter</module>
     <module>external/kafka</module>
     <module>external/flume</module>
+    <module>external/flume-sink</module>
     <module>external/zeromq</module>
     <module>external/mqtt</module>
     <module>examples</module>

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1629bc2..0a6326e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -30,11 +30,12 @@ object BuildCommons {
 
   private val buildLocation = file(".").getAbsoluteFile.getParentFile
 
-  val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, spark, sql,
-  streaming, streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) =
+  val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, spark,
+  sql, streaming, streamingFlumeSink, streamingFlume, streamingKafka, streamingMqtt,
+  streamingTwitter, streamingZeromq) =
     Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
-      "spark", "sql", "streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt",
-      "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _))
+      "spark", "sql", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka",
+      "streaming-mqtt", "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _))
 
   val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl) =
     Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl")
@@ -156,10 +157,9 @@ object SparkBuild extends PomBuild {
   /* Enable tests settings for all projects except examples, assembly and tools */
   (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
 
-  /* Enable Mima for all projects except spark, hive, catalyst, sql  and repl */
   // TODO: Add Sql to mima checks
-  allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl).contains(x)).
-    foreach (x => enable(MimaBuild.mimaSettings(sparkHome, x))(x))
+  allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl,
+    streamingFlumeSink).contains(x)).foreach(x => enable(MimaBuild.mimaSettings(sparkHome, x))(x))
 
   /* Enable Assembly for all assembly projects */
   assemblyProjects.foreach(enable(Assembly.settings))
@@ -173,6 +173,8 @@ object SparkBuild extends PomBuild {
   /* Hive console settings */
   enable(Hive.settings)(hive)
 
+  enable(Flume.settings)(streamingFlumeSink)
+
   // TODO: move this to its upstream project.
   override def projectDefinitions(baseDirectory: File): Seq[Project] = {
     super.projectDefinitions(baseDirectory).map { x =>
@@ -183,6 +185,10 @@ object SparkBuild extends PomBuild {
 
 }
 
+object Flume {
+  lazy val settings = sbtavro.SbtAvro.avroSettings
+}
+
 object SQL {
 
   lazy val settings = Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/800ecff4/project/plugins.sbt
----------------------------------------------------------------------
diff --git a/project/plugins.sbt b/project/plugins.sbt
index d3ac4bf..06d18e1 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -24,3 +24,5 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")
 addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.1")
 
 addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.0")
+
+addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")


Mime
View raw message