flink-user mailing list archives

From Ahmed Nader <ahmednader...@gmail.com>
Subject Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException
Date Wed, 08 Jun 2016 15:48:48 GMT
Hello Flavio,
Thank you so much for replying. However, I didn't download Flink locally; I
only added the dependencies to a Maven project, so I don't think I'll be able
to modify the KryoSerializer class. But yes, I also think that is the
problem.
Thanks,
Ahmed
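
For anyone skimming the patch quoted below: as Flavio describes it, the core idea is to create the Kryo Input and Output anew on every serialize/deserialize call instead of keeping them alive between calls. The comment in his serialize method names the risk with a long-lived buffer: data left over from a call that failed can be written again by the next call. The following is only a rough sketch of that effect using plain java.io classes. It does not use Kryo or Flink at all; SharedBufferWriter, FreshBufferWriter and the simulated failure are hypothetical names invented for illustration, and unlike the patch (which closes the Output in a finally block) this sketch simply drops the buffer of a failed call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FreshBufferSketch {

    /** Hypothetical stand-in for a serializer that keeps one buffer alive across calls. */
    static class SharedBufferWriter {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        void write(String record, boolean failMidway, ByteArrayOutputStream target)
                throws IOException {
            writeInto(buffer, record, failMidway); // a failure leaves half a record behind
            buffer.writeTo(target);
            buffer.reset();
        }
    }

    /** Hypothetical stand-in for the per-call approach: a new buffer for every call. */
    static class FreshBufferWriter {
        void write(String record, boolean failMidway, ByteArrayOutputStream target)
                throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            writeInto(buffer, record, failMidway); // on failure the local buffer is dropped
            buffer.writeTo(target);
        }
    }

    /** Buffers the record in two halves and optionally fails in between. */
    static void writeInto(ByteArrayOutputStream buffer, String record, boolean failMidway)
            throws IOException {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        int half = bytes.length / 2;
        buffer.write(bytes, 0, half);
        if (failMidway) {
            throw new IOException("simulated failure in the middle of a record");
        }
        buffer.write(bytes, half, bytes.length - half);
    }

    /** One failed write of "AAAA" followed by one successful write of "BBBB". */
    static String run(boolean freshBufferPerCall) {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        SharedBufferWriter shared = new SharedBufferWriter();
        FreshBufferWriter fresh = new FreshBufferWriter();
        String[] records = {"AAAA", "BBBB"};
        for (int i = 0; i < records.length; i++) {
            boolean fail = (i == 0); // only the first call fails
            try {
                if (freshBufferPerCall) {
                    fresh.write(records[i], fail, target);
                } else {
                    shared.write(records[i], fail, target);
                }
            } catch (IOException expected) {
                // the first call fails on purpose
            }
        }
        return new String(target.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("shared buffer: " + run(false)); // prints "shared buffer: AABBBB"
        System.out.println("fresh buffer:  " + run(true));  // prints "fresh buffer:  BBBB"
    }
}
```

With the shared buffer, the half-written first record ends up in front of the second one; with a fresh buffer per call, only the complete record reaches the target.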

On 8 June 2016 at 16:07, Flavio Pompermaier <pompermaier@okkam.it> wrote:

> Hi Ahmed,
> I also have the same error, which is probably caused by the KryoSerializer.
> Right now I'm testing a patch for this problem, so maybe you could also test
> it. Unfortunately I'm using Flink 1.0.2, so I don't know whether you can use
> my KryoSerializer, but I think so. Basically, I just recreate the Input and
> Output every time in serialize/deserialize and then I close them.
>
> This is my attempt to fix the problem (actually the KryoSerializer class
> in the flink-core module):
>
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package org.apache.flink.api.java.typeutils.runtime.kryo;
>
> import com.esotericsoftware.kryo.Kryo;
> import com.esotericsoftware.kryo.KryoException;
> import com.esotericsoftware.kryo.Serializer;
> import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
> import com.esotericsoftware.kryo.io.Input;
> import com.esotericsoftware.kryo.io.Output;
> import com.esotericsoftware.kryo.serializers.JavaSerializer;
> import com.google.common.base.Preconditions;
>
> import org.apache.avro.generic.GenericData;
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
> import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
> import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
> import org.apache.flink.core.memory.DataInputView;
> import org.apache.flink.core.memory.DataOutputView;
> import org.objenesis.strategy.StdInstantiatorStrategy;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.EOFException;
> import java.io.IOException;
> import java.lang.reflect.InvocationTargetException;
> import java.lang.reflect.Method;
> import java.lang.reflect.Modifier;
> import java.util.LinkedHashMap;
> import java.util.LinkedHashSet;
> import java.util.Map;
> import java.util.Objects;
>
> /**
>  * A type serializer that serializes its type using the Kryo serialization
>  * framework (https://github.com/EsotericSoftware/kryo).
>  *
>  * This serializer is intended as a fallback serializer for the cases that
>  * are not covered by the basic types, tuples, and POJOs.
>  *
>  * @param <T> The type to be serialized.
>  */
> public class KryoSerializer<T> extends TypeSerializer<T> {
>
> private static final long serialVersionUID = 3L;
>
> private static final Logger LOG =
> LoggerFactory.getLogger(KryoSerializer.class);
>
> // ------------------------------------------------------------------------
>
> private final LinkedHashMap<Class<?>,
> ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
> registeredTypesWithSerializerClasses;
> private final LinkedHashMap<Class<?>,
> ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
> defaultSerializerClasses;
> private final LinkedHashSet<Class<?>> registeredTypes;
>
> private final Class<T> type;
> // ------------------------------------------------------------------------
> // The fields below are lazily initialized after duplication or
> // deserialization.
>
> private transient Kryo kryo;
> private transient T copyInstance;
> // ------------------------------------------------------------------------
>
> public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
> this.type = Preconditions.checkNotNull(type);
>
> this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
> this.defaultSerializerClasses =
> executionConfig.getDefaultKryoSerializerClasses();
> this.registeredTypesWithSerializers =
> executionConfig.getRegisteredTypesWithKryoSerializers();
> this.registeredTypesWithSerializerClasses =
> executionConfig.getRegisteredTypesWithKryoSerializerClasses();
> this.registeredTypes = executionConfig.getRegisteredKryoTypes();
> }
>
> /**
> * Copy-constructor that does not copy transient fields. They will be
> * initialized once required.
> */
> protected KryoSerializer(KryoSerializer<T> toCopy) {
> registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
> registeredTypesWithSerializerClasses =
> toCopy.registeredTypesWithSerializerClasses;
> defaultSerializers = toCopy.defaultSerializers;
> defaultSerializerClasses = toCopy.defaultSerializerClasses;
> registeredTypes = toCopy.registeredTypes;
>
> type = toCopy.type;
> if(type == null){
> throw new NullPointerException("Type class cannot be null.");
> }
> }
>
> // ------------------------------------------------------------------------
>
> @Override
> public boolean isImmutableType() {
> return false;
> }
>
> @Override
> public KryoSerializer<T> duplicate() {
> return new KryoSerializer<T>(this);
> }
>
> @Override
> public T createInstance() {
> if(Modifier.isAbstract(type.getModifiers()) ||
> Modifier.isInterface(type.getModifiers()) ) {
> return null;
> } else {
> checkKryoInitialized();
> try {
> return kryo.newInstance(type);
> } catch(Throwable e) {
> return null;
> }
> }
> }
>
> @SuppressWarnings("unchecked")
> @Override
> public T copy(T from) {
> if (from == null) {
> return null;
> }
> checkKryoInitialized();
> try {
> return kryo.copy(from);
> }
> catch(KryoException ke) {
> // kryo was unable to copy it, so we do it through serialization:
> ByteArrayOutputStream baout = new ByteArrayOutputStream();
> Output output = new Output(baout);
>
> kryo.writeObject(output, from);
>
> output.close();
>
> ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
> Input input = new Input(bain);
>
> return (T)kryo.readObject(input, from.getClass());
> }
> }
> @Override
> public T copy(T from, T reuse) {
> return copy(from);
> }
>
> @Override
> public int getLength() {
> return -1;
> }
>
> @Override
> public void serialize(T record, DataOutputView target) throws IOException {
> checkKryoInitialized();
>   DataOutputViewStream outputStream = new DataOutputViewStream(target);
>   Output output = new Output(outputStream);
>
> try {
>        // Sanity check: make sure that the output is cleared/has been flushed
>        // by the last call, otherwise data might be written multiple times in
>        // case of a previous EOFException
>        if (output.position() != 0) {
>            throw new IllegalStateException("The Kryo Output still contains data from a previous " +
>                "serialize call. It has to be flushed or cleared at the end of the serialize call.");
>        }
>
> kryo.writeClassAndObject(output, record);
> output.flush();
> }
> catch (KryoException ke) {
> Throwable cause = ke.getCause();
> if (cause instanceof EOFException) {
> throw (EOFException) cause;
> }
> else {
> throw ke;
> }
> } finally {
>      try {
>               output.close();
>             } catch (KryoException ke) {
>               Throwable cause = ke.getCause();
>
>               if (cause instanceof EOFException) {
>                   throw (EOFException) cause;
>               } else {
>                   throw ke;
>               }
>           }
> }
> }
>
> @SuppressWarnings("unchecked")
> @Override
> public T deserialize(DataInputView source) throws IOException {
> checkKryoInitialized();
> DataInputViewStream inputStream = new DataInputViewStream(source);
> Input input = new NoFetchingInput(inputStream);
>
> try {
> return (T) kryo.readClassAndObject(input);
> } catch (KryoException ke) {
> Throwable cause = ke.getCause();
>
> if (cause instanceof EOFException) {
> throw (EOFException) cause;
> } else {
> throw ke;
> }
> } finally {
>  try {
>    input.close();
>  } catch (KryoException ke) {
>             Throwable cause = ke.getCause();
>
>             if (cause instanceof EOFException) {
>                 throw (EOFException) cause;
>             } else {
>                 throw ke;
>             }
>           }
> }
> }
> @Override
> public T deserialize(T reuse, DataInputView source) throws IOException {
> return deserialize(source);
> }
>
> @Override
> public void copy(DataInputView source, DataOutputView target) throws
> IOException {
> checkKryoInitialized();
> if(this.copyInstance == null){
> this.copyInstance = createInstance();
> }
>
> T tmp = deserialize(copyInstance, source);
> serialize(tmp, target);
> }
> // --------------------------------------------------------------------------------------------
> @Override
> public int hashCode() {
> return Objects.hash(
> type,
> registeredTypes,
> registeredTypesWithSerializerClasses,
> defaultSerializerClasses);
> }
> @Override
> public boolean equals(Object obj) {
> if (obj instanceof KryoSerializer) {
> KryoSerializer<?> other = (KryoSerializer<?>) obj;
>
> // we cannot include the Serializers here because they don't implement the
> // equals method
> return other.canEqual(this) &&
> type == other.type &&
> registeredTypes.equals(other.registeredTypes) &&
> registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses)
> &&
> defaultSerializerClasses.equals(other.defaultSerializerClasses);
> } else {
> return false;
> }
> }
>
> @Override
> public boolean canEqual(Object obj) {
> return obj instanceof KryoSerializer;
> }
>
> // --------------------------------------------------------------------------------------------
>
> /**
> * Returns the Chill Kryo Serializer which is implicitly added to the
> * classpath via flink-runtime.
> * Falls back to the default Kryo serializer if it can't be found.
> * @return The Kryo serializer instance.
> */
> private Kryo getKryoInstance() {
>
> try {
> // check if ScalaKryoInstantiator is in class path (coming from Twitter's
> // Chill library).
> // This will be true if Flink's Scala API is used.
> Class<?> chillInstantiatorClazz =
> Class.forName("com.twitter.chill.ScalaKryoInstantiator");
> Object chillInstantiator = chillInstantiatorClazz.newInstance();
>
> // obtain a Kryo instance through Twitter Chill
> Method m = chillInstantiatorClazz.getMethod("newKryo");
>
> return (Kryo) m.invoke(chillInstantiator);
> } catch (ClassNotFoundException | InstantiationException |
> NoSuchMethodException |
> IllegalAccessException | InvocationTargetException e) {
>
> LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);
>
> Kryo.DefaultInstantiatorStrategy initStrategy = new
> Kryo.DefaultInstantiatorStrategy();
> initStrategy.setFallbackInstantiatorStrategy(new
> StdInstantiatorStrategy());
>
> Kryo kryo = new Kryo();
> kryo.setInstantiatorStrategy(initStrategy);
>
> return kryo;
> }
> }
>
> private void checkKryoInitialized() {
> if (this.kryo == null) {
> this.kryo = getKryoInstance();
>
> // Enable reference tracking.
> kryo.setReferences(true);
> // Throwable and all subclasses should be serialized via java serialization
> kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
>
> // Add default serializers first, so that the type registrations without
> // a serializer are registered with a default serializer
> for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
> defaultSerializers.entrySet()) {
> kryo.addDefaultSerializer(entry.getKey(),
> entry.getValue().getSerializer());
> }
>
> for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry:
> defaultSerializerClasses.entrySet()) {
> kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
> }
>
> // register the type of our class
> kryo.register(type);
>
> // register given types. we do this first so that any registration of a
> // more specific serializer overrides this
> for (Class<?> type : registeredTypes) {
> kryo.register(type);
> }
>
> // register given serializer classes
> for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e :
> registeredTypesWithSerializerClasses.entrySet()) {
> Class<?> typeClass = e.getKey();
> Class<? extends Serializer<?>> serializerClass = e.getValue();
>
> Serializer<?> serializer =
> ReflectionSerializerFactory.makeSerializer(kryo, serializerClass,
> typeClass);
> kryo.register(typeClass, serializer);
> }
>
> // register given serializers
> for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e :
> registeredTypesWithSerializers.entrySet()) {
> kryo.register(e.getKey(), e.getValue().getSerializer());
> }
> // this is needed for Avro but can not be added on demand.
> kryo.register(GenericData.Array.class, new
> SpecificInstanceCollectionSerializerForArrayList());
>
> kryo.setRegistrationRequired(false);
> kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
> }
> }
>
> // --------------------------------------------------------------------------------------------
> // For testing
> // --------------------------------------------------------------------------------------------
> public Kryo getKryo() {
> checkKryoInitialized();
> return this.kryo;
> }
> }
>
> Best,
> Flavio
>
> On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ahmednader839@gmail.com>
> wrote:
>
>> Hello,
>> I have a TwitterSource and I'm applying some transformations, such as
>> filter and map, to the resulting stream from Twitter. I'm collecting the
>> output in an iterator: iterator = DataStreamUtils.collect(datastream).
>> Then, in a parallel thread, I periodically check whether
>> iterator.hasNext() and print the next item. I'm using Flink 1.0.3.
>> The program works at the beginning and actually prints some items;
>> however, when I leave it running for some more time (for example, 40
>> seconds or 1 minute), I get 2 exceptions:
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>> ID and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
>> These 2 exceptions result from the line where I'm checking whether the
>> iterator hasNext().
>>
>> I would like to know why these exceptions happen in general, and if
>> anyone knows a specific solution for my program, that would be great too.
>> Thanks,
>> Ahmed
>>
>
>
>
> --
>
> Flavio Pompermaier
>
>
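
For reference, the polling setup described in the original question can be sketched roughly as follows. This is only an illustration of the pattern (one background thread that periodically checks hasNext() and prints the next item), not the actual program: a plain java.util.Iterator stands in for the iterator returned by DataStreamUtils.collect, and the class name, method name, and timing parameters are all hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollingConsumerSketch {

    /**
     * Polls the iterator every periodMillis on a single background thread for
     * roughly totalMillis, printing and recording each item it finds.
     */
    static <T> List<T> pollFor(Iterator<T> results, long periodMillis, long totalMillis) {
        List<T> seen = new CopyOnWriteArrayList<>();
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        poller.scheduleWithFixedDelay(() -> {
            // Only this one thread ever touches the iterator.
            if (results.hasNext()) {
                T item = results.next();
                seen.add(item);
                System.out.println(item);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(totalMillis);
            poller.shutdown();
            poller.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            poller.shutdownNow();
        }
        return seen;
    }

    public static void main(String[] args) {
        // Stand-in data; in the real program the items would come from the Flink job.
        Iterator<String> results = Arrays.asList("tweet-1", "tweet-2", "tweet-3").iterator();
        pollFor(results, 10, 500);
    }
}
```

One detail worth keeping in mind with this pattern: if the task passed to scheduleWithFixedDelay throws (for example, an exception surfacing from hasNext()), the executor suppresses all further runs, so the exception should be caught and logged inside the task if polling is meant to continue.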
