flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1649) Give a good error message when a user program emits a null record
Date Thu, 05 Mar 2015 10:33:38 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348547#comment-14348547 ]

ASF GitHub Bot commented on FLINK-1649:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/456#discussion_r25853770
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java ---
    @@ -49,50 +48,33 @@
     	 * @param writers List of all writers.
     	 */
     	@SuppressWarnings("unchecked")
    -	public OutputCollector(List<RecordWriter<SerializationDelegate<T>>> writers, TypeSerializer<T> serializer)
    -	{
    +	public OutputCollector(List<RecordWriter<SerializationDelegate<T>>> writers, TypeSerializer<T> serializer) {
     		this.delegate = new SerializationDelegate<T>(serializer);
     		this.writers = (RecordWriter<SerializationDelegate<T>>[]) writers.toArray(new RecordWriter[writers.size()]);
     	}
    -	
    -	/**
    -	 * Adds a writer to the OutputCollector.
    -	 * 
    -	 * @param writer The writer to add.
    -	 */
    -
    -	@SuppressWarnings("unchecked")
    -	public void addWriter(RecordWriter<SerializationDelegate<T>> writer)
    -	{
    -		// avoid using the array-list here to reduce one level of object indirection
    -		if (this.writers == null) {
    -			this.writers = new RecordWriter[] {writer};
    -		}
    -		else {
    -			RecordWriter<SerializationDelegate<T>>[] ws = new RecordWriter[this.writers.length + 1];
    -			System.arraycopy(this.writers, 0, ws, 0, this.writers.length);
    -			ws[this.writers.length] = writer;
    -			this.writers = ws;
    -		}
    -	}
     
     	/**
     	 * Collects a record and emits it to all writers.
     	 */
     	@Override
    -	public void collect(T record)
    -	{
    -		this.delegate.setInstance(record);
    -		try {
    -			for (int i = 0; i < writers.length; i++) {
    -				this.writers[i].emit(this.delegate);
    +	public void collect(T record)  {
    +		if (record != null) {
    +			this.delegate.setInstance(record);
    +			try {
    +				for (RecordWriter<SerializationDelegate<T>> writer : writers) {
    +					writer.emit(this.delegate);
    +				}
    +			}
    +			catch (IOException e) {
    +				throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
    +			}
    +			catch (InterruptedException e) {
    +				throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
     			}
     		}
    -		catch (IOException e) {
    -			throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
    -		}
    -		catch (InterruptedException e) {
    -			throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
    +		else {
    +			throw new NullPointerException("The system does not support records that are null."
    +								+ "Null values are only supported as fields inside other objects.");
    --- End diff --
    
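    Missing whitespace after the first sentence: the two concatenated string literals produce "...records that are null.Null values are...". The first literal needs a trailing space.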
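
    To make the point concrete, here is a minimal standalone sketch of the message as written in the diff next to a version with the space added. The class name `NullRecordMessage` and the helper `requireRecord` are hypothetical names used only for illustration; they are not part of Flink or of this pull request.

```java
// Hypothetical standalone class for illustration; not part of Flink.
final class NullRecordMessage {

    // As written in the diff: the first literal ends without a space,
    // so the two sentences run together after concatenation.
    static final String ORIGINAL =
            "The system does not support records that are null."
            + "Null values are only supported as fields inside other objects.";

    // With the trailing space that the review comment asks for.
    static final String FIXED =
            "The system does not support records that are null. "
            + "Null values are only supported as fields inside other objects.";

    // Hypothetical helper mirroring the null check added to collect(T record).
    static <T> T requireRecord(T record) {
        if (record == null) {
            throw new NullPointerException(FIXED);
        }
        return record;
    }

    public static void main(String[] args) {
        System.out.println(ORIGINAL);
        System.out.println(FIXED);
        try {
            requireRecord(null);
        } catch (NullPointerException e) {
            System.out.println("Caught: " + e.getMessage());
        }
    }
}
```

    Whether the space goes at the end of the first literal or the start of the second is a matter of taste; the sketch puts it at the end of the first.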


> Give a good error message when a user program emits a null record
> -----------------------------------------------------------------
>
>                 Key: FLINK-1649
>                 URL: https://issues.apache.org/jira/browse/FLINK-1649
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.9
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
