flink-user mailing list archives

From Boris Lublinsky <boris.lublin...@lightbend.com>
Subject Re: user Digest 11 Jan 2018 11:24:06 -0000 Issue 2610
Date Thu, 11 Jan 2018 14:55:46 GMT
Hi Timo 
"You don't need to specify the type in .flatMap() explicitly. It will be automatically extracted
using the generic signature of DataDataConverter."
It does not. That is why I had to add it there.
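A minimal plain-Java sketch of the mechanism Timo refers to, runnable without Flink: a named class that implements a generic interface with concrete type arguments has those arguments recorded in its class file signature, so they can be read back with reflection, while a lambda's class carries no such signature. The Converter interface and the WineRecord and DataDataConverter classes below are hypothetical stand-ins, not Flink's FlatMapFunction or the classes from this thread, and Flink's own TypeExtractor handles many more cases than this.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;

public class GenericSignatureDemo {

    // Hypothetical stand-in for Flink's FlatMapFunction<IN, OUT>.
    interface Converter<IN, OUT> {
        List<OUT> convert(IN input);
    }

    // Hypothetical record type standing in for Winerecord.WineRecord.
    static class WineRecord {
        final String dataType;

        WineRecord(String dataType) {
            this.dataType = dataType;
        }
    }

    // A named class fixes the type arguments in its declaration; the compiler
    // records them in the class file's generic signature.
    static class DataDataConverter implements Converter<byte[], WineRecord> {
        @Override
        public List<WineRecord> convert(byte[] input) {
            return Collections.singletonList(new WineRecord("wine"));
        }
    }

    // Returns the OUT type argument declared by the converter's class,
    // or null if the class has no generic signature for Converter (e.g. a lambda).
    static Type outputType(Converter<?, ?> converter) {
        for (Type t : converter.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Converter.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Converter<byte[], WineRecord> lambda =
                bytes -> Collections.singletonList(new WineRecord("wine"));

        // Named class: the output type is recoverable at runtime.
        System.out.println("named class: " + outputType(new DataDataConverter()));
        // Lambda: no generic signature was recorded, so nothing can be recovered.
        System.out.println("lambda: " + outputType(lambda));
    }
}
```

In this model, only the lambda case loses the output type, which is the situation where an explicit type has to be supplied.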

> Regarding your error: make sure that you don't mix up the API classes. If you want to
> use the Java API, you should not use "org.apache.flink.streaming.api.scala.DataStream" but
> the Java one ("org.apache.flink.streaming.api.datastream.DataStream").
I rewrote the class in Java. That's why I am so confused.



Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/

> 
> From: Timo Walther <twalthr@apache.org>
> Subject: Re: Java types
> Date: January 11, 2018 at 3:07:08 AM CST
> To: user@flink.apache.org
> 
> 
> Hi Boris,
> 
> Each API is designed to be language-specific, so they might not always be the same. Scala has
> better type extraction features and lets you write code very concisely. Java sometimes requires
> more code to achieve the same.
> 
> You don't need to specify the type in .flatMap() explicitly. It will be automatically
> extracted using the generic signature of DataDataConverter.
> 
> Regarding your error: make sure that you don't mix up the API classes. If you want to
> use the Java API, you should not use "org.apache.flink.streaming.api.scala.DataStream" but
> the Java one ("org.apache.flink.streaming.api.datastream.DataStream").
> 
> Regards,
> Timo
> 
> 
> 
> On 1/11/18 at 5:13 AM, Boris Lublinsky wrote:
>> More questions:
>> In Scala, my DataProcessorKeyed is defined as
>> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] with CheckpointedFunction {
>> It is used as follows:
>> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>>   .flatMap(BadDataHandler[ModelToServe])
>>   .keyBy(_.dataType)
>> val data = dataStream.map(DataRecord.fromByteArray(_))
>>   .flatMap(BadDataHandler[WineRecord])
>>   .keyBy(_.dataType)
>> 
>> // Merge streams
>> data
>>   .connect(models)
>>   .process(DataProcessorKeyed())
>> When I do the same thing in Java:
>> public class DataProcessorKeyed extends CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> implements CheckpointedFunction {
>> I use it as follows:
>> // Read data from streams
>> DataStream<Tuple2<String, ModelToServe>> models = modelsStream
>>         .flatMap(new ModelDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)))
>>         .keyBy(0);
>> DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
>>         .flatMap(new DataDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Winerecord.WineRecord.class)))
>>         .keyBy(0);
>> 
>> // Merge streams
>> data
>>         .connect(models)
>>         .process(new DataProcessorKeyed());
>> I am getting an error
>> 
>> Error:(68, 17) java: no suitable method found for keyBy(int)
>>     method org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>) is not applicable
>>       (argument mismatch; int cannot be converted to scala.collection.Seq<java.lang.Object>)
>>     method org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>) is not applicable
>>       (cannot infer type-variable(s) K
>>         (actual and formal argument lists differ in length))
>> So the Java version seems to assume key/value pairs (Tuple2) as the inputs of the co-process function.
>>
>> Why is there such a difference between the APIs?
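For comparison with the Scala keyBy(_.dataType): the Java DataStream API also has a keyBy overload that takes a KeySelector<T, K>, which keys the stream without changing its element type. Assuming the converters emit the records themselves rather than Tuple2 pairs, the connected streams could then feed a CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> directly. The sketch below models that idea in plain Java only (it is not Flink code), with java.util.function.Function standing in for KeySelector and hypothetical WineRecord and ModelToServe classes that carry a dataType field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeySelectorSketch {

    // Hypothetical record types; only the dataType field matters here.
    static class WineRecord {
        final String dataType;

        WineRecord(String dataType) {
            this.dataType = dataType;
        }
    }

    static class ModelToServe {
        final String dataType;

        ModelToServe(String dataType) {
            this.dataType = dataType;
        }
    }

    // Groups elements by a key computed with a selector function. The element
    // type T is unchanged: no Tuple2<K, T> wrapper is needed to carry the key.
    static <T, K> Map<K, List<T>> keyBy(List<T> input, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : input) {
            groups.computeIfAbsent(keySelector.apply(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<WineRecord> data = Arrays.asList(
                new WineRecord("wine"), new WineRecord("beer"), new WineRecord("wine"));
        List<ModelToServe> models = Arrays.asList(new ModelToServe("wine"));

        // Key both inputs by the same field, like keyBy(_.dataType) in Scala.
        Map<String, List<WineRecord>> dataByType = keyBy(data, r -> r.dataType);
        Map<String, List<ModelToServe>> modelsByType = keyBy(models, m -> m.dataType);

        // Roughly what a keyed connect provides: records and models that share
        // a key are handled together, each still in its own type.
        for (Map.Entry<String, List<WineRecord>> e : dataByType.entrySet()) {
            List<ModelToServe> matching =
                    modelsByType.getOrDefault(e.getKey(), Collections.emptyList());
            System.out.println(e.getKey() + ": " + e.getValue().size()
                    + " record(s), " + matching.size() + " model(s)");
        }
    }
}
```

Running main prints "wine: 2 record(s), 1 model(s)" followed by "beer: 1 record(s), 0 model(s)".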
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublinsky@lightbend.com
>> https://www.lightbend.com/
>>> On Jan 10, 2018, at 6:20 PM, Boris Lublinsky <boris.lublinsky@lightbend.com> wrote:
>>> 
>>> I am trying to convert Scala code (which works fine) to Java.
>>> The Scala code is:
>>> // Create the Kafka consumers
>>> // Data
>>> val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>>>   DATA_TOPIC,
>>>   new ByteArraySchema,
>>>   dataKafkaProps
>>> )
>>> 
>>> // Model
>>> val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>>>   MODELS_TOPIC,
>>>   new ByteArraySchema,
>>>   modelKafkaProps
>>> )
>>> 
>>> // Create input data streams
>>> val modelsStream = env.addSource(modelConsumer)
>>> val dataStream = env.addSource(dataConsumer)
>>> 
>>> // Read data from streams
>>> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>>>   .flatMap(BadDataHandler[ModelToServe])
>>>   .keyBy(_.dataType)
>>> val data = dataStream.map(DataRecord.fromByteArray(_))
>>>   .flatMap(BadDataHandler[WineRecord])
>>>   .keyBy(_.dataType)
>>> Now I am trying to rewrite it in Java, and I am fighting with the requirement to provide types where they should be obvious:
>>> 
>>> // Create the Kafka consumers
>>> // Data
>>> FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
>>>         ModelServingConfiguration.DATA_TOPIC,
>>>         new ByteArraySchema(),
>>>         dataKafkaProps);
>>> 
>>> // Model
>>> FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
>>>         ModelServingConfiguration.MODELS_TOPIC,
>>>         new ByteArraySchema(),
>>>         modelKafkaProps);
>>> 
>>> // Create input data streams
>>> DataStream<byte[]> modelsStream = env.addSource(modelConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>> DataStream<byte[]> dataStream = env.addSource(dataConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>> // Read data from streams
>>> DataStream<Tuple2<String,ModelToServe>> models = modelsStream
>>>         .flatMap(new ModelConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)));
>>> 
>>> Am I missing something in Java similar to "import org.apache.flink.api.scala._"?
>>> 
>>> If this is the only way, does this seem right?
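Java has no implicits, so there is no Java counterpart to import org.apache.flink.api.scala._ that derives types for you. Flink's Java API instead offers TypeHint, used as TypeInformation.of(new TypeHint<Tuple2<String, ModelToServe>>(){}) or passed to an operator's returns(...), and it relies on the anonymous-subclass trick sketched below. The TypeToken class here is a hypothetical plain-Java illustration of that trick, not Flink's implementation: erasure makes ArrayList<String> and ArrayList<Integer> the same runtime class, but the type argument given to an anonymous subclass is kept in its generic superclass signature. Map.Entry<String, ModelToServe> stands in for Tuple2<String, ModelToServe>, and ModelToServe is an empty placeholder.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Map;

public class TypeTokenSketch {

    // Hypothetical placeholder for the ModelToServe class in the thread.
    static class ModelToServe {
    }

    // Hypothetical super-type token, illustrating the idea behind Flink's TypeHint.
    // An anonymous subclass records T in its generic superclass signature.
    abstract static class TypeToken<T> {
        final Type type;

        protected TypeToken() {
            Type superclass = getClass().getGenericSuperclass();
            if (!(superclass instanceof ParameterizedType)) {
                throw new IllegalStateException("TypeToken needs a concrete type argument");
            }
            this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        // Erasure: both lists share a single runtime class.
        boolean sameClass = new ArrayList<String>().getClass() == new ArrayList<Integer>().getClass();
        System.out.println("ArrayList<String> and ArrayList<Integer> share a class: " + sameClass);

        // The anonymous subclass keeps the full parameterized type.
        TypeToken<Map.Entry<String, ModelToServe>> token =
                new TypeToken<Map.Entry<String, ModelToServe>>() {};
        System.out.println("captured type: " + token.type.getTypeName());
    }
}
```

The trailing {} is what makes this work: without the anonymous subclass there is no class file in which the type argument could be recorded.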
>>> 
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublinsky@lightbend.com
>>> https://www.lightbend.com/
>> 

