beam-commits mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (BEAM-2831) Possible bug in Beam+Flink memory management, disk spillover
Date Thu, 31 Aug 2017 13:42:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148985#comment-16148985 ]

Aljoscha Krettek edited comment on BEAM-2831 at 8/31/17 1:41 PM:
-----------------------------------------------------------------

Could you try running it with this modified {{SerializableCoder}}:
{code}
public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {

  /**
   * Returns a {@link SerializableCoder} instance for the provided element type.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
    @SuppressWarnings("unchecked")
    Class<T> clazz = (Class<T>) type.getRawType();
    return new SerializableCoder<>(clazz, type);
  }

  /**
   * Returns a {@link SerializableCoder} instance for the provided element class.
   * @param <T> the element type
   */
  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
    return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
  }

  /**
   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
   * all types.
   *
   * <p>This method is invoked reflectively from {@link DefaultCoder}.
   */
  @SuppressWarnings("unused")
  public static CoderProvider getCoderProvider() {
    return new SerializableCoderProvider();
  }

  /**
   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
   * serializable types.
   */
  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {

    @Override
    public List<CoderProvider> getCoderProviders() {
      return ImmutableList.of(getCoderProvider());
    }
  }

  /**
   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
   * implements serializable.
   */
  static class SerializableCoderProvider extends CoderProvider {
    @Override
    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
      }
      throw new CannotProvideCoderException(
          "Cannot provide SerializableCoder because " + typeDescriptor
              + " does not implement Serializable");
    }
  }

  private final Class<T> type;
  private transient TypeDescriptor<T> typeDescriptor;

  protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
    this.type = type;
    this.typeDescriptor = typeDescriptor;
  }

  public Class<T> getRecordType() {
    return type;
  }

  @Override
  public void encode(T value, OutputStream outStream)
      throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(outStream);
    oos.writeObject(value);
    oos.flush();
  }

  @Override
  public T decode(InputStream inStream)
      throws IOException, CoderException {
    try {
      ObjectInputStream ois = new ObjectInputStream(inStream);
      return type.cast(ois.readObject());
    } catch (ClassNotFoundException e) {
      throw new CoderException("unable to deserialize record", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @throws NonDeterministicException always. Java serialization is not
   *         deterministic with respect to {@link Object#equals} for all types.
   */
  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this,
        "Java Serialization may be non-deterministic.");
  }

  @Override
  public boolean equals(Object other) {
    return !(other == null || getClass() != other.getClass())
            && type == ((SerializableCoder<?>) other).type;
  }

  @Override
  public int hashCode() {
    return type.hashCode();
  }

  @Override
  public TypeDescriptor<T> getEncodedTypeDescriptor() {
    if (typeDescriptor == null) {
      typeDescriptor = TypeDescriptor.of(type);
    }
    return typeDescriptor;
  }

  // This coder inherits isRegisterByteSizeObserverCheap,
  // getEncodedElementByteSize and registerByteSizeObserver
  // from StructuredCoder. Looks like we cannot do much better
  // in this case.
}
{code}

The only change is in {{encode()}}, which no longer wraps the {{EOFException}}. I think
this should fix the problem, and if it does, we should include this change in Beam.
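
To illustrate why the wrapping matters, here is a minimal, self-contained sketch. It is not Beam or Flink code: {{BoundedOutputStream}}, {{WrappingException}}, {{Encoder}} and {{writeOrSpill}} are hypothetical names made up for this illustration. {{WrappingException}} stands in for a wrapper exception such as {{CoderException}}, and {{writeOrSpill}} stands in for a caller that reacts to an {{EOFException}} from a full buffer by spilling.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

class EofPropagationSketch {

  /** Hypothetical stand-in for a wrapper exception such as Beam's CoderException. */
  static class WrappingException extends IOException {
    WrappingException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Hypothetical fixed-capacity stream that signals "buffer full" with an EOFException. */
  static class BoundedOutputStream extends OutputStream {
    private int remaining;

    BoundedOutputStream(int capacity) {
      this.remaining = capacity;
    }

    @Override
    public void write(int b) throws IOException {
      if (remaining <= 0) {
        throw new EOFException("buffer full");
      }
      remaining--;
    }
  }

  /** Hypothetical functional interface so both encode variants can be passed around. */
  interface Encoder {
    void encode(Serializable value, OutputStream out) throws IOException;
  }

  /** Wrapping variant: every IOException, including EOFException, is rethrown wrapped. */
  static void encodeWrapping(Serializable value, OutputStream out) throws IOException {
    try {
      ObjectOutputStream oos = new ObjectOutputStream(out);
      oos.writeObject(value);
      oos.flush();
    } catch (IOException e) {
      throw new WrappingException("unable to serialize record " + value, e);
    }
  }

  /** Direct variant, as in the modified encode(): exceptions propagate unchanged. */
  static void encodeDirect(Serializable value, OutputStream out) throws IOException {
    ObjectOutputStream oos = new ObjectOutputStream(out);
    oos.writeObject(value);
    oos.flush();
  }

  /** Caller that only treats an EOFException as the signal to spill. */
  static String writeOrSpill(Encoder encoder, Serializable value) {
    try {
      // 8 bytes is enough for the stream header but not for the record itself.
      encoder.encode(value, new BoundedOutputStream(8));
      return "written";
    } catch (EOFException e) {
      return "spilled";
    } catch (IOException e) {
      return "failed: " + e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(writeOrSpill(EofPropagationSketch::encodeWrapping, "some record"));
    System.out.println(writeOrSpill(EofPropagationSketch::encodeDirect, "some record"));
  }
}
```

Run as written, this should print {{failed: WrappingException}} for the wrapping variant and {{spilled}} for the direct one, because a {{catch (EOFException e)}} clause matches on the exception's own type and not on its cause.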


> Possible bug in Beam+Flink memory management, disk spillover
> ------------------------------------------------------------
>
>                 Key: BEAM-2831
>                 URL: https://issues.apache.org/jira/browse/BEAM-2831
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>         Environment: Flink 1.2.1 and 1.3.0, Java HotSpot and OpenJDK 8, macOS 10.12.6 and unknown Linux
>            Reporter: Reinier Kip
>            Assignee: Aljoscha Krettek
>
> I’ve been running a Beam pipeline on Flink. Depending on the dataset size and the heap memory configuration of the jobmanager and taskmanager, I may run into an EOFException, which causes the job to fail.
> As [discussed on Flink's mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-related-to-memory-segments-during-run-of-Beam-pipeline-on-Flink-td15255.html] (stacktrace enclosed), Flink catches these EOFExceptions and activates disk spillover. Because Beam wraps these exceptions, this mechanism fails, the exception travels up the stack, and the job aborts.
> Hopefully this is enough information and this is something that can be adjusted for in Beam. I'd be glad to provide more information where needed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
