flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1977) Rework Stream Operators to always be push based
Date Thu, 07 May 2015 14:08:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14532684#comment-14532684 ]

ASF GitHub Bot commented on FLINK-1977:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/659#discussion_r29854565
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
---
    @@ -1,149 +1,149 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements.  See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License.  You may obtain a copy of the License at
    - *
    - *    http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.flink.streaming.connectors.flume;
    -
    -import java.util.List;
    -
    -import org.apache.flink.streaming.api.datastream.DataStream;
    -import org.apache.flink.streaming.connectors.ConnectorSource;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.util.Collector;
    -import org.apache.flume.Context;
    -import org.apache.flume.channel.ChannelProcessor;
    -import org.apache.flume.source.AvroSource;
    -import org.apache.flume.source.avro.AvroFlumeEvent;
    -import org.apache.flume.source.avro.Status;
    -
    -public class FlumeSource<OUT> extends ConnectorSource<OUT> {
    -	private static final long serialVersionUID = 1L;
    -
    -	String host;
    -	String port;
    -	volatile boolean finished = false;
    -
    -	private volatile boolean isRunning = false;
    -
    -	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
    -		super(deserializationSchema);
    -		this.host = host;
    -		this.port = Integer.toString(port);
    -	}
    -
    -	public class MyAvroSource extends AvroSource {
    -		Collector<OUT> collector;
    -
    -		/**
    -		 * Sends the AvroFlumeEvent from it's argument list to the Apache Flink
    -		 * {@link DataStream}.
    -		 * 
    -		 * @param avroEvent
    -		 *            The event that should be sent to the dataStream
    -		 * @return A {@link Status}.OK message if sending the event was
    -		 *         successful.
    -		 */
    -		@Override
    -		public Status append(AvroFlumeEvent avroEvent) {
    -			collect(avroEvent);
    -			return Status.OK;
    -		}
    -
    -		/**
    -		 * Sends the AvroFlumeEvents from it's argument list to the Apache Flink
    -		 * {@link DataStream}.
    -		 * 
    -		 * @param events
    -		 *            The events that is sent to the dataStream
    -		 * @return A Status.OK message if sending the events was successful.
    -		 */
    -		@Override
    -		public Status appendBatch(List<AvroFlumeEvent> events) {
    -			for (AvroFlumeEvent avroEvent : events) {
    -				collect(avroEvent);
    -			}
    -
    -			return Status.OK;
    -		}
    -
    -		/**
    -		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
    -		 * {@link DataStream}.
    -		 * 
    -		 * @param avroEvent
    -		 *            The event that is sent to the dataStream
    -		 */
    -		private void collect(AvroFlumeEvent avroEvent) {
    -			byte[] b = avroEvent.getBody().array();
    -			OUT out = FlumeSource.this.schema.deserialize(b);
    -
    -			if (schema.isEndOfStream(out)) {
    -				FlumeSource.this.finished = true;
    -				this.stop();
    -				FlumeSource.this.notifyAll();
    -			} else {
    -				collector.collect(out);
    -			}
    -
    -		}
    -
    -	}
    -
    -	MyAvroSource avroSource;
    -
    -	/**
    -	 * Configures the AvroSource. Also sets the collector so the application can
    -	 * use it from outside of the invoke function.
    -	 * 
    -	 * @param collector
    -	 *            The collector used in the invoke function
    -	 */
    -	public void configureAvroSource(Collector<OUT> collector) {
    -
    -		avroSource = new MyAvroSource();
    -		avroSource.collector = collector;
    -		Context context = new Context();
    -		context.put("port", port);
    -		context.put("bind", host);
    -		avroSource.configure(context);
    -		// An instance of a ChannelProcessor is required for configuring the
    -		// avroSource although it will not be used in this case.
    -		ChannelProcessor cp = new ChannelProcessor(null);
    -		avroSource.setChannelProcessor(cp);
    -	}
    -
    -	/**
    -	 * Configures the AvroSource and runs until the user calls a close function.
    -	 * 
    -	 * @param collector
    -	 *            The Collector for sending data to the datastream
    -	 */
    -	@Override
    -	public void run(Collector<OUT> collector) throws Exception {
    -		isRunning = true;
    -		configureAvroSource(collector);
    -		avroSource.start();
    -		while (!finished && isRunning) {
    -			this.wait();
    -		}
    -	}
    -
    -	@Override
    -	public void cancel() {
    -		isRunning = false;
    -	}
    -
    -}
    +///*
    +// * Licensed to the Apache Software Foundation (ASF) under one or more
    +// * contributor license agreements.  See the NOTICE file distributed with
    +// * this work for additional information regarding copyright ownership.
    +// * The ASF licenses this file to You under the Apache License, Version 2.0
    +// * (the "License"); you may not use this file except in compliance with
    +// * the License.  You may obtain a copy of the License at
    +// *
    +// *    http://www.apache.org/licenses/LICENSE-2.0
    +// *
    +// * Unless required by applicable law or agreed to in writing, software
    +// * distributed under the License is distributed on an "AS IS" BASIS,
    +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +// * See the License for the specific language governing permissions and
    +// * limitations under the License.
    +// */
    +//
    +//package org.apache.flink.streaming.connectors.flume;
    +//
    +//import java.util.List;
    +//
    +//import org.apache.flink.streaming.api.datastream.DataStream;
    +//import org.apache.flink.streaming.connectors.ConnectorSource;
    +//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    +//import org.apache.flink.util.Collector;
    +//import org.apache.flume.Context;
    +//import org.apache.flume.channel.ChannelProcessor;
    +//import org.apache.flume.source.AvroSource;
    +//import org.apache.flume.source.avro.AvroFlumeEvent;
    +//import org.apache.flume.source.avro.Status;
    +//
    +//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
    +//	private static final long serialVersionUID = 1L;
    +//
    +//	String host;
    +//	String port;
    +//	volatile boolean finished = false;
    +//
    +//	private volatile boolean isRunning = false;
    +//
    +//	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
    +//		super(deserializationSchema);
    +//		this.host = host;
    +//		this.port = Integer.toString(port);
    +//	}
    +//
    +//	public class MyAvroSource extends AvroSource {
    +//		Collector<OUT> output;
    +//
    +//		/**
    +//		 * Sends the AvroFlumeEvent from it's argument list to the Apache Flink
    +//		 * {@link DataStream}.
    +//		 *
    +//		 * @param avroEvent
    +//		 *            The event that should be sent to the dataStream
    +//		 * @return A {@link Status}.OK message if sending the event was
    +//		 *         successful.
    +//		 */
    +//		@Override
    +//		public Status append(AvroFlumeEvent avroEvent) {
    +//			collect(avroEvent);
    +//			return Status.OK;
    +//		}
    +//
    +//		/**
    +//		 * Sends the AvroFlumeEvents from it's argument list to the Apache Flink
    +//		 * {@link DataStream}.
    +//		 *
    +//		 * @param events
    +//		 *            The events that is sent to the dataStream
    +//		 * @return A Status.OK message if sending the events was successful.
    +//		 */
    +//		@Override
    +//		public Status appendBatch(List<AvroFlumeEvent> events) {
    +//			for (AvroFlumeEvent avroEvent : events) {
    +//				collect(avroEvent);
    +//			}
    +//
    +//			return Status.OK;
    +//		}
    +//
    +//		/**
    +//		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
    +//		 * {@link DataStream}.
    +//		 *
    +//		 * @param avroEvent
    +//		 *            The event that is sent to the dataStream
    +//		 */
    +//		private void collect(AvroFlumeEvent avroEvent) {
    +//			byte[] b = avroEvent.getBody().array();
    +//			OUT out = FlumeSource.this.schema.deserialize(b);
    +//
    +//			if (schema.isEndOfStream(out)) {
    +//				FlumeSource.this.finished = true;
    +//				this.stop();
    +//				FlumeSource.this.notifyAll();
    +//			} else {
    +//				output.collect(out);
    --- End diff --
    
    No, this one is a bit harder since the Flume source has its own loop.


> Rework Stream Operators to always be push based
> -----------------------------------------------
>
>                 Key: FLINK-1977
>                 URL: https://issues.apache.org/jira/browse/FLINK-1977
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> This is a result of the discussion on the mailing list. This is an excerpt from the mailing list that gives the basic idea of the change:
> I propose to change all streaming operators to be push based, with a
> slightly improved interface: In addition to collect(), which I would
> call receiveElement() I would add receivePunctuation() and
> receiveBarrier(). The first operator in the chain would also get data
> from the outside invokable that reads from the input iterator and
> calls receiveElement() for the first operator in a chain.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message