spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mohammad Tariq <donta...@gmail.com>
Subject Re: Spark streaming readind avro from kafka
Date Wed, 01 Jun 2016 20:44:52 GMT
Hi Neeraj,

You might find Kafka-Direct
<http://spark.apache.org/docs/latest/streaming-kafka-integration.html> useful.
BTW, are you using something like Confluent for you Kafka setup. If that's
the case you might leverage Schema registry to get hold of the associated
schema without additional effort.

And for the DataFrame part you could do something like this :

JavaPairDStream<Object, Object> messages =
KafkaUtils.createDirectStream(jssc, Object.class, Object.class,
        KafkaAvroDecoder.class, KafkaAvroDecoder.class, kafkaParams,
topicsSet);
    JavaDStream<String> records = messages.map(new Function<Tuple2<Object,
Object>, String>() {
      @Override
      public String call(Tuple2<Object, Object> tuple2) {
        return tuple2._2().toString();
      }
    });
    records.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
          SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
          DataFrame record = sqlContext.read().json(rdd);
          record.registerTempTable("dataAsTable");
          DataFrame dedupeFields = sqlContext.sql("SELECT name,
metadata.RBA, metadata.SHARD, "
            + "metadata.SCHEMA, metadata.`TABLE`, metadata.EVENTTYPE,
metadata.SOURCE_TS FROM dataAsTable");
        }
        return null;
      }
    });
    jssc.start();
    jssc.awaitTermination();
  }



[image: http://]

Tariq, Mohammad
about.me/mti
[image: http://]
<http://about.me/mti>


On Thu, Jun 2, 2016 at 1:59 AM, Igor Berman <igor.berman@gmail.com> wrote:

> Avro file contains metadata with schema(writer schema)
> in Kafka there is no such thing, you should put message that will contain
> some reference to known schema(put whole schema will have big overhead)
> some people use schema registry solution
>
> On 1 June 2016 at 21:02, justneeraj <justneeraj@gmail.com> wrote:
>
>> +1
>>
>> I am trying to read avro from kafka and I don't want to limit to a small
>> set
>> of schema. So I want to dynamically load the schema from avro file (as
>> avro
>> contains schema as well). And then from this I want to create a dataframe
>> and run some queries on that.
>>
>> Any help would be really thankful.
>>
>> Thanks,
>> Neeraj
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-readind-avro-from-kafka-tp22425p27067.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Mime
View raw message