flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs
Date Sun, 16 Aug 2015 15:16:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698718#comment-14698718
] 

ASF GitHub Bot commented on FLINK-2462:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1017#discussion_r37146808
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
---
    @@ -27,114 +26,65 @@
     import org.apache.flink.streaming.api.graph.StreamEdge;
     import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
     import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
     
     public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1,
IN2, OUT>> {
     
    -	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
    -
     	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
    +	
    +	private volatile boolean running = true;
     
     	@Override
    -	public void registerInputOutput() {
    -		try {
    -			super.registerInputOutput();
    +	public void init() throws Exception {
    +		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
    +		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
     	
    -			TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
    -			TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
    +		int numberOfInputs = configuration.getNumberOfInputs();
     	
    -			int numberOfInputs = configuration.getNumberOfInputs();
    +		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
    +		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
     	
    -			ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
    -			ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
    +		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
     	
    -			List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
    -	
    -			for (int i = 0; i < numberOfInputs; i++) {
    -				int inputType = inEdges.get(i).getTypeNumber();
    -				InputGate reader = getEnvironment().getInputGate(i);
    -				switch (inputType) {
    -					case 1:
    -						inputList1.add(reader);
    -						break;
    -					case 2:
    -						inputList2.add(reader);
    -						break;
    -					default:
    -						throw new RuntimeException("Invalid input type number: " + inputType);
    -				}
    +		for (int i = 0; i < numberOfInputs; i++) {
    +			int inputType = inEdges.get(i).getTypeNumber();
    +			InputGate reader = getEnvironment().getInputGate(i);
    +			switch (inputType) {
    +				case 1:
    +					inputList1.add(reader);
    +					break;
    +				case 2:
    --- End diff --
    
    This was actually part of the original code - I did not modify it as part of this pull
request.
    As far as I see it, the `StreamEdge` code is part of the API, not the runtime. It may
be adjusted as part of  #988 


> Wrong exception reporting in streaming jobs
> -------------------------------------------
>
>                 Key: FLINK-2462
>                 URL: https://issues.apache.org/jira/browse/FLINK-2462
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 0.10
>
>
> When streaming tasks are fail and are canceled, they report a plethora of followup exceptions.
> The batch operators have a clear model that makes sure that root causes are reported,
and followup exceptions are not reported. That makes debugging much easier.
> A big part of that is to have a single consistent place that logs exceptions, and that
has a view of whether the operation is still running, or whether it has been canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message