flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3701) Cant call execute after first execution
Date Tue, 10 May 2016 14:41:13 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278197#comment-15278197
] 

ASF GitHub Bot commented on FLINK-3701:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1913#discussion_r62683914
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/SerializableCacheableValue.java
---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.util;
    +
    +
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +/**
    + * A data structure that enables to keep a value which may be serialized/deserialized
using a custom
    + * Classloader. The encapsulated value is kept locally for further use after a copy of
the class has
    + * been serialized and shipped somewhere else.
    + *
    + * This is useful if we want to keep working with a value which may require a custom
classloader in
    + * one context but is fine with the system/context Classloader in other cases. The value
is cached
    + * as long as possible to prevent unnecessary serialization/deserialization.
    + */
    +public class SerializableCacheableValue<T> implements Serializable {
    +
    +	/** The current cached value. Lost when serialized. */
    +	private transient T value;
    +
    +	private SerializedValue<T> serializedValue;
    +
    +	public SerializableCacheableValue(T value) {
    +		update(value);
    +	}
    +
    +	/**
    +	 * Custom serialization methods which always writes the latest value.
    +     */
    +	private void writeObject(ObjectOutputStream out) throws IOException {
    +		// trigger serialization once more to update to the least recent value
    +		serialize();
    +		out.defaultWriteObject();
    +	}
    +
    +	/**
    +	 * Serialization, e.g. before shipping the class
    +	 */
    +	public void serialize() throws IOException {
    +		if (value != null) {
    +			serializedValue = new SerializedValue<>(value);
    +		}
    +	}
    +
    +	/**
    +	 * Explicit deserialiation using a provided class loader.
    +	 * @param classLoader The class loader to use
    +	 */
    +	public void deserialize(ClassLoader classLoader) {
    +		if (serializedValue != null) {
    +			try {
    +				value = serializedValue.deserializeValue(classLoader);
    +			} catch (Exception e) {
    +				throw new RuntimeException("Attempted to deserialize serialized data " +
    +					" with class loader " + classLoader + " failed. You probably forgot to" +
    +					" deserialize using a custom Classloader via deserialize(Classloader).", e);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Gets the current value or deserializes it uses the current Classloader.
    +	 * @return the value of type T
    +     */
    +	public T get() {
    +		if (value == null) {
    +			deserialize(getClass().getClassLoader());
    +		}
    +		return value;
    +	}
    +
    +	/**
    +	 * Updates the current stored value.
    +	 * @param value The new value of type T
    +     */
    +	public void update(T value) {
    +		Preconditions.checkNotNull(value, "Serializable value must not be null.");
    +		this.value = value;
    +	}
    +
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +
    +		SerializableCacheableValue<?> that = (SerializableCacheableValue<?>) o;
    +
    +		try {
    +			// serialize for equality check
    +			serialize();
    +		} catch (IOException e) {
    +			throw new RuntimeException("Error while serializing for equality check.");
    +		}
    +
    +		return serializedValue != null && serializedValue.equals(that.serializedValue);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		int result = value != null ? value.hashCode() : 0;
    +		result = 31 * result + serializedValue.hashCode();
    --- End diff --
    
    This is a potential null pointer, serialization did not happen, yet.


> Cant call execute after first execution
> ---------------------------------------
>
>                 Key: FLINK-3701
>                 URL: https://issues.apache.org/jira/browse/FLINK-3701
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala Shell
>    Affects Versions: 1.1.0
>            Reporter: Nikolaas Steenbergen
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>
> in the scala shell, local mode, version 1.0 this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> in the current master (after c.print) this leads to :
> {code}
> java.lang.NullPointerException
> 	at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
> 	at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
> 	at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
> 	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> 	at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> 	at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> 	at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
> 	at .<init>(<console>:56)
> 	at .<clinit>(<console>)
> 	at .<init>(<console>:7)
> 	at .<clinit>(<console>)
> 	at $print(<console>)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> 	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> 	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> 	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> 	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> 	at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> 	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> 	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> 	at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> 	at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> 	at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> 	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> 	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> 	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> 	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> 	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> 	at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
> 	at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
> 	at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message