Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0D3B7200C70 for ; Wed, 19 Apr 2017 21:14:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0BE64160BB3; Wed, 19 Apr 2017 19:14:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E2DD4160BBB for ; Wed, 19 Apr 2017 21:14:41 +0200 (CEST) Received: (qmail 85083 invoked by uid 500); 19 Apr 2017 19:14:41 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 81874 invoked by uid 99); 19 Apr 2017 19:14:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Apr 2017 19:14:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C995AE04F2; Wed, 19 Apr 2017 19:14:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.apache.org Date: Wed, 19 Apr 2017 19:15:10 -0000 Message-Id: In-Reply-To: <7b84ba86181a4307aa8a5e75b864f763@git.apache.org> References: <7b84ba86181a4307aa8a5e75b864f763@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [36/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples package archived-at: Wed, 19 Apr 2017 19:14:45 -0000 http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java deleted file mode 100644 index af4b354..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink.translation; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java deleted file mode 100644 index 9b449aa..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -/** - * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for - * Dataflow {@link org.apache.beam.sdk.coders.Coder}s. - */ -public class CoderTypeInformation extends TypeInformation implements AtomicType { - - private final Coder coder; - - public CoderTypeInformation(Coder coder) { - checkNotNull(coder); - this.coder = coder; - } - - public Coder getCoder() { - return coder; - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - @SuppressWarnings("unchecked") - public Class getTypeClass() { - // We don't have the Class, so we have to pass null here. What a shame... - return (Class) Object.class; - } - - @Override - public boolean isKeyType() { - return true; - } - - @Override - @SuppressWarnings("unchecked") - public TypeSerializer createSerializer(ExecutionConfig config) { - return new CoderTypeSerializer<>(coder); - } - - @Override - public int getTotalFields() { - return 2; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - CoderTypeInformation that = (CoderTypeInformation) o; - - return coder.equals(that.coder); - - } - - @Override - public int hashCode() { - return coder.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof CoderTypeInformation; - } - - @Override - public String toString() { - return "CoderTypeInformation{coder=" + coder + '}'; - } - - @Override - public TypeComparator createComparator(boolean sortOrderAscending, ExecutionConfig - executionConfig) { - throw new UnsupportedOperationException( - "Non-encoded values cannot be compared directly."); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java deleted file mode 100644 index e210ed9..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import java.io.EOFException; -import java.io.IOException; -import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; -import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -/** - * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for - * Dataflow {@link org.apache.beam.sdk.coders.Coder Coders}. - */ -public class CoderTypeSerializer extends TypeSerializer { - - private Coder coder; - - public CoderTypeSerializer(Coder coder) { - this.coder = coder; - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public CoderTypeSerializer duplicate() { - return new CoderTypeSerializer<>(coder); - } - - @Override - public T createInstance() { - return null; - } - - @Override - public T copy(T t) { - try { - return CoderUtils.clone(coder, t); - } catch (CoderException e) { - throw new RuntimeException("Could not clone.", e); - } - } - - @Override - public T copy(T t, T reuse) { - return copy(t); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(T t, DataOutputView dataOutputView) throws IOException { - DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView); - coder.encode(t, outputWrapper, Coder.Context.NESTED); - } - - @Override - public T deserialize(DataInputView dataInputView) throws IOException { - try { - DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView); - return coder.decode(inputWrapper, Coder.Context.NESTED); - } catch (CoderException e) { - Throwable cause = e.getCause(); - if (cause instanceof EOFException) { - throw (EOFException) cause; - } else { - throw e; - } - } - } - - @Override - public T deserialize(T t, DataInputView dataInputView) throws IOException { - return deserialize(dataInputView); - } - - @Override - public void copy( - DataInputView dataInputView, - DataOutputView dataOutputView) throws IOException { - serialize(deserialize(dataInputView), dataOutputView); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - CoderTypeSerializer that = (CoderTypeSerializer) o; - return coder.equals(that.coder); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof CoderTypeSerializer; - } - - @Override - public int hashCode() { - return coder.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java deleted file mode 100644 index 667ef45..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import java.io.IOException; -import java.util.Arrays; -import org.apache.beam.sdk.coders.Coder; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; - -/** - * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for Beam values that have - * been encoded to byte data by a {@link Coder}. - */ -public class EncodedValueComparator extends TypeComparator { - - /** For storing the Reference in encoded form. */ - private transient byte[] encodedReferenceKey; - - private final boolean ascending; - - public EncodedValueComparator(boolean ascending) { - this.ascending = ascending; - } - - @Override - public int hash(byte[] record) { - return Arrays.hashCode(record); - } - - @Override - public void setReference(byte[] toCompare) { - this.encodedReferenceKey = toCompare; - } - - @Override - public boolean equalToReference(byte[] candidate) { - if (encodedReferenceKey.length != candidate.length) { - return false; - } - int len = candidate.length; - for (int i = 0; i < len; i++) { - if (encodedReferenceKey[i] != candidate[i]) { - return false; - } - } - return true; - } - - @Override - public int compareToReference(TypeComparator other) { - // VERY IMPORTANT: compareToReference does not behave like Comparable.compare - // the meaning of the return value is inverted. - - EncodedValueComparator otherEncodedValueComparator = (EncodedValueComparator) other; - - int len = Math.min( - encodedReferenceKey.length, - otherEncodedValueComparator.encodedReferenceKey.length); - - for (int i = 0; i < len; i++) { - byte b1 = encodedReferenceKey[i]; - byte b2 = otherEncodedValueComparator.encodedReferenceKey[i]; - int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1)); - if (result != 0) { - return ascending ? -result : result; - } - } - int result = - encodedReferenceKey.length - otherEncodedValueComparator.encodedReferenceKey.length; - return ascending ? -result : result; - } - - - @Override - public int compare(byte[] first, byte[] second) { - int len = Math.min(first.length, second.length); - for (int i = 0; i < len; i++) { - byte b1 = first[i]; - byte b2 = second[i]; - int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1)); - if (result != 0) { - return ascending ? result : -result; - } - } - int result = first.length - second.length; - return ascending ? result : -result; - } - - @Override - public int compareSerialized( - DataInputView firstSource, - DataInputView secondSource) throws IOException { - int lengthFirst = firstSource.readInt(); - int lengthSecond = secondSource.readInt(); - - int len = Math.min(lengthFirst, lengthSecond); - for (int i = 0; i < len; i++) { - byte b1 = firstSource.readByte(); - byte b2 = secondSource.readByte(); - int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1)); - if (result != 0) { - return ascending ? result : -result; - } - } - - int result = lengthFirst - lengthSecond; - return ascending ? result : -result; - } - - - - @Override - public boolean supportsNormalizedKey() { - // disabled because this seems to not work with some coders, - // such as the AvroCoder - return false; - } - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public int getNormalizeKeyLen() { - return Integer.MAX_VALUE; - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return true; - } - - @Override - public void putNormalizedKey(byte[] record, MemorySegment target, int offset, int numBytes) { - final int limit = offset + numBytes; - - target.put(offset, record, 0, Math.min(numBytes, record.length)); - - offset += record.length; - - while (offset < limit) { - target.put(offset++, (byte) 0); - } - } - - @Override - public void writeWithKeyNormalization(byte[] record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] readWithKeyDenormalization(byte[] reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean invertNormalizedKey() { - return !ascending; - } - - @Override - public TypeComparator duplicate() { - return new EncodedValueComparator(ascending); - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - target[index] = record; - return 1; - } - - @Override - public TypeComparator[] getFlatComparators() { - return new TypeComparator[] { this.duplicate() }; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java deleted file mode 100644 index 41db61e..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import java.io.IOException; - -import org.apache.beam.sdk.coders.Coder; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -/** - * {@link TypeSerializer} for values that were encoded using a {@link Coder}. - */ -public final class EncodedValueSerializer extends TypeSerializer { - - private static final long serialVersionUID = 1L; - - private static final byte[] EMPTY = new byte[0]; - - @Override - public boolean isImmutableType() { - return true; - } - - @Override - public byte[] createInstance() { - return EMPTY; - } - - @Override - public byte[] copy(byte[] from) { - return from; - } - - @Override - public byte[] copy(byte[] from, byte[] reuse) { - return copy(from); - } - - @Override - public int getLength() { - return -1; - } - - - @Override - public void serialize(byte[] record, DataOutputView target) throws IOException { - if (record == null) { - throw new IllegalArgumentException("The record must not be null."); - } - - final int len = record.length; - target.writeInt(len); - target.write(record); - } - - @Override - public byte[] deserialize(DataInputView source) throws IOException { - final int len = source.readInt(); - byte[] result = new byte[len]; - source.readFully(result); - return result; - } - - @Override - public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - final int len = source.readInt(); - target.writeInt(len); - target.write(source, len); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof EncodedValueSerializer; - } - - @Override - public int hashCode() { - return this.getClass().hashCode(); - } - - @Override - public boolean equals(Object obj) { - return obj instanceof EncodedValueSerializer; - } - - @Override - public TypeSerializer duplicate() { - return this; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java deleted file mode 100644 index e24bf31..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.AtomicType; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -/** - * Flink {@link TypeInformation} for Beam values that have been encoded to byte data - * by a {@link Coder}. - */ -public class EncodedValueTypeInformation - extends TypeInformation - implements AtomicType { - - private static final long serialVersionUID = 1L; - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 0; - } - - @Override - public int getTotalFields() { - return 0; - } - - @Override - public Class getTypeClass() { - return byte[].class; - } - - @Override - public boolean isKeyType() { - return true; - } - - @Override - public TypeSerializer createSerializer(ExecutionConfig executionConfig) { - return new EncodedValueSerializer(); - } - - @Override - public boolean equals(Object other) { - return other instanceof EncodedValueTypeInformation; - } - - @Override - public int hashCode() { - return this.getClass().hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof EncodedValueTypeInformation; - } - - @Override - public String toString() { - return "EncodedValueTypeInformation"; - } - - @Override - public TypeComparator createComparator( - boolean sortOrderAscending, - ExecutionConfig executionConfig) { - return new EncodedValueComparator(sortOrderAscending); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java deleted file mode 100644 index 36b5ba3..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import java.io.ByteArrayOutputStream; - -/** - * Version of {@link java.io.ByteArrayOutputStream} that allows to retrieve the internal - * byte[] buffer without incurring an array copy. - */ -public class InspectableByteArrayOutputStream extends ByteArrayOutputStream { - - /** - * Get the underlying byte array. - */ - public byte[] getBuffer() { - return buf; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java deleted file mode 100644 index 9df6836..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; - -/** - * {@link KeySelector} that extracts the key from a {@link KV} and returns - * it in encoded form as a {@code byte} array. - */ -public class KvKeySelector - implements KeySelector>, byte[]>, ResultTypeQueryable { - - private final Coder keyCoder; - - public KvKeySelector(Coder keyCoder) { - this.keyCoder = keyCoder; - } - - @Override - public byte[] getKey(WindowedValue> value) throws Exception { - return CoderUtils.encodeToByteArray(keyCoder, value.getValue().getKey()); - } - - @Override - public TypeInformation getProducedType() { - return new EncodedValueTypeInformation(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java deleted file mode 100644 index 6fb3182..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink.translation.types; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java deleted file mode 100644 index 2256bb1..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.flink.translation.utils; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.IOChannelUtils; - -/** - * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. - */ -public class SerializedPipelineOptions implements Serializable { - - private final byte[] serializedOptions; - - /** Lazily initialized copy of deserialized options. */ - private transient PipelineOptions pipelineOptions; - - public SerializedPipelineOptions(PipelineOptions options) { - checkNotNull(options, "PipelineOptions must not be null."); - - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - new ObjectMapper().writeValue(baos, options); - this.serializedOptions = baos.toByteArray(); - } catch (Exception e) { - throw new RuntimeException("Couldn't serialize PipelineOptions.", e); - } - - } - - public PipelineOptions getPipelineOptions() { - if (pipelineOptions == null) { - try { - pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class); - - IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); - FileSystems.setDefaultConfigInWorkers(pipelineOptions); - } catch (IOException e) { - throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); - } - } - - return pipelineOptions; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java deleted file mode 100644 index 5dedd53..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink.translation.utils; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java deleted file mode 100644 index 82a2c4e..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import org.apache.flink.core.memory.DataInputView; - -/** - * Wrapper for {@link DataInputView}. We need this because Flink reads data using a - * {@link org.apache.flink.core.memory.DataInputView} while - * Dataflow {@link org.apache.beam.sdk.coders.Coder}s expect an - * {@link java.io.InputStream}. - */ -public class DataInputViewWrapper extends InputStream { - - private DataInputView inputView; - - public DataInputViewWrapper(DataInputView inputView) { - this.inputView = inputView; - } - - public void setInputView(DataInputView inputView) { - this.inputView = inputView; - } - - @Override - public int read() throws IOException { - try { - return inputView.readUnsignedByte(); - } catch (EOFException e) { - // translate between DataInput and InputStream, - // DataInput signals EOF by exception, InputStream does it by returning -1 - return -1; - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return inputView.read(b, off, len); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java deleted file mode 100644 index f2d9db2..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers; - -import java.io.IOException; -import java.io.OutputStream; -import org.apache.flink.core.memory.DataOutputView; - -/** - * Wrapper for {@link org.apache.flink.core.memory.DataOutputView}. We need this because - * Flink writes data using a {@link org.apache.flink.core.memory.DataInputView} while - * Dataflow {@link org.apache.beam.sdk.coders.Coder}s expect an - * {@link java.io.OutputStream}. - */ -public class DataOutputViewWrapper extends OutputStream { - - private DataOutputView outputView; - - public DataOutputViewWrapper(DataOutputView outputView) { - this.outputView = outputView; - } - - public void setOutputView(DataOutputView outputView) { - this.outputView = outputView; - } - - @Override - public void write(int b) throws IOException { - outputView.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - outputView.write(b, off, len); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java deleted file mode 100644 index 70d97e3..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.Serializable; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.flink.api.common.accumulators.Accumulator; - -/** - * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn} - * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using - * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo} - * operation. - */ -public class SerializableFnAggregatorWrapper - implements Aggregator, Accumulator { - - private OutputT aa; - private Combine.CombineFn combiner; - - public SerializableFnAggregatorWrapper(Combine.CombineFn combiner) { - this.combiner = combiner; - resetLocal(); - } - - @Override - @SuppressWarnings("unchecked") - public void add(InputT value) { - this.aa = combiner.apply(ImmutableList.of((InputT) aa, value)); - } - - @Override - public Serializable getLocalValue() { - return (Serializable) aa; - } - - @Override - public void resetLocal() { - this.aa = combiner.apply(ImmutableList.of()); - } - - @Override - @SuppressWarnings("unchecked") - public void merge(Accumulator other) { - this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue())); - } - - @Override - public void addValue(InputT value) { - add(value); - } - - @Override - public String getName() { - return "Aggregator :" + combiner.toString(); - } - - @Override - public Combine.CombineFn getCombineFn() { - return combiner; - } - - @Override - public Accumulator clone() { - try { - super.clone(); - } catch (CloneNotSupportedException e) { - // Flink Accumulators cannot throw CloneNotSupportedException, work around that. - throw new RuntimeException(e); - } - - // copy it by merging - OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa)); - SerializableFnAggregatorWrapper result = new - SerializableFnAggregatorWrapper<>(combiner); - - result.aa = resultCopy; - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java deleted file mode 100644 index a87472b..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers; - -import java.io.IOException; -import java.util.List; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplitAssigner; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}. - */ -public class SourceInputFormat - implements InputFormat, SourceInputSplit> { - private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); - - private final BoundedSource initialSource; - - private transient PipelineOptions options; - private final SerializedPipelineOptions serializedOptions; - - private transient BoundedSource.BoundedReader reader; - private boolean inputAvailable = false; - - public SourceInputFormat(BoundedSource initialSource, PipelineOptions options) { - this.initialSource = initialSource; - this.serializedOptions = new SerializedPipelineOptions(options); - } - - @Override - public void configure(Configuration configuration) { - options = serializedOptions.getPipelineOptions(); - } - - @Override - public void open(SourceInputSplit sourceInputSplit) throws IOException { - reader = ((BoundedSource) sourceInputSplit.getSource()).createReader(options); - inputAvailable = reader.start(); - } - - @Override - public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { - try { - final long estimatedSize = initialSource.getEstimatedSizeBytes(options); - - return new BaseStatistics() { - @Override - public long getTotalInputSize() { - return estimatedSize; - } - - @Override - public long getNumberOfRecords() { - return BaseStatistics.NUM_RECORDS_UNKNOWN; - } - - @Override - public float getAverageRecordWidth() { - return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN; - } - }; - } catch (Exception e) { - LOG.warn("Could not read Source statistics: {}", e); - } - - return null; - } - - @Override - @SuppressWarnings("unchecked") - public SourceInputSplit[] createInputSplits(int numSplits) throws IOException { - try { - long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; - List> shards = - initialSource.split(desiredSizeBytes, options); - int numShards = shards.size(); - SourceInputSplit[] sourceInputSplits = new SourceInputSplit[numShards]; - for (int i = 0; i < numShards; i++) { - sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i); - } - return sourceInputSplits; - } catch (Exception e) { - throw new IOException("Could not create input splits from Source.", e); - } - } - - @Override - public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) { - return new DefaultInputSplitAssigner(sourceInputSplits); - } - - - @Override - public boolean reachedEnd() throws IOException { - return !inputAvailable; - } - - @Override - public WindowedValue nextRecord(WindowedValue t) throws IOException { - if (inputAvailable) { - final T current = reader.getCurrent(); - final Instant timestamp = reader.getCurrentTimestamp(); - // advance reader to have a record ready next time - inputAvailable = reader.advance(); - return WindowedValue.of( - current, - timestamp, - GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); - } - - return null; - } - - @Override - public void close() throws IOException { - // TODO null check can be removed once FLINK-3796 is fixed - if (reader != null) { - reader.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java deleted file mode 100644 index e4a7386..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers; - -import org.apache.beam.sdk.io.Source; -import org.apache.flink.core.io.InputSplit; - -/** - * {@link org.apache.flink.core.io.InputSplit} for - * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}. We pass - * the sharded Source around in the input split because Sources simply split up into several - * Sources for sharding. This is different to how Flink creates a separate InputSplit from - * an InputFormat. - */ -public class SourceInputSplit implements InputSplit { - - private Source source; - private int splitNumber; - - public SourceInputSplit() { - } - - public SourceInputSplit(Source source, int splitNumber) { - this.source = source; - this.splitNumber = splitNumber; - } - - @Override - public int getSplitNumber() { - return splitNumber; - } - - public Source getSource() { - return source; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java deleted file mode 100644 index 72f7deb..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink.translation.wrappers; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java deleted file mode 100644 index 8a09286..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ /dev/null @@ -1,774 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import static org.apache.flink.util.Preconditions.checkArgument; - -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; -import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; -import org.apache.beam.runners.core.SideInputHandler; -import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.core.StateTags; -import org.apache.beam.runners.core.StatefulDoFnRunner; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.NullSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; -import org.apache.flink.runtime.state.KeyGroupsList; -import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.HeapInternalTimerService; -import org.apache.flink.streaming.api.operators.InternalTimer; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.Triggerable; -import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.joda.time.Instant; - -/** - * Flink operator for executing {@link DoFn DoFns}. - * - * @param the input type of the {@link DoFn} - * @param the output type of the {@link DoFn} - * @param the output type of the operator, this can be different from the fn output - * type when we have additional tagged outputs - */ -public class DoFnOperator - extends AbstractStreamOperator - implements OneInputStreamOperator, OutputT>, - TwoInputStreamOperator, RawUnionValue, OutputT>, - KeyGroupCheckpointedOperator, Triggerable { - - protected DoFn doFn; - - protected final SerializedPipelineOptions serializedOptions; - - protected final TupleTag mainOutputTag; - protected final List> additionalOutputTags; - - protected final Collection> sideInputs; - protected final Map> sideInputTagMapping; - - protected final WindowingStrategy windowingStrategy; - - protected final OutputManagerFactory outputManagerFactory; - - protected transient DoFnRunner doFnRunner; - protected transient PushbackSideInputDoFnRunner pushbackDoFnRunner; - - protected transient SideInputHandler sideInputHandler; - - protected transient SideInputReader sideInputReader; - - protected transient DoFnRunners.OutputManager outputManager; - - private transient DoFnInvoker doFnInvoker; - - protected transient long currentInputWatermark; - - protected transient long currentOutputWatermark; - - private transient StateTag>> pushedBackTag; - - protected transient FlinkStateInternals stateInternals; - - private Coder> inputCoder; - - private final Coder keyCoder; - - private final TimerInternals.TimerDataCoder timerCoder; - - protected transient HeapInternalTimerService timerService; - - protected transient FlinkTimerInternals timerInternals; - - private transient StateInternals pushbackStateInternals; - - private transient Optional pushedBackWatermark; - - public DoFnOperator( - DoFn doFn, - Coder> inputCoder, - TupleTag mainOutputTag, - List> additionalOutputTags, - OutputManagerFactory outputManagerFactory, - WindowingStrategy windowingStrategy, - Map> sideInputTagMapping, - Collection> sideInputs, - PipelineOptions options, - Coder keyCoder) { - this.doFn = doFn; - this.inputCoder = inputCoder; - this.mainOutputTag = mainOutputTag; - this.additionalOutputTags = additionalOutputTags; - this.sideInputTagMapping = sideInputTagMapping; - this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(options); - this.windowingStrategy = windowingStrategy; - this.outputManagerFactory = outputManagerFactory; - - setChainingStrategy(ChainingStrategy.ALWAYS); - - this.keyCoder = keyCoder; - - this.timerCoder = - TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); - } - - private ExecutionContext.StepContext createStepContext() { - return new StepContext(); - } - - // allow overriding this in WindowDoFnOperator because this one dynamically creates - // the DoFn - protected DoFn getDoFn() { - return doFn; - } - - @Override - public void open() throws Exception { - super.open(); - - currentInputWatermark = Long.MIN_VALUE; - currentOutputWatermark = Long.MIN_VALUE; - - AggregatorFactory aggregatorFactory = new AggregatorFactory() { - @Override - public Aggregator createAggregatorForDoFn( - Class fnClass, - ExecutionContext.StepContext stepContext, - String aggregatorName, - Combine.CombineFn combine) { - - @SuppressWarnings("unchecked") - SerializableFnAggregatorWrapper result = - (SerializableFnAggregatorWrapper) - getRuntimeContext().getAccumulator(aggregatorName); - - if (result == null) { - result = new SerializableFnAggregatorWrapper<>(combine); - getRuntimeContext().addAccumulator(aggregatorName, result); - } - return result; - } - }; - - sideInputReader = NullSideInputReader.of(sideInputs); - - if (!sideInputs.isEmpty()) { - - pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); - - FlinkBroadcastStateInternals sideInputStateInternals = - new FlinkBroadcastStateInternals<>( - getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend()); - - sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); - sideInputReader = sideInputHandler; - - // maybe init by initializeState - if (pushbackStateInternals == null) { - if (keyCoder != null) { - pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, - getKeyedStateBackend()); - } else { - pushbackStateInternals = - new FlinkSplitStateInternals(getOperatorStateBackend()); - } - } - - pushedBackWatermark = Optional.absent(); - - } - - outputManager = outputManagerFactory.create(output); - - // StatefulPardo or WindowDoFn - if (keyCoder != null) { - stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(), - keyCoder); - - timerService = (HeapInternalTimerService) - getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this); - - timerInternals = new FlinkTimerInternals(); - - } - - // WindowDoFnOperator need use state and timer to get DoFn. - // So must wait StateInternals and TimerInternals ready. - this.doFn = getDoFn(); - doFnInvoker = DoFnInvokers.invokerFor(doFn); - - doFnInvoker.invokeSetup(); - - ExecutionContext.StepContext stepContext = createStepContext(); - - doFnRunner = DoFnRunners.simpleRunner( - serializedOptions.getPipelineOptions(), - doFn, - sideInputReader, - outputManager, - mainOutputTag, - additionalOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - - if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) { - // When the doFn is this, we know it came from WindowDoFnOperator and - // InputT = KeyedWorkItem - // OutputT = KV - // - // for some K, V - - - doFnRunner = DoFnRunners.lateDataDroppingRunner( - (DoFnRunner) doFnRunner, - stepContext, - windowingStrategy, - ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator()); - } else if (keyCoder != null) { - // It is a stateful DoFn - - StatefulDoFnRunner.CleanupTimer cleanupTimer = - new StatefulDoFnRunner.TimeInternalsCleanupTimer( - stepContext.timerInternals(), windowingStrategy); - - // we don't know the window type - @SuppressWarnings({"unchecked", "rawtypes"}) - Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - StatefulDoFnRunner.StateCleaner stateCleaner = - new StatefulDoFnRunner.StateInternalsStateCleaner<>( - doFn, stepContext.stateInternals(), windowCoder); - - doFnRunner = DoFnRunners.defaultStatefulDoFnRunner( - doFn, - doFnRunner, - stepContext, - aggregatorFactory, - windowingStrategy, - cleanupTimer, - stateCleaner); - } - - pushbackDoFnRunner = - SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); - } - - @Override - public void close() throws Exception { - super.close(); - doFnInvoker.invokeTeardown(); - } - - protected final long getPushbackWatermarkHold() { - // if we don't have side inputs we never hold the watermark - if (sideInputs.isEmpty()) { - return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); - } - - try { - checkInitPushedBackWatermark(); - return pushedBackWatermark.get(); - } catch (Exception e) { - throw new RuntimeException("Error retrieving pushed back watermark state.", e); - } - } - - private void checkInitPushedBackWatermark() { - // init and restore from pushedBack state. - // Not done in initializeState, because OperatorState is not ready. - if (!pushedBackWatermark.isPresent()) { - - BagState> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - - long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); - for (WindowedValue value : pushedBack.read()) { - min = Math.min(min, value.getTimestamp().getMillis()); - } - setPushedBackWatermark(min); - } - } - - @Override - public final void processElement( - StreamRecord> streamRecord) throws Exception { - doFnRunner.startBundle(); - doFnRunner.processElement(streamRecord.getValue()); - doFnRunner.finishBundle(); - } - - private void setPushedBackWatermark(long watermark) { - pushedBackWatermark = Optional.fromNullable(watermark); - } - - @Override - public final void processElement1( - StreamRecord> streamRecord) throws Exception { - pushbackDoFnRunner.startBundle(); - Iterable> justPushedBack = - pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue()); - - BagState> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - - checkInitPushedBackWatermark(); - - long min = pushedBackWatermark.get(); - for (WindowedValue pushedBackValue : justPushedBack) { - min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); - pushedBack.add(pushedBackValue); - } - setPushedBackWatermark(min); - pushbackDoFnRunner.finishBundle(); - } - - @Override - public final void processElement2( - StreamRecord streamRecord) throws Exception { - pushbackDoFnRunner.startBundle(); - - @SuppressWarnings("unchecked") - WindowedValue> value = - (WindowedValue>) streamRecord.getValue().getValue(); - - PCollectionView sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag()); - sideInputHandler.addSideInputValue(sideInput, value); - - BagState> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - - List> newPushedBack = new ArrayList<>(); - - Iterable> pushedBackContents = pushedBack.read(); - if (pushedBackContents != null) { - for (WindowedValue elem : pushedBackContents) { - - // we need to set the correct key in case the operator is - // a (keyed) window operator - setKeyContextElement1(new StreamRecord<>(elem)); - - Iterable> justPushedBack = - pushbackDoFnRunner.processElementInReadyWindows(elem); - Iterables.addAll(newPushedBack, justPushedBack); - } - } - - pushedBack.clear(); - long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); - for (WindowedValue pushedBackValue : newPushedBack) { - min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); - pushedBack.add(pushedBackValue); - } - setPushedBackWatermark(min); - - pushbackDoFnRunner.finishBundle(); - - // maybe output a new watermark - processWatermark1(new Watermark(currentInputWatermark)); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - processWatermark1(mark); - } - - @Override - public void processWatermark1(Watermark mark) throws Exception { - if (keyCoder == null) { - this.currentInputWatermark = mark.getTimestamp(); - long potentialOutputWatermark = - Math.min(getPushbackWatermarkHold(), currentInputWatermark); - if (potentialOutputWatermark > currentOutputWatermark) { - currentOutputWatermark = potentialOutputWatermark; - output.emitWatermark(new Watermark(currentOutputWatermark)); - } - } else { - // fireTimers, so we need startBundle. - pushbackDoFnRunner.startBundle(); - - this.currentInputWatermark = mark.getTimestamp(); - - // hold back by the pushed back values waiting for side inputs - long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); - - timerService.advanceWatermark(actualInputWatermark); - - Instant watermarkHold = stateInternals.watermarkHold(); - - long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); - - long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold); - - if (potentialOutputWatermark > currentOutputWatermark) { - currentOutputWatermark = potentialOutputWatermark; - output.emitWatermark(new Watermark(currentOutputWatermark)); - } - pushbackDoFnRunner.finishBundle(); - } - } - - @Override - public void processWatermark2(Watermark mark) throws Exception { - // ignore watermarks from the side-input input - } - - @Override - public void snapshotState(StateSnapshotContext context) throws Exception { - // copy from AbstractStreamOperator - if (getKeyedStateBackend() != null) { - KeyedStateCheckpointOutputStream out; - - try { - out = context.getRawKeyedOperatorStateOutput(); - } catch (Exception exception) { - throw new Exception("Could not open raw keyed operator state stream for " - + getOperatorName() + '.', exception); - } - - try { - KeyGroupsList allKeyGroups = out.getKeyGroupList(); - for (int keyGroupIdx : allKeyGroups) { - out.startNewKeyGroup(keyGroupIdx); - - DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out); - - // if (this instanceof KeyGroupCheckpointedOperator) - snapshotKeyGroupState(keyGroupIdx, dov); - - // We can't get all timerServices, so we just snapshot our timerService - // Maybe this is a normal DoFn that has no timerService - if (keyCoder != null) { - timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx); - } - - } - } catch (Exception exception) { - throw new Exception("Could not write timer service of " + getOperatorName() - + " to checkpoint state stream.", exception); - } finally { - try { - out.close(); - } catch (Exception closeException) { - LOG.warn("Could not close raw keyed operator state stream for {}. This " - + "might have prevented deleting some state data.", getOperatorName(), - closeException); - } - } - } - } - - @Override - public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception { - if (!sideInputs.isEmpty() && keyCoder != null) { - ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState( - keyGroupIndex, out); - } - } - - @Override - public void initializeState(StateInitializationContext context) throws Exception { - if (getKeyedStateBackend() != null) { - int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups(); - KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange(); - - for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) { - DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream()); - - int keyGroupIdx = streamProvider.getKeyGroupId(); - checkArgument(localKeyGroupRange.contains(keyGroupIdx), - "Key Group " + keyGroupIdx + " does not belong to the local range."); - - // if (this instanceof KeyGroupRestoringOperator) - restoreKeyGroupState(keyGroupIdx, div); - - // We just initialize our timerService - if (keyCoder != null) { - if (timerService == null) { - timerService = new HeapInternalTimerService<>( - totalKeyGroups, - localKeyGroupRange, - this, - getRuntimeContext().getProcessingTimeService()); - } - timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader()); - } - } - } - } - - @Override - public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception { - if (!sideInputs.isEmpty() && keyCoder != null) { - if (pushbackStateInternals == null) { - pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, - getKeyedStateBackend()); - } - ((FlinkKeyGroupStateInternals) pushbackStateInternals) - .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader()); - } - } - - @Override - public void onEventTime(InternalTimer timer) throws Exception { - fireTimer(timer); - } - - @Override - public void onProcessingTime(InternalTimer timer) throws Exception { - fireTimer(timer); - } - - // allow overriding this in WindowDoFnOperator - public void fireTimer(InternalTimer timer) { - TimerInternals.TimerData timerData = timer.getNamespace(); - StateNamespace namespace = timerData.getNamespace(); - // This is a user timer, so namespace must be WindowNamespace - checkArgument(namespace instanceof WindowNamespace); - BoundedWindow window = ((WindowNamespace) namespace).getWindow(); - pushbackDoFnRunner.onTimer(timerData.getTimerId(), window, - timerData.getTimestamp(), timerData.getDomain()); - } - - /** - * Factory for creating an {@link DoFnRunners.OutputManager} from - * a Flink {@link Output}. - */ - interface OutputManagerFactory extends Serializable { - DoFnRunners.OutputManager create(Output> output); - } - - /** - * Default implementation of {@link OutputManagerFactory} that creates an - * {@link DoFnRunners.OutputManager} that only writes to - * a single logical output. - */ - public static class DefaultOutputManagerFactory - implements OutputManagerFactory { - @Override - public DoFnRunners.OutputManager create(final Output> output) { - return new DoFnRunners.OutputManager() { - @Override - public void output(TupleTag tag, WindowedValue value) { - // with tagged outputs we can't get around this because we don't - // know our own output type... - @SuppressWarnings("unchecked") - OutputT castValue = (OutputT) value; - output.collect(new StreamRecord<>(castValue)); - } - }; - } - } - - /** - * Implementation of {@link OutputManagerFactory} that creates an - * {@link DoFnRunners.OutputManager} that can write to multiple logical - * outputs by unioning them in a {@link RawUnionValue}. - */ - public static class MultiOutputOutputManagerFactory - implements OutputManagerFactory { - - Map, Integer> mapping; - - public MultiOutputOutputManagerFactory(Map, Integer> mapping) { - this.mapping = mapping; - } - - @Override - public DoFnRunners.OutputManager create(final Output> output) { - return new DoFnRunners.OutputManager() { - @Override - public void output(TupleTag tag, WindowedValue value) { - int intTag = mapping.get(tag); - output.collect(new StreamRecord<>(new RawUnionValue(intTag, value))); - } - }; - } - } - - /** - * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow - * accessing state or timer internals. - */ - protected class StepContext implements ExecutionContext.StepContext { - - @Override - public String getStepName() { - return null; - } - - @Override - public String getTransformName() { - return null; - } - - @Override - public void noteOutput(WindowedValue output) {} - - @Override - public void noteOutput(TupleTag tag, WindowedValue output) {} - - @Override - public void writePCollectionViewData( - TupleTag tag, - Iterable> data, - Coder>> dataCoder, - W window, - Coder windowCoder) throws IOException { - throw new UnsupportedOperationException("Writing side-input data is not supported."); - } - - @Override - public StateInternals stateInternals() { - return stateInternals; - } - - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - } - - private class FlinkTimerInternals implements TimerInternals { - - @Override - public void setTimer( - StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - setTimer(TimerData.of(timerId, namespace, target, timeDomain)); - } - - @Deprecated - @Override - public void setTimer(TimerData timerKey) { - long time = timerKey.getTimestamp().getMillis(); - if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { - timerService.registerEventTimeTimer(timerKey, time); - } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { - timerService.registerProcessingTimeTimer(timerKey, time); - } else { - throw new UnsupportedOperationException( - "Unsupported time domain: " + timerKey.getDomain()); - } - } - - @Deprecated - @Override - public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException( - "Canceling of a timer by ID is not yet supported."); - } - - @Override - public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { - throw new UnsupportedOperationException( - "Canceling of a timer by ID is not yet supported."); - } - - @Deprecated - @Override - public void deleteTimer(TimerData timerKey) { - long time = timerKey.getTimestamp().getMillis(); - if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { - timerService.deleteEventTimeTimer(timerKey, time); - } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { - timerService.deleteProcessingTimeTimer(timerKey, time); - } else { - throw new UnsupportedOperationException( - "Unsupported time domain: " + timerKey.getDomain()); - } - } - - @Override - public Instant currentProcessingTime() { - return new Instant(timerService.currentProcessingTime()); - } - - @Nullable - @Override - public Instant currentSynchronizedProcessingTime() { - return new Instant(timerService.currentProcessingTime()); - } - - @Override - public Instant currentInputWatermarkTime() { - return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold())); - } - - @Nullable - @Override - public Instant currentOutputWatermarkTime() { - return new Instant(currentOutputWatermark); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java deleted file mode 100644 index dce2e68..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import java.nio.ByteBuffer; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; - -/** - * {@link KeySelector} that retrieves a key from a {@link KV}. This will return - * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures - * that all key comparisons/hashing happen on the encoded form. - */ -public class KvToByteBufferKeySelector - implements KeySelector>, ByteBuffer>, - ResultTypeQueryable { - - private final Coder keyCoder; - - public KvToByteBufferKeySelector(Coder keyCoder) { - this.keyCoder = keyCoder; - } - - @Override - public ByteBuffer getKey(WindowedValue> value) throws Exception { - K key = value.getValue().getKey(); - byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key); - return ByteBuffer.wrap(keyBytes); - } - - @Override - public TypeInformation getProducedType() { - return new GenericTypeInfo<>(ByteBuffer.class); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java deleted file mode 100644 index e843660..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import java.util.Collections; -import org.apache.beam.runners.core.KeyedWorkItem; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * Singleton keyed word item. - */ -public class SingletonKeyedWorkItem implements KeyedWorkItem { - - final K key; - final WindowedValue value; - - public SingletonKeyedWorkItem(K key, WindowedValue value) { - this.key = key; - this.value = value; - } - - @Override - public K key() { - return key; - } - - public WindowedValue value() { - return value; - } - - @Override - public Iterable timersIterable() { - return Collections.EMPTY_LIST; - } - - @Override - public Iterable> elementsIterable() { - return Collections.singletonList(value); - } -}