flink-user mailing list archives

From Saiph Kappa <saiph.ka...@gmail.com>
Subject Re: Flink Stream: collect in an array all records within a window
Date Tue, 19 Jan 2016 15:27:39 GMT
I think this is a bug in the Scala API. The Scala writeToSocket signature is:

def writeToSocket(
    hostname: scala.Predef.String,
    port: java.lang.Integer,
    schema: org.apache.flink.streaming.util.serialization.SerializationSchema[T, scala.Array[scala.Byte]])
  : org.apache.flink.streaming.api.datastream.DataStreamSink[T] = { /* compiled code */ }



On Tue, Jan 19, 2016 at 2:16 PM, Matthias J. Sax <mjsax@apache.org> wrote:

> It should work.
>
> Your error message indicates that your DataStream is of type
> [String, Array[Byte]] and not of type [String].
>
> > Type mismatch, expected: SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema
>
> Can you maybe share your code?
>
> -Matthias
>
> On 01/19/2016 01:57 PM, Saiph Kappa wrote:
> > It's DataStream[String]. So it seems that SimpleStringSchema cannot be
> > used in writeToSocket regardless of the type of the DataStream. Right?
> >
> > On Tue, Jan 19, 2016 at 1:32 PM, Matthias J. Sax <mjsax@apache.org> wrote:
> >
> >     What type is your DataStream? It must be DataStream[String] to work
> >     with SimpleStringSchema.
> >
> >     If you have a different type, just implement a customized
> >     SerializationSchema.
> >
> >     -Matthias
> >
> >
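A minimal sketch of the logic such a custom schema could contain, assuming the receiving server reads newline-delimited UTF-8 text (like the getLines()-based server in this thread). The class name NewlineStringEncoder is hypothetical and the sketch uses no Flink types: the serialize body is what you would put into your own implementation of Flink's SerializationSchema, whose type parameters depend on your Flink version (the Scala signature at the top of this message expects SerializationSchema[T, Array[Byte]]).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Flink class). The body of serialize() is what a
// custom SerializationSchema for Strings could return for each record.
final class NewlineStringEncoder {

    // Encodes one record as UTF-8 and terminates it with '\n', so a
    // line-oriented reader (getLines()/readLine()) sees one record per line.
    byte[] serialize(String element) {
        return (element + "\n").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = new NewlineStringEncoder().serialize("hello");
        System.out.println(bytes.length); // prints 6: five letters plus the newline
    }
}
```

Appending an explicit delimiter keeps record boundaries unambiguous for the receiver even when several records arrive in a single read from the socket.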
> >     On 01/19/2016 11:26 AM, Saiph Kappa wrote:
> >     > When I use SimpleStringSchema I get the error: Type mismatch, expected:
> >     > SerializationSchema[String, Array[Byte]], actual: SimpleStringSchema. I
> >     > think SimpleStringSchema extends SerializationSchema[String], and
> >     > therefore cannot be used as the argument of writeToSocket. Can you
> >     > confirm this, please?
> >     >
> >     > s.writeToSocket(host, port.toInt, new SimpleStringSchema())
> >     >
> >     >
> >     > Thanks.
> >     >
> >     > On Tue, Jan 19, 2016 at 10:34 AM, Matthias J. Sax <mjsax@apache.org> wrote:
> >     >
> >     >     There is SimpleStringSchema.
> >     >
> >     >     -Matthias
> >     >
> >     >     On 01/18/2016 11:21 PM, Saiph Kappa wrote:
> >     >     > Hi Matthias,
> >     >     >
> >     >     > Thanks for your response. The method .writeToSocket seems to be what I
> >     >     > was looking for. Can you tell me which serialization schema I should
> >     >     > use, assuming my socket server receives strings? I have something
> >     >     > like this in Scala:
> >     >     >
> >     >     >   import java.net.ServerSocket
> >     >     >   import scala.io.BufferedSource
> >     >     >
> >     >     >   val server = new ServerSocket(9999)
> >     >     >   while (true) {
> >     >     >     val s = server.accept()
> >     >     >     val in = new BufferedSource(s.getInputStream()).getLines()
> >     >     >     println(in.next())  // reads only the first line of each connection
> >     >     >     s.close()
> >     >     >   }
> >     >     >
> >     >     > Thanks
> >     >     >
> >     >     >
> >     >     >
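As written, the server above prints only the first line of each connection and then closes it. If the sink sends all records over one connection, as the original question wants, the receiver has to keep reading until the client disconnects. A minimal Java sketch of such a receiver, assuming newline-delimited UTF-8 records and the port 9999 used above; LineServer and forEachLine are hypothetical names:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical receiver: keeps reading each accepted connection until the
// client closes it, instead of handling only its first line.
final class LineServer {

    // Passes every newline-delimited UTF-8 line to the handler until end of stream.
    static void forEachLine(InputStream input, Consumer<String> handler) {
        try {
            BufferedReader reader =
                new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                handler.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9999 as in the Scala snippet; connections are served one at a time.
        try (ServerSocket server = new ServerSocket(9999)) {
            while (true) {
                try (Socket client = server.accept()) {
                    forEachLine(client.getInputStream(), System.out::println);
                }
            }
        }
    }
}
```

Because connections are handled one at a time, data from a second sender (for example, if several sink instances connect) is not read until the first one disconnects; handling each accepted socket on its own thread would avoid that.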
> >     >     > On Mon, Jan 18, 2016 at 8:46 PM, Matthias J. Sax <mjsax@apache.org> wrote:
> >     >     >
> >     >     >     Hi Saiph,
> >     >     >
> >     >     >     you can use an AllWindowFunction via .apply(...) to get a .collect
> >     >     >     method:
> >     >     >
> >     >     >     From:
> >     >     >     https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
> >     >     >
> >     >     >     > // applying an AllWindowFunction on a non-keyed window stream
> >     >     >     > allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
> >     >     >     >     public void apply(Window window,
> >     >     >     >             Iterable<Tuple2<String, Integer>> values,
> >     >     >     >             Collector<Integer> out) throws Exception {
> >     >     >     >         // sum the Integer field (f1) of every tuple in the window
> >     >     >     >         int sum = 0;
> >     >     >     >         for (Tuple2<String, Integer> t : values) {
> >     >     >     >             sum += t.f1;
> >     >     >     >         }
> >     >     >     >         out.collect(sum);
> >     >     >     >     }
> >     >     >     > });
> >     >     >
> >     >     >     If you consume all those values via a sink, the sink will run on the
> >     >     >     cluster. You can use .writeToSocket(...) as the sink:
> >     >     >
> >     >     >     https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#data-sinks
> >     >     >
> >     >     >     -Matthias
> >     >     >
> >     >     >
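For the original question (an array of all Strings in a window), the same apply(...) pattern can gather the values instead of summing them. A minimal sketch of that gathering step in plain Java, without Flink types; WindowCollect, toArray and joinLines are hypothetical names. Inside an AllWindowFunction on a stream of Strings, the result would be passed to out.collect(...) once per window, with the function's output type changed accordingly (for example String[] or String).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper showing the gathering step one could run inside
// AllWindowFunction.apply(...) on the window's Iterable<String>.
final class WindowCollect {

    // Copies every record of the window into a single array.
    static String[] toArray(Iterable<String> values) {
        List<String> buffer = new ArrayList<>();
        for (String value : values) {
            buffer.add(value);
        }
        return buffer.toArray(new String[0]);
    }

    // Joins the window into one newline-separated payload, e.g. to emit a
    // single record per window and send it with one socket write.
    static String joinLines(Iterable<String> values) {
        return String.join("\n", values);
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "c");
        System.out.println(toArray(window).length); // prints 3
        System.out.println(joinLines(window));      // prints a, b and c on separate lines
    }
}
```

Emitting one joined String per window keeps the sink's input a plain stream of Strings, so a String-based serialization schema can still be used for the socket sink.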
> >     >     >     On 01/18/2016 06:30 PM, Saiph Kappa wrote:
> >     >     >     > Hi,
> >     >     >     >
> >     >     >     > After performing a windowAll() on a DataStream[String], is there any
> >     >     >     > method to collect and return an array with all Strings within a window
> >     >     >     > (similar to .collect in Spark)?
> >     >     >     >
> >     >     >     > I basically want to ship all strings in a window to a remote server
> >     >     >     > through a socket, and want to use the same socket connection for all
> >     >     >     > strings that I send. The method .addSink iterates over all records, but
> >     >     >     > does the provided function run on the Flink client or on the server?
> >     >     >     >
> >     >     >     > Thanks.
> >     >     >
> >     >     >
> >     >
> >     >
> >
> >
>
>
