spark-dev mailing list archives

From "Sundaram, Muthu X." <Muthu.X.Sundaram....@sabre.com>
Subject RE: Transforming Flume events using Spark transformation functions
Date Tue, 22 Jul 2014 20:55:23 GMT
I tried to map the SparkFlumeEvents to an RDD of Strings as below, but the map and its call are
never executed. I might be doing this the wrong way. Any help would be appreciated.

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        System.out.println("<<<<<<Inside for each...call>>>>");

        JavaRDD<String> records = eventsData.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flume) throws Exception {
                    String logRecord = null;
                    AvroFlumeEvent avroEvent = null;
                    ByteBuffer bytePayload = null;

                    System.out.println("<<<<<<Inside Map..call>>>>");
                    /* List<SparkFlumeEvent> events = flume.collect();
                       Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
                       SparkFlumeEvent flumeEvent = batchedEvents.next(); */

                    // Extract the Flume event payload as a String record
                    avroEvent = flume.event();
                    bytePayload = avroEvent.getBody();
                    logRecord = new String(bytePayload.array());

                    System.out.println("<<<<Record is" + logRecord);
                    return logRecord;
                }
            });
        return null;
    }
});
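
Spark transformations such as map are lazy, so the inner call above most likely never runs because no action is ever invoked on records. A minimal sketch of the same loop with an action added (collect here purely for illustration; count or saveAsTextFile would equally force evaluation):

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

// Sketch only: 'flumeStream' is the JavaDStream<SparkFlumeEvent> from FlumeUtils.createStream.
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        JavaRDD<String> records = eventsData.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flume) throws Exception {
                    AvroFlumeEvent avroEvent = flume.event();
                    ByteBuffer bytePayload = avroEvent.getBody();
                    return new String(bytePayload.array());
                }
            });

        // map() alone does nothing; an action forces it to execute.
        List<String> lines = records.collect();
        for (String line : lines) {
            System.out.println("Record: " + line);
        }
        return null;
    }
});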

-----Original Message-----
From: Sundaram, Muthu X. [mailto:Muthu.X.Sundaram.ctr@sabre.com] 
Sent: Tuesday, July 22, 2014 10:24 AM
To: user@spark.apache.org; dev@spark.incubator.apache.org
Subject: Transforming Flume events using Spark transformation functions

Hi All,
  I am getting events from Flume using the following line.

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I would like to use some of the transformation functions, such as
map and reduce, on this. Do I need to convert the JavaDStream<SparkFlumeEvent> to JavaDStream<String>,
or can I apply these functions directly on it?
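
One possible sketch, assuming each Flume event body is one delimited record, of converting the whole stream to JavaDStream<String> first (JavaDStream exposes map just as JavaRDD does):

import java.nio.ByteBuffer;

import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

// Sketch only: turn each SparkFlumeEvent payload into a String record.
JavaDStream<String> lines = flumeStream.map(
    new Function<SparkFlumeEvent, String>() {
        @Override
        public String call(SparkFlumeEvent event) throws Exception {
            AvroFlumeEvent avroEvent = event.event();
            ByteBuffer body = avroEvent.getBody();
            return new String(body.array());
        }
    });

lines.print();   // output operation, so the mapping actually runs each batch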

I need to do the following kind of operations:

XXXX                     AA
YYYYY                    Delta
TTTTT                    AA
CCCC                     Southwest
XXXX                     AA

The ticket values are XXXX, YYYYY, TTTTT, CCCC, XXXX.
The counts per ticket are XXXX 2, YYYYY 1, TTTTT 1, and so on...
Per carrier: AA - 2 tickets (the duplicate should not be counted), Delta - 1 ticket, Southwest - 1 ticket.
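
A possible sketch of that dedup-and-count step, assuming a JavaRDD<String> named records holding whitespace-delimited "TICKET CARRIER" lines (all names here are illustrative):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Sketch only: 'records' is a JavaRDD<String> of "TICKET   CARRIER" lines.
// 1) Split each line into a (ticket, carrier) pair.
JavaPairRDD<String, String> ticketToCarrier = records.mapToPair(
    new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String line) throws Exception {
            String[] fields = line.trim().split("\\s+");
            return new Tuple2<String, String>(fields[0], fields[1]);
        }
    });

// 2) Drop duplicate (ticket, carrier) pairs, then count tickets per carrier.
JavaPairRDD<String, Integer> ticketsPerCarrier = ticketToCarrier
    .distinct()
    .mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<String, String> pair) throws Exception {
            return new Tuple2<String, Integer>(pair._2(), 1);
        }
    })
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) throws Exception {
            return a + b;
        }
    });
// For the sample data above this yields AA -> 2, Delta -> 1, Southwest -> 1.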

I have to do transformations like this. Right now I am able to receive the records, but I am
struggling to transform them using Spark transformation functions since they are not of type
JavaRDD<String>.

Can I create a new JavaRDD<String>? How do I create a new JavaRDD?
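
For reference, the two usual ways to obtain a JavaRDD<String> are (a) transforming an existing RDD with map, as in the snippets above, or (b) parallelizing a local collection, which is handy for testing. A tiny sketch of (b), with illustrative names only:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: building a JavaRDD<String> from a local list for testing.
JavaSparkContext sc = new JavaSparkContext("local[2]", "rdd-sketch");
List<String> sample = Arrays.asList("XXXX AA", "YYYYY Delta", "TTTTT AA");
JavaRDD<String> sampleRdd = sc.parallelize(sample);
System.out.println("count = " + sampleRdd.count());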

I loop through  the events like below

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user level data is carried as payload in the Flume event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());
            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
        }
        return null;
    }
});

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I create
this JavaRDD<String>?
In the loop I am able to get every record and print it.

I appreciate any help here.

Thanks,
Muthu


