spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Date Mon, 25 Aug 2014 22:37:09 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2065#discussion_r16685628
  
    --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---
    @@ -90,53 +93,77 @@ private[streaming] class FlumePollingReceiver(
           // Threads that pull data from Flume.
           receiverExecutor.submit(new Runnable {
             override def run(): Unit = {
    -          while (true) {
    -            val connection = connections.poll()
    -            val client = connection.client
    -            try {
    -              val eventBatch = client.getEventBatch(maxBatchSize)
    -              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    -                // No error, proceed with processing data
    -                val seq = eventBatch.getSequenceNumber
    -                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    -                logDebug(
    -                  "Received batch of " + events.size() + " events with sequence number: " + seq)
    -                try {
    -                  // Convert each Flume event to a serializable SparkFlumeEvent
    -                  val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    -                  var j = 0
    -                  while (j < events.size()) {
    -                    buffer += toSparkFlumeEvent(events(j))
    -                    j += 1
    +          val loop = new Breaks
    +          loop.breakable {
    +            while (!isStopped()) {
    +              val connection = connections.poll()
    +              val client = connection.client
    +              var batchReceived = false
    +              try {
    +                var eventBatch: EventBatch = null
    +                var seq: CharSequence = null
    +                Try {
    +                  eventBatch = client.getEventBatch(maxBatchSize)
    +                  batchReceived = true
    +                  if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
    +                    // No error, proceed with processing data
    +                    seq = eventBatch.getSequenceNumber
    +                    val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
    +                    logDebug(
    +                      "Received batch of " + events.size() + " events with sequence number: " + seq)
    +                    // Convert each Flume event to a serializable SparkFlumeEvent
    +                    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
    +                    var j = 0
    +                    while (j < events.size()) {
    +                      buffer += toSparkFlumeEvent(events(j))
    +                      j += 1
    +                    }
    +                    store(buffer)
    +                    logDebug("Sending ack for sequence number: " + seq)
    +                    // Send an ack to Flume so that Flume discards the events from its channels.
    +                    client.ack(seq)
    +                    logDebug("Ack sent for sequence number: " + seq)
    +                  } else {
    +                    batchReceived = false
    +                    logWarning(
    +                      "Did not receive events from Flume agent due to error on the Flume " +
    +                        "agent: " + eventBatch.getErrorMsg)
                       }
    -                  store(buffer)
    -                  logDebug("Sending ack for sequence number: " + seq)
    -                  // Send an ack to Flume so that Flume discards the events from its channels.
    -                  client.ack(seq)
    -                  logDebug("Ack sent for sequence number: " + seq)
    -                } catch {
    +                }.recoverWith {
                       case e: Exception =>
    -                    try {
    -                      // Let Flume know that the events need to be pushed back into the channel.
    -                      logDebug("Sending nack for sequence number: " + seq)
    -                      client.nack(seq) // If the agent is down, even this could fail and throw
    -                      logDebug("Nack sent for sequence number: " + seq)
    -                    } catch {
    -                      case e: Exception => logError(
    -                        "Sending Nack also failed. A Flume agent is down.")
    +                    Try {
    +                      Throwables.getRootCause(e) match {
    +                        // If the cause was an InterruptedException,
    +                        // then check if the receiver is stopped - if yes,
    +                        // just break out of the loop. Else send a Nack and
    +                        // log a warning.
    +                        // In the unlikely case, the cause was not an Exception,
    +                        // then just throw it out and exit.
    +                        case interrupted: InterruptedException =>
    +                          if (isStopped()) {
    +                            loop.break()
    +                          } else {
    +                            logWarning("Interrupted while receiving data from Flume", interrupted)
    +                            sendNack(batchReceived, client, seq)
    +                          }
    +                        case exception: Exception =>
    +                          logWarning("Error while receiving data from Flume", exception)
    +                          sendNack(batchReceived, client, seq)
    +                        case majorError: Throwable =>
    --- End diff --
    
    Do not catch Throwables. Scala subsystems often use throwables for control flow, and catching Throwable can have unpredictable consequences.
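    The concern is concrete in this very loop: `loop.break()` from `scala.util.control.Breaks` signals the break by throwing a `ControlThrowable`, so a `case majorError: Throwable` arm can intercept a control-flow throwable that was never meant to be handled. Below is a minimal, self-contained sketch of the failure mode and of the usual `scala.util.control.NonFatal` guard; the demo object and counter are illustrative, not part of this PR:
    
    ```scala
    import scala.util.control.{Breaks, NonFatal}
    
    object CatchThrowableDemo {
      def main(args: Array[String]): Unit = {
        val loop = new Breaks
    
        // Failure mode: `case t: Throwable` also matches the ControlThrowable
        // that loop.break() throws, so the break signal is swallowed and the
        // loop runs to i == 5 instead of exiting at i == 2.
        var swallowed = 0
        loop.breakable {
          var i = 0
          while (i < 5) {
            i += 1
            try {
              if (i == 2) loop.break() // break is a thrown ControlThrowable
            } catch {
              case t: Throwable => swallowed += 1 // too broad: eats the break
            }
          }
        }
        println(s"break signals swallowed: $swallowed") // prints 1
    
        // Safer: NonFatal matches ordinary exceptions but deliberately excludes
        // ControlThrowable, InterruptedException, VirtualMachineError, etc.
        loop.breakable {
          var i = 0
          while (i < 5) {
            i += 1
            try {
              if (i == 2) loop.break() // propagates and exits the breakable block
            } catch {
              case NonFatal(e) => println(s"handled ordinary failure: $e")
            }
          }
        }
        println("second loop exited via break at i == 2, as intended")
      }
    }
    ```
    
    If a catch-all is genuinely needed (say, to log a fatal error before the JVM dies), the conventional pattern is to rethrow, e.g. `case t: Throwable => logError("fatal", t); throw t`, so that `ControlThrowable` and VM errors keep propagating.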


