Return-Path: X-Original-To: apmail-beam-commits-archive@minotaur.apache.org Delivered-To: apmail-beam-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8D5F4190F7 for ; Fri, 4 Mar 2016 18:11:27 +0000 (UTC) Received: (qmail 65159 invoked by uid 500); 4 Mar 2016 18:11:27 -0000 Delivered-To: apmail-beam-commits-archive@beam.apache.org Received: (qmail 65075 invoked by uid 500); 4 Mar 2016 18:11:27 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 65034 invoked by uid 99); 4 Mar 2016 18:11:27 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2016 18:11:27 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id D0D77C0112 for ; Fri, 4 Mar 2016 18:11:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.549 X-Spam-Level: X-Spam-Status: No, score=-3.549 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id v4yDkO36xFjD for ; Fri, 4 Mar 2016 18:11:09 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id C3B0F5FD22 for ; Fri, 4 Mar 2016 18:10:59 +0000 (UTC) Received: (qmail 61097 invoked by uid 99); 4 Mar 2016 18:10:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2016 18:10:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 83E9DE7884; Fri, 4 Mar 2016 18:10:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davor@apache.org To: commits@beam.incubator.apache.org Date: Fri, 04 Mar 2016 18:11:39 -0000 Message-Id: <78629a338e9642deaf7ef516aaf6f659@git.apache.org> In-Reply-To: <844cc05de9854f8599738d83d11f8ce1@git.apache.org> References: <844cc05de9854f8599738d83d11f8ce1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [44/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java index 0befa88..3cc5c24 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java @@ -38,113 +38,113 @@ import java.util.List; */ @SuppressWarnings("serial") public class UnionCoder extends StandardCoder { - // TODO: Think about how to integrate this with a schema object (i.e. - // a tuple of tuple tags). - /** - * Builds a union coder with the given list of element coders. This list - * corresponds to a mapping of union tag to Coder. Union tags start at 0. - */ - public static UnionCoder of(List> elementCoders) { - return new UnionCoder(elementCoders); - } - - @JsonCreator - public static UnionCoder jsonOf( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List> elements) { - return UnionCoder.of(elements); - } - - private int getIndexForEncoding(RawUnionValue union) { - if (union == null) { - throw new IllegalArgumentException("cannot encode a null tagged union"); - } - int index = union.getUnionTag(); - if (index < 0 || index >= elementCoders.size()) { - throw new IllegalArgumentException( - "union value index " + index + " not in range [0.." + - (elementCoders.size() - 1) + "]"); - } - return index; - } - - @SuppressWarnings("unchecked") - @Override - public void encode( - RawUnionValue union, - OutputStream outStream, - Context context) - throws IOException { - int index = getIndexForEncoding(union); - // Write out the union tag. - VarInt.encode(index, outStream); - - // Write out the actual value. - Coder coder = (Coder) elementCoders.get(index); - coder.encode( - union.getValue(), - outStream, - context); - } - - @Override - public RawUnionValue decode(InputStream inStream, Context context) - throws IOException { - int index = VarInt.decodeInt(inStream); - Object value = elementCoders.get(index).decode(inStream, context); - return new RawUnionValue(index, value); - } - - @Override - public List> getCoderArguments() { - return null; - } - - @Override - public List> getComponents() { - return elementCoders; - } - - /** - * Since this coder uses elementCoders.get(index) and coders that are known to run in constant - * time, we defer the return value to that coder. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) { - int index = getIndexForEncoding(union); - @SuppressWarnings("unchecked") - Coder coder = (Coder) elementCoders.get(index); - return coder.isRegisterByteSizeObserverCheap(union.getValue(), context); - } - - /** - * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder. - */ - @Override - public void registerByteSizeObserver( - RawUnionValue union, ElementByteSizeObserver observer, Context context) - throws Exception { - int index = getIndexForEncoding(union); - // Write out the union tag. - observer.update(VarInt.getLength(index)); - // Write out the actual value. - @SuppressWarnings("unchecked") - Coder coder = (Coder) elementCoders.get(index); - coder.registerByteSizeObserver(union.getValue(), observer, context); - } - - ///////////////////////////////////////////////////////////////////////////// - - private final List> elementCoders; - - private UnionCoder(List> elementCoders) { - this.elementCoders = elementCoders; - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic( - "UnionCoder is only deterministic if all element coders are", - elementCoders); - } + // TODO: Think about how to integrate this with a schema object (i.e. + // a tuple of tuple tags). + /** + * Builds a union coder with the given list of element coders. This list + * corresponds to a mapping of union tag to Coder. Union tags start at 0. + */ + public static UnionCoder of(List> elementCoders) { + return new UnionCoder(elementCoders); + } + + @JsonCreator + public static UnionCoder jsonOf( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List> elements) { + return UnionCoder.of(elements); + } + + private int getIndexForEncoding(RawUnionValue union) { + if (union == null) { + throw new IllegalArgumentException("cannot encode a null tagged union"); + } + int index = union.getUnionTag(); + if (index < 0 || index >= elementCoders.size()) { + throw new IllegalArgumentException( + "union value index " + index + " not in range [0.." + + (elementCoders.size() - 1) + "]"); + } + return index; + } + + @SuppressWarnings("unchecked") + @Override + public void encode( + RawUnionValue union, + OutputStream outStream, + Context context) + throws IOException { + int index = getIndexForEncoding(union); + // Write out the union tag. + VarInt.encode(index, outStream); + + // Write out the actual value. + Coder coder = (Coder) elementCoders.get(index); + coder.encode( + union.getValue(), + outStream, + context); + } + + @Override + public RawUnionValue decode(InputStream inStream, Context context) + throws IOException { + int index = VarInt.decodeInt(inStream); + Object value = elementCoders.get(index).decode(inStream, context); + return new RawUnionValue(index, value); + } + + @Override + public List> getCoderArguments() { + return null; + } + + @Override + public List> getComponents() { + return elementCoders; + } + + /** + * Since this coder uses elementCoders.get(index) and coders that are known to run in constant + * time, we defer the return value to that coder. + */ + @Override + public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) { + int index = getIndexForEncoding(union); + @SuppressWarnings("unchecked") + Coder coder = (Coder) elementCoders.get(index); + return coder.isRegisterByteSizeObserverCheap(union.getValue(), context); + } + + /** + * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder. + */ + @Override + public void registerByteSizeObserver( + RawUnionValue union, ElementByteSizeObserver observer, Context context) + throws Exception { + int index = getIndexForEncoding(union); + // Write out the union tag. + observer.update(VarInt.getLength(index)); + // Write out the actual value. + @SuppressWarnings("unchecked") + Coder coder = (Coder) elementCoders.get(index); + coder.registerByteSizeObserver(union.getValue(), observer, context); + } + + ///////////////////////////////////////////////////////////////////////////// + + private final List> elementCoders; + + private UnionCoder(List> elementCoders) { + this.elementCoders = elementCoders; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic( + "UnionCoder is only deterministic if all element coders are", + elementCoders); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java index e433589..b402f7c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java @@ -32,185 +32,185 @@ import java.io.ObjectInputStream; */ public class CoderComparator extends TypeComparator { - private Coder coder; - - // We use these for internal encoding/decoding for creating copies and comparing - // serialized forms using a Coder - private transient InspectableByteArrayOutputStream buffer1; - private transient InspectableByteArrayOutputStream buffer2; - - // For storing the Reference in encoded form - private transient InspectableByteArrayOutputStream referenceBuffer; - - public CoderComparator(Coder coder) { - this.coder = coder; - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - } - - @Override - public int hash(T record) { - return record.hashCode(); - } - - @Override - public void setReference(T toCompare) { - referenceBuffer.reset(); - try { - coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not set reference " + toCompare + ": " + e); - } - } - - @Override - public boolean equalToReference(T candidate) { - try { - buffer2.reset(); - coder.encode(candidate, buffer2, Coder.Context.OUTER); - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (referenceBuffer.size() != buffer2.size()) { - return false; - } - int len = buffer2.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return false; - } - } - return true; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public int compareToReference(TypeComparator other) { - InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator) other).referenceBuffer; - - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = otherReferenceBuffer.getBuffer(); - if (referenceBuffer.size() != otherReferenceBuffer.size()) { - return referenceBuffer.size() - otherReferenceBuffer.size(); - } - int len = referenceBuffer.size(); - for (int i = 0; i < len; i++) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } - - @Override - public int compare(T first, T second) { - try { - buffer1.reset(); - buffer2.reset(); - coder.encode(first, buffer1, Coder.Context.OUTER); - coder.encode(second, buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare: ", e); - } - } - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - CoderTypeSerializer serializer = new CoderTypeSerializer<>(coder); - T first = serializer.deserialize(firstSource); - T second = serializer.deserialize(secondSource); - return compare(first, second); - } - - @Override - public boolean supportsNormalizedKey() { - return true; - } - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public int getNormalizeKeyLen() { - return Integer.MAX_VALUE; - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return true; - } - - @Override - public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { - buffer1.reset(); - try { - coder.encode(record, buffer1, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); - } - final byte[] data = buffer1.getBuffer(); - final int limit = offset + numBytes; - - target.put(offset, data, 0, Math.min(numBytes, buffer1.size())); - - offset += buffer1.size(); - - while (offset < limit) { - target.put(offset++, (byte) 0); - } - } - - @Override - public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean invertNormalizedKey() { - return false; - } - - @Override - public TypeComparator duplicate() { - return new CoderComparator<>(coder); - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - target[index] = record; - return 1; - } - - @Override - public TypeComparator[] getFlatComparators() { - return new TypeComparator[] { this.duplicate() }; - } + private Coder coder; + + // We use these for internal encoding/decoding for creating copies and comparing + // serialized forms using a Coder + private transient InspectableByteArrayOutputStream buffer1; + private transient InspectableByteArrayOutputStream buffer2; + + // For storing the Reference in encoded form + private transient InspectableByteArrayOutputStream referenceBuffer; + + public CoderComparator(Coder coder) { + this.coder = coder; + buffer1 = new InspectableByteArrayOutputStream(); + buffer2 = new InspectableByteArrayOutputStream(); + referenceBuffer = new InspectableByteArrayOutputStream(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + buffer1 = new InspectableByteArrayOutputStream(); + buffer2 = new InspectableByteArrayOutputStream(); + referenceBuffer = new InspectableByteArrayOutputStream(); + } + + @Override + public int hash(T record) { + return record.hashCode(); + } + + @Override + public void setReference(T toCompare) { + referenceBuffer.reset(); + try { + coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not set reference " + toCompare + ": " + e); + } + } + + @Override + public boolean equalToReference(T candidate) { + try { + buffer2.reset(); + coder.encode(candidate, buffer2, Coder.Context.OUTER); + byte[] arr = referenceBuffer.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (referenceBuffer.size() != buffer2.size()) { + return false; + } + int len = buffer2.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return false; + } + } + return true; + } catch (IOException e) { + throw new RuntimeException("Could not compare reference.", e); + } + } + + @Override + public int compareToReference(TypeComparator other) { + InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator) other).referenceBuffer; + + byte[] arr = referenceBuffer.getBuffer(); + byte[] arrOther = otherReferenceBuffer.getBuffer(); + if (referenceBuffer.size() != otherReferenceBuffer.size()) { + return referenceBuffer.size() - otherReferenceBuffer.size(); + } + int len = referenceBuffer.size(); + for (int i = 0; i < len; i++) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } + + @Override + public int compare(T first, T second) { + try { + buffer1.reset(); + buffer2.reset(); + coder.encode(first, buffer1, Coder.Context.OUTER); + coder.encode(second, buffer2, Coder.Context.OUTER); + byte[] arr = buffer1.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (buffer1.size() != buffer2.size()) { + return buffer1.size() - buffer2.size(); + } + int len = buffer1.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } catch (IOException e) { + throw new RuntimeException("Could not compare: ", e); + } + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + CoderTypeSerializer serializer = new CoderTypeSerializer<>(coder); + T first = serializer.deserialize(firstSource); + T second = serializer.deserialize(secondSource); + return compare(first, second); + } + + @Override + public boolean supportsNormalizedKey() { + return true; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return Integer.MAX_VALUE; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return true; + } + + @Override + public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { + buffer1.reset(); + try { + coder.encode(record, buffer1, Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); + } + final byte[] data = buffer1.getBuffer(); + final int limit = offset + numBytes; + + target.put(offset, data, 0, Math.min(numBytes, buffer1.size())); + + offset += buffer1.size(); + + while (offset < limit) { + target.put(offset++, (byte) 0); + } + } + + @Override + public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean invertNormalizedKey() { + return false; + } + + @Override + public TypeComparator duplicate() { + return new CoderComparator<>(coder); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return new TypeComparator[] { this.duplicate() }; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java index dd9c5f6..ae4309e 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java @@ -32,85 +32,85 @@ import com.google.common.base.Preconditions; */ public class CoderTypeInformation extends TypeInformation implements AtomicType { - private final Coder coder; - - public CoderTypeInformation(Coder coder) { - Preconditions.checkNotNull(coder); - this.coder = coder; - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - @SuppressWarnings("unchecked") - public Class getTypeClass() { - // We don't have the Class, so we have to pass null here. What a shame... - return (Class) Object.class; - } - - @Override - public boolean isKeyType() { - return true; - } - - @Override - @SuppressWarnings("unchecked") - public TypeSerializer createSerializer(ExecutionConfig config) { - if (coder instanceof VoidCoder) { - return (TypeSerializer) new VoidCoderTypeSerializer(); - } - return new CoderTypeSerializer<>(coder); - } - - @Override - public int getTotalFields() { - return 2; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - CoderTypeInformation that = (CoderTypeInformation) o; - - return coder.equals(that.coder); - - } - - @Override - public int hashCode() { - return coder.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof CoderTypeInformation; - } - - @Override - public String toString() { - return "CoderTypeInformation{" + - "coder=" + coder + - '}'; - } - - @Override - public TypeComparator createComparator(boolean sortOrderAscending, ExecutionConfig - executionConfig) { - return new CoderComparator<>(coder); - } + private final Coder coder; + + public CoderTypeInformation(Coder coder) { + Preconditions.checkNotNull(coder); + this.coder = coder; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + @SuppressWarnings("unchecked") + public Class getTypeClass() { + // We don't have the Class, so we have to pass null here. What a shame... + return (Class) Object.class; + } + + @Override + public boolean isKeyType() { + return true; + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializer createSerializer(ExecutionConfig config) { + if (coder instanceof VoidCoder) { + return (TypeSerializer) new VoidCoderTypeSerializer(); + } + return new CoderTypeSerializer<>(coder); + } + + @Override + public int getTotalFields() { + return 2; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CoderTypeInformation that = (CoderTypeInformation) o; + + return coder.equals(that.coder); + + } + + @Override + public int hashCode() { + return coder.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof CoderTypeInformation; + } + + @Override + public String toString() { + return "CoderTypeInformation{" + + "coder=" + coder + + '}'; + } + + @Override + public TypeComparator createComparator(boolean sortOrderAscending, ExecutionConfig + executionConfig) { + return new CoderComparator<>(coder); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java index f739397..6ed661c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java @@ -35,118 +35,118 @@ import java.io.ObjectInputStream; * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s */ public class CoderTypeSerializer extends TypeSerializer { - - private Coder coder; - private transient DataInputViewWrapper inputWrapper; - private transient DataOutputViewWrapper outputWrapper; - - // We use this for internal encoding/decoding for creating copies using the Coder. - private transient InspectableByteArrayOutputStream buffer; - - public CoderTypeSerializer(Coder coder) { - this.coder = coder; - this.inputWrapper = new DataInputViewWrapper(null); - this.outputWrapper = new DataOutputViewWrapper(null); - - buffer = new InspectableByteArrayOutputStream(); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - this.inputWrapper = new DataInputViewWrapper(null); - this.outputWrapper = new DataOutputViewWrapper(null); - - buffer = new InspectableByteArrayOutputStream(); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public CoderTypeSerializer duplicate() { - return new CoderTypeSerializer<>(coder); - } - - @Override - public T createInstance() { - return null; - } - - @Override - public T copy(T t) { - buffer.reset(); - try { - coder.encode(t, buffer, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not copy.", e); - } - try { - return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer - .size()), Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not copy.", e); - } - } - - @Override - public T copy(T t, T reuse) { - return copy(t); - } - - @Override - public int getLength() { - return 0; - } - - @Override - public void serialize(T t, DataOutputView dataOutputView) throws IOException { - outputWrapper.setOutputView(dataOutputView); - coder.encode(t, outputWrapper, Coder.Context.NESTED); - } - - @Override - public T deserialize(DataInputView dataInputView) throws IOException { - try { - inputWrapper.setInputView(dataInputView); - return coder.decode(inputWrapper, Coder.Context.NESTED); - } catch (CoderException e) { - Throwable cause = e.getCause(); - if (cause instanceof EOFException) { - throw (EOFException) cause; - } else { - throw e; - } - } - } - - @Override - public T deserialize(T t, DataInputView dataInputView) throws IOException { - return deserialize(dataInputView); - } - - @Override - public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException { - serialize(deserialize(dataInputView), dataOutputView); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - CoderTypeSerializer that = (CoderTypeSerializer) o; - return coder.equals(that.coder); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof CoderTypeSerializer; - } - - @Override - public int hashCode() { - return coder.hashCode(); - } + + private Coder coder; + private transient DataInputViewWrapper inputWrapper; + private transient DataOutputViewWrapper outputWrapper; + + // We use this for internal encoding/decoding for creating copies using the Coder. + private transient InspectableByteArrayOutputStream buffer; + + public CoderTypeSerializer(Coder coder) { + this.coder = coder; + this.inputWrapper = new DataInputViewWrapper(null); + this.outputWrapper = new DataOutputViewWrapper(null); + + buffer = new InspectableByteArrayOutputStream(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.inputWrapper = new DataInputViewWrapper(null); + this.outputWrapper = new DataOutputViewWrapper(null); + + buffer = new InspectableByteArrayOutputStream(); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public CoderTypeSerializer duplicate() { + return new CoderTypeSerializer<>(coder); + } + + @Override + public T createInstance() { + return null; + } + + @Override + public T copy(T t) { + buffer.reset(); + try { + coder.encode(t, buffer, Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not copy.", e); + } + try { + return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer + .size()), Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not copy.", e); + } + } + + @Override + public T copy(T t, T reuse) { + return copy(t); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(T t, DataOutputView dataOutputView) throws IOException { + outputWrapper.setOutputView(dataOutputView); + coder.encode(t, outputWrapper, Coder.Context.NESTED); + } + + @Override + public T deserialize(DataInputView dataInputView) throws IOException { + try { + inputWrapper.setInputView(dataInputView); + return coder.decode(inputWrapper, Coder.Context.NESTED); + } catch (CoderException e) { + Throwable cause = e.getCause(); + if (cause instanceof EOFException) { + throw (EOFException) cause; + } else { + throw e; + } + } + } + + @Override + public T deserialize(T t, DataInputView dataInputView) throws IOException { + return deserialize(dataInputView); + } + + @Override + public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException { + serialize(deserialize(dataInputView), dataOutputView); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CoderTypeSerializer that = (CoderTypeSerializer) o; + return coder.equals(that.coder); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof CoderTypeSerializer; + } + + @Override + public int hashCode() { + return coder.hashCode(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java index 5d918cc..be6eadd 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java @@ -25,10 +25,10 @@ import java.io.ByteArrayOutputStream; */ public class InspectableByteArrayOutputStream extends ByteArrayOutputStream { - /** - * Get the underlying byte array. - */ - public byte[] getBuffer() { - return buf; - } + /** + * Get the underlying byte array. + */ + public byte[] getBuffer() { + return buf; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java index 815569d..ba09ea9 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java @@ -35,230 +35,230 @@ import java.io.ObjectInputStream; * for {@link KV} that always compares on the key only. */ public class KvCoderComperator extends TypeComparator> { - - private KvCoder coder; - private Coder keyCoder; - - // We use these for internal encoding/decoding for creating copies and comparing - // serialized forms using a Coder - private transient InspectableByteArrayOutputStream buffer1; - private transient InspectableByteArrayOutputStream buffer2; - - // For storing the Reference in encoded form - private transient InspectableByteArrayOutputStream referenceBuffer; - - - // For deserializing the key - private transient DataInputViewWrapper inputWrapper; - - public KvCoderComperator(KvCoder coder) { - this.coder = coder; - this.keyCoder = coder.getKeyCoder(); - - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - - inputWrapper = new DataInputViewWrapper(null); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - - inputWrapper = new DataInputViewWrapper(null); - } - - @Override - public int hash(KV record) { - K key = record.getKey(); - if (key != null) { - return key.hashCode(); - } else { - return 0; - } - } - - @Override - public void setReference(KV toCompare) { - referenceBuffer.reset(); - try { - keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not set reference " + toCompare + ": " + e); - } - } - - @Override - public boolean equalToReference(KV candidate) { - try { - buffer2.reset(); - keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER); - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (referenceBuffer.size() != buffer2.size()) { - return false; - } - int len = buffer2.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return false; - } - } - return true; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public int compareToReference(TypeComparator> other) { - InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator) other).referenceBuffer; - - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = otherReferenceBuffer.getBuffer(); - if (referenceBuffer.size() != otherReferenceBuffer.size()) { - return referenceBuffer.size() - otherReferenceBuffer.size(); - } - int len = referenceBuffer.size(); - for (int i = 0; i < len; i++) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } - - - @Override - public int compare(KV first, KV second) { - try { - buffer1.reset(); - buffer2.reset(); - keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER); - keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - - inputWrapper.setInputView(firstSource); - K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); - inputWrapper.setInputView(secondSource); - K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); - - try { - buffer1.reset(); - buffer2.reset(); - keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER); - keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public boolean supportsNormalizedKey() { - return true; - } - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public int getNormalizeKeyLen() { - return Integer.MAX_VALUE; - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return true; - } - - @Override - public void putNormalizedKey(KV record, MemorySegment target, int offset, int numBytes) { - buffer1.reset(); - try { - keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED); - } catch (IOException e) { - throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); - } - final byte[] data = buffer1.getBuffer(); - final int limit = offset + numBytes; - - int numBytesPut = Math.min(numBytes, buffer1.size()); - - target.put(offset, data, 0, numBytesPut); - - offset += numBytesPut; - - while (offset < limit) { - target.put(offset++, (byte) 0); - } - } - - @Override - public void writeWithKeyNormalization(KV record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public KV readWithKeyDenormalization(KV reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean invertNormalizedKey() { - return false; - } - - @Override - public TypeComparator> duplicate() { - return new KvCoderComperator<>(coder); - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - KV kv = (KV) record; - K k = kv.getKey(); - target[index] = k; - return 1; - } - - @Override - public TypeComparator[] getFlatComparators() { - return new TypeComparator[] {new CoderComparator<>(keyCoder)}; - } + + private KvCoder coder; + private Coder keyCoder; + + // We use these for internal encoding/decoding for creating copies and comparing + // serialized forms using a Coder + private transient InspectableByteArrayOutputStream buffer1; + private transient InspectableByteArrayOutputStream buffer2; + + // For storing the Reference in encoded form + private transient InspectableByteArrayOutputStream referenceBuffer; + + + // For deserializing the key + private transient DataInputViewWrapper inputWrapper; + + public KvCoderComperator(KvCoder coder) { + this.coder = coder; + this.keyCoder = coder.getKeyCoder(); + + buffer1 = new InspectableByteArrayOutputStream(); + buffer2 = new InspectableByteArrayOutputStream(); + referenceBuffer = new InspectableByteArrayOutputStream(); + + inputWrapper = new DataInputViewWrapper(null); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + buffer1 = new InspectableByteArrayOutputStream(); + buffer2 = new InspectableByteArrayOutputStream(); + referenceBuffer = new InspectableByteArrayOutputStream(); + + inputWrapper = new DataInputViewWrapper(null); + } + + @Override + public int hash(KV record) { + K key = record.getKey(); + if (key != null) { + return key.hashCode(); + } else { + return 0; + } + } + + @Override + public void setReference(KV toCompare) { + referenceBuffer.reset(); + try { + keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not set reference " + toCompare + ": " + e); + } + } + + @Override + public boolean equalToReference(KV candidate) { + try { + buffer2.reset(); + keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER); + byte[] arr = referenceBuffer.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (referenceBuffer.size() != buffer2.size()) { + return false; + } + int len = buffer2.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return false; + } + } + return true; + } catch (IOException e) { + throw new RuntimeException("Could not compare reference.", e); + } + } + + @Override + public int compareToReference(TypeComparator> other) { + InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator) other).referenceBuffer; + + byte[] arr = referenceBuffer.getBuffer(); + byte[] arrOther = otherReferenceBuffer.getBuffer(); + if (referenceBuffer.size() != otherReferenceBuffer.size()) { + return referenceBuffer.size() - otherReferenceBuffer.size(); + } + int len = referenceBuffer.size(); + for (int i = 0; i < len; i++) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } + + + @Override + public int compare(KV first, KV second) { + try { + buffer1.reset(); + buffer2.reset(); + keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER); + keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER); + byte[] arr = buffer1.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (buffer1.size() != buffer2.size()) { + return buffer1.size() - buffer2.size(); + } + int len = buffer1.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } catch (IOException e) { + throw new RuntimeException("Could not compare reference.", e); + } + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + + inputWrapper.setInputView(firstSource); + K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); + inputWrapper.setInputView(secondSource); + K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); + + try { + buffer1.reset(); + buffer2.reset(); + keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER); + keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER); + byte[] arr = buffer1.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (buffer1.size() != buffer2.size()) { + return buffer1.size() - buffer2.size(); + } + int len = buffer1.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } catch (IOException e) { + throw new RuntimeException("Could not compare reference.", e); + } + } + + @Override + public boolean supportsNormalizedKey() { + return true; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return Integer.MAX_VALUE; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return true; + } + + @Override + public void putNormalizedKey(KV record, MemorySegment target, int offset, int numBytes) { + buffer1.reset(); + try { + keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED); + } catch (IOException e) { + throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); + } + final byte[] data = buffer1.getBuffer(); + final int limit = offset + numBytes; + + int numBytesPut = Math.min(numBytes, buffer1.size()); + + target.put(offset, data, 0, numBytesPut); + + offset += numBytesPut; + + while (offset < limit) { + target.put(offset++, (byte) 0); + } + } + + @Override + public void writeWithKeyNormalization(KV record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public KV readWithKeyDenormalization(KV reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean invertNormalizedKey() { + return false; + } + + @Override + public TypeComparator> duplicate() { + return new KvCoderComperator<>(coder); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + KV kv = (KV) record; + K k = kv.getKey(); + target[index] = k; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return new TypeComparator[] {new CoderComparator<>(keyCoder)}; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java index 090f79d..be11918 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java @@ -34,153 +34,153 @@ import java.util.List; */ public class KvCoderTypeInformation extends CompositeType> { - private KvCoder coder; - - // We don't have the Class, so we have to pass null here. What a shame... - private static Object DUMMY = new Object(); - - @SuppressWarnings("unchecked") - public KvCoderTypeInformation(KvCoder coder) { - super(((Class>) DUMMY.getClass())); - this.coder = coder; - Preconditions.checkNotNull(coder); - } - - @Override - @SuppressWarnings("unchecked") - public TypeComparator> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) { - return new KvCoderComperator((KvCoder) coder); - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 2; - } - - @Override - @SuppressWarnings("unchecked") - public Class> getTypeClass() { - return privateGetTypeClass(); - } - - @SuppressWarnings("unchecked") - private static Class privateGetTypeClass() { - return (Class) Object.class; - } - - @Override - public boolean isKeyType() { - return true; - } - - @Override - @SuppressWarnings("unchecked") - public TypeSerializer> createSerializer(ExecutionConfig config) { - return new CoderTypeSerializer<>(coder); - } - - @Override - public int getTotalFields() { - return 2; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - KvCoderTypeInformation that = (KvCoderTypeInformation) o; - - return coder.equals(that.coder); - - } - - @Override - public int hashCode() { - return coder.hashCode(); - } - - @Override - public String toString() { - return "CoderTypeInformation{" + - "coder=" + coder + - '}'; - } - - @Override - @SuppressWarnings("unchecked") - public TypeInformation getTypeAt(int pos) { - if (pos == 0) { - return (TypeInformation) new CoderTypeInformation<>(coder.getKeyCoder()); - } else if (pos == 1) { - return (TypeInformation) new CoderTypeInformation<>(coder.getValueCoder()); - } else { - throw new RuntimeException("Invalid field position " + pos); - } - } - - @Override - @SuppressWarnings("unchecked") - public TypeInformation getTypeAt(String fieldExpression) { - switch (fieldExpression) { - case "key": - return (TypeInformation) new CoderTypeInformation<>(coder.getKeyCoder()); - case "value": - return (TypeInformation) new CoderTypeInformation<>(coder.getValueCoder()); - default: - throw new UnsupportedOperationException("Only KvCoder has fields."); - } - } - - @Override - public String[] getFieldNames() { - return new String[]{"key", "value"}; - } - - @Override - public int getFieldIndex(String fieldName) { - switch (fieldName) { - case "key": - return 0; - case "value": - return 1; - default: - return -1; - } - } - - @Override - public void getFlatFields(String fieldExpression, int offset, List result) { - CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder()); - result.add(new FlatFieldDescriptor(0, keyTypeInfo)); - } - - @Override - protected TypeComparatorBuilder> createTypeComparatorBuilder() { - return new KvCoderTypeComparatorBuilder(); - } - - private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder> { - - @Override - public void initializeTypeComparatorBuilder(int size) {} - - @Override - public void addComparatorField(int fieldId, TypeComparator comparator) {} - - @Override - public TypeComparator> createTypeComparator(ExecutionConfig config) { - return new KvCoderComperator<>(coder); - } - } + private KvCoder coder; + + // We don't have the Class, so we have to pass null here. What a shame... + private static Object DUMMY = new Object(); + + @SuppressWarnings("unchecked") + public KvCoderTypeInformation(KvCoder coder) { + super(((Class>) DUMMY.getClass())); + this.coder = coder; + Preconditions.checkNotNull(coder); + } + + @Override + @SuppressWarnings("unchecked") + public TypeComparator> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) { + return new KvCoderComperator((KvCoder) coder); + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 2; + } + + @Override + @SuppressWarnings("unchecked") + public Class> getTypeClass() { + return privateGetTypeClass(); + } + + @SuppressWarnings("unchecked") + private static Class privateGetTypeClass() { + return (Class) Object.class; + } + + @Override + public boolean isKeyType() { + return true; + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializer> createSerializer(ExecutionConfig config) { + return new CoderTypeSerializer<>(coder); + } + + @Override + public int getTotalFields() { + return 2; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KvCoderTypeInformation that = (KvCoderTypeInformation) o; + + return coder.equals(that.coder); + + } + + @Override + public int hashCode() { + return coder.hashCode(); + } + + @Override + public String toString() { + return "CoderTypeInformation{" + + "coder=" + coder + + '}'; + } + + @Override + @SuppressWarnings("unchecked") + public TypeInformation getTypeAt(int pos) { + if (pos == 0) { + return (TypeInformation) new CoderTypeInformation<>(coder.getKeyCoder()); + } else if (pos == 1) { + return (TypeInformation) new CoderTypeInformation<>(coder.getValueCoder()); + } else { + throw new RuntimeException("Invalid field position " + pos); + } + } + + @Override + @SuppressWarnings("unchecked") + public TypeInformation getTypeAt(String fieldExpression) { + switch (fieldExpression) { + case "key": + return (TypeInformation) new CoderTypeInformation<>(coder.getKeyCoder()); + case "value": + return (TypeInformation) new CoderTypeInformation<>(coder.getValueCoder()); + default: + throw new UnsupportedOperationException("Only KvCoder has fields."); + } + } + + @Override + public String[] getFieldNames() { + return new String[]{"key", "value"}; + } + + @Override + public int getFieldIndex(String fieldName) { + switch (fieldName) { + case "key": + return 0; + case "value": + return 1; + default: + return -1; + } + } + + @Override + public void getFlatFields(String fieldExpression, int offset, List result) { + CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder()); + result.add(new FlatFieldDescriptor(0, keyTypeInfo)); + } + + @Override + protected TypeComparatorBuilder> createTypeComparatorBuilder() { + return new KvCoderTypeComparatorBuilder(); + } + + private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder> { + + @Override + public void initializeTypeComparatorBuilder(int size) {} + + @Override + public void addComparatorField(int fieldId, TypeComparator comparator) {} + + @Override + public TypeComparator> createTypeComparator(ExecutionConfig config) { + return new KvCoderComperator<>(coder); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java index 7ce484a..190d898 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java @@ -31,82 +31,82 @@ import java.io.IOException; */ public class VoidCoderTypeSerializer extends TypeSerializer { - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public VoidCoderTypeSerializer duplicate() { - return this; - } - - @Override - public VoidValue createInstance() { - return VoidValue.INSTANCE; - } - - @Override - public VoidValue copy(VoidValue from) { - return from; - } - - @Override - public VoidValue copy(VoidValue from, VoidValue reuse) { - return from; - } - - @Override - public int getLength() { - return 0; - } - - @Override - public void serialize(VoidValue record, DataOutputView target) throws IOException { - target.writeByte(1); - } - - @Override - public VoidValue deserialize(DataInputView source) throws IOException { - source.readByte(); - return VoidValue.INSTANCE; - } - - @Override - public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - source.readByte(); - target.writeByte(1); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof VoidCoderTypeSerializer) { - VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj; - return other.canEqual(this); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof VoidCoderTypeSerializer; - } - - @Override - public int hashCode() { - return 0; - } - - public static class VoidValue { - private VoidValue() {} - - public static VoidValue INSTANCE = new VoidValue(); - } + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public VoidCoderTypeSerializer duplicate() { + return this; + } + + @Override + public VoidValue createInstance() { + return VoidValue.INSTANCE; + } + + @Override + public VoidValue copy(VoidValue from) { + return from; + } + + @Override + public VoidValue copy(VoidValue from, VoidValue reuse) { + return from; + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(VoidValue record, DataOutputView target) throws IOException { + target.writeByte(1); + } + + @Override + public VoidValue deserialize(DataInputView source) throws IOException { + source.readByte(); + return VoidValue.INSTANCE; + } + + @Override + public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + source.readByte(); + target.writeByte(1); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof VoidCoderTypeSerializer) { + VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj; + return other.canEqual(this); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof VoidCoderTypeSerializer; + } + + @Override + public int hashCode() { + return 0; + } + + public static class VoidValue { + private VoidValue() {} + + public static VoidValue INSTANCE = new VoidValue(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java index 924b297..8f6d67c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java @@ -31,62 +31,62 @@ import java.io.Serializable; * operation. */ public class CombineFnAggregatorWrapper implements Aggregator, Accumulator { - - private AA aa; - private Combine.CombineFn combiner; + + private AA aa; + private Combine.CombineFn combiner; - public CombineFnAggregatorWrapper() { - } + public CombineFnAggregatorWrapper() { + } - public CombineFnAggregatorWrapper(Combine.CombineFn combiner) { - this.combiner = combiner; - this.aa = combiner.createAccumulator(); - } + public CombineFnAggregatorWrapper(Combine.CombineFn combiner) { + this.combiner = combiner; + this.aa = combiner.createAccumulator(); + } - @Override - public void add(AI value) { - combiner.addInput(aa, value); - } + @Override + public void add(AI value) { + combiner.addInput(aa, value); + } - @Override - public Serializable getLocalValue() { - return (Serializable) combiner.extractOutput(aa); - } + @Override + public Serializable getLocalValue() { + return (Serializable) combiner.extractOutput(aa); + } - @Override - public void resetLocal() { - aa = combiner.createAccumulator(); - } + @Override + public void resetLocal() { + aa = combiner.createAccumulator(); + } - @Override - @SuppressWarnings("unchecked") - public void merge(Accumulator other) { - aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper)other).aa)); - } + @Override + @SuppressWarnings("unchecked") + public void merge(Accumulator other) { + aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper)other).aa)); + } - @Override - public Accumulator clone() { - // copy it by merging - AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa)); - CombineFnAggregatorWrapper result = new - CombineFnAggregatorWrapper<>(combiner); - result.aa = aaCopy; - return result; - } + @Override + public Accumulator clone() { + // copy it by merging + AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa)); + CombineFnAggregatorWrapper result = new + CombineFnAggregatorWrapper<>(combiner); + result.aa = aaCopy; + return result; + } - @Override - public void addValue(AI value) { - add(value); - } + @Override + public void addValue(AI value) { + add(value); + } - @Override - public String getName() { - return "CombineFn: " + combiner.toString(); - } + @Override + public String getName() { + return "CombineFn: " + combiner.toString(); + } - @Override - public Combine.CombineFn getCombineFn() { - return combiner; - } + @Override + public Combine.CombineFn getCombineFn() { + return combiner; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java index 90582b0..3c96939 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java @@ -31,29 +31,29 @@ import java.io.InputStream; */ public class DataInputViewWrapper extends InputStream { - private DataInputView inputView; - - public DataInputViewWrapper(DataInputView inputView) { - this.inputView = inputView; - } - - public void setInputView(DataInputView inputView) { - this.inputView = inputView; - } - - @Override - public int read() throws IOException { - try { - return inputView.readUnsignedByte(); - } catch (EOFException e) { - // translate between DataInput and InputStream, - // DataInput signals EOF by exception, InputStream does it by returning -1 - return -1; - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return inputView.read(b, off, len); - } + private DataInputView inputView; + + public DataInputViewWrapper(DataInputView inputView) { + this.inputView = inputView; + } + + public void setInputView(DataInputView inputView) { + this.inputView = inputView; + } + + @Override + public int read() throws IOException { + try { + return inputView.readUnsignedByte(); + } catch (EOFException e) { + // translate between DataInput and InputStream, + // DataInput signals EOF by exception, InputStream does it by returning -1 + return -1; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputView.read(b, off, len); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java index 46df8e5..a222cdd 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java @@ -29,24 +29,24 @@ import java.io.OutputStream; * {@link java.io.OutputStream}. */ public class DataOutputViewWrapper extends OutputStream { - - private DataOutputView outputView; + + private DataOutputView outputView; - public DataOutputViewWrapper(DataOutputView outputView) { - this.outputView = outputView; - } + public DataOutputViewWrapper(DataOutputView outputView) { + this.outputView = outputView; + } - public void setOutputView(DataOutputView outputView) { - this.outputView = outputView; - } + public void setOutputView(DataOutputView outputView) { + this.outputView = outputView; + } - @Override - public void write(int b) throws IOException { - outputView.write(b); - } + @Override + public void write(int b) throws IOException { + outputView.write(b); + } - @Override - public void write(byte[] b, int off, int len) throws IOException { - outputView.write(b, off, len); - } + @Override + public void write(byte[] b, int off, int len) throws IOException { + outputView.write(b, off, len); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java index 1c0dae4..c193a4d 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java @@ -33,59 +33,59 @@ import java.io.Serializable; */ public class SerializableFnAggregatorWrapper implements Aggregator, Accumulator { - private AO aa; - private Combine.CombineFn combiner; + private AO aa; + private Combine.CombineFn combiner; - public SerializableFnAggregatorWrapper(Combine.CombineFn combiner) { - this.combiner = combiner; - resetLocal(); - } - - @Override - @SuppressWarnings("unchecked") - public void add(AI value) { - this.aa = combiner.apply(ImmutableList.of((AI) aa, value)); - } + public SerializableFnAggregatorWrapper(Combine.CombineFn combiner) { + this.combiner = combiner; + resetLocal(); + } + + @Override + @SuppressWarnings("unchecked") + public void add(AI value) { + this.aa = combiner.apply(ImmutableList.of((AI) aa, value)); + } - @Override - public Serializable getLocalValue() { - return (Serializable) aa; - } + @Override + public Serializable getLocalValue() { + return (Serializable) aa; + } - @Override - public void resetLocal() { - this.aa = combiner.apply(ImmutableList.of()); - } + @Override + public void resetLocal() { + this.aa = combiner.apply(ImmutableList.of()); + } - @Override - @SuppressWarnings("unchecked") - public void merge(Accumulator other) { - this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue())); - } + @Override + @SuppressWarnings("unchecked") + public void merge(Accumulator other) { + this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue())); + } - @Override - public void addValue(AI value) { - add(value); - } + @Override + public void addValue(AI value) { + add(value); + } - @Override - public String getName() { - return "Aggregator :" + combiner.toString(); - } + @Override + public String getName() { + return "Aggregator :" + combiner.toString(); + } - @Override - public Combine.CombineFn getCombineFn() { - return combiner; - } + @Override + public Combine.CombineFn getCombineFn() { + return combiner; + } - @Override - public Accumulator clone() { - // copy it by merging - AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa)); - SerializableFnAggregatorWrapper result = new - SerializableFnAggregatorWrapper<>(combiner); + @Override + public Accumulator clone() { + // copy it by merging + AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa)); + SerializableFnAggregatorWrapper result = new + SerializableFnAggregatorWrapper<>(combiner); - result.aa = resultCopy; - return result; - } + result.aa = resultCopy; + return result; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java index 8be9abf..3f28c16 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java @@ -38,84 +38,84 @@ import java.lang.reflect.Field; */ public class SinkOutputFormat implements OutputFormat { - private final Sink sink; - - private transient PipelineOptions pipelineOptions; - - private Sink.WriteOperation writeOperation; - private Sink.Writer writer; - - private AbstractID uid = new AbstractID(); - - public SinkOutputFormat(Write.Bound transform, PipelineOptions pipelineOptions) { - this.sink = extractSink(transform); - this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions); - } - - private Sink extractSink(Write.Bound transform) { - // TODO possibly add a getter in the upstream - try { - Field sinkField = transform.getClass().getDeclaredField("sink"); - sinkField.setAccessible(true); - @SuppressWarnings("unchecked") - Sink extractedSink = (Sink) sinkField.get(transform); - return extractedSink; - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Could not acquire custom sink field.", e); - } - } - - @Override - public void configure(Configuration configuration) { - writeOperation = sink.createWriteOperation(pipelineOptions); - try { - writeOperation.initialize(pipelineOptions); - } catch (Exception e) { - throw new RuntimeException("Failed to initialize the write operation.", e); - } - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException { - try { - writer = writeOperation.createWriter(pipelineOptions); - } catch (Exception e) { - throw new IOException("Couldn't create writer.", e); - } - try { - writer.open(uid + "-" + String.valueOf(taskNumber)); - } catch (Exception e) { - throw new IOException("Couldn't open writer.", e); - } - } - - @Override - public void writeRecord(T record) throws IOException { - try { - writer.write(record); - } catch (Exception e) { - throw new IOException("Couldn't write record.", e); - } - } - - @Override - public void close() throws IOException { - try { - writer.close(); - } catch (Exception e) { - throw new IOException("Couldn't close writer.", e); - } - } - - private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, pipelineOptions); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - pipelineOptions = mapper.readValue(in, PipelineOptions.class); - } + private final Sink sink; + + private transient PipelineOptions pipelineOptions; + + private Sink.WriteOperation writeOperation; + private Sink.Writer writer; + + private AbstractID uid = new AbstractID(); + + public SinkOutputFormat(Write.Bound transform, PipelineOptions pipelineOptions) { + this.sink = extractSink(transform); + this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions); + } + + private Sink extractSink(Write.Bound transform) { + // TODO possibly add a getter in the upstream + try { + Field sinkField = transform.getClass().getDeclaredField("sink"); + sinkField.setAccessible(true); + @SuppressWarnings("unchecked") + Sink extractedSink = (Sink) sinkField.get(transform); + return extractedSink; + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Could not acquire custom sink field.", e); + } + } + + @Override + public void configure(Configuration configuration) { + writeOperation = sink.createWriteOperation(pipelineOptions); + try { + writeOperation.initialize(pipelineOptions); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize the write operation.", e); + } + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + try { + writer = writeOperation.createWriter(pipelineOptions); + } catch (Exception e) { + throw new IOException("Couldn't create writer.", e); + } + try { + writer.open(uid + "-" + String.valueOf(taskNumber)); + } catch (Exception e) { + throw new IOException("Couldn't open writer.", e); + } + } + + @Override + public void writeRecord(T record) throws IOException { + try { + writer.write(record); + } catch (Exception e) { + throw new IOException("Couldn't write record.", e); + } + } + + @Override + public void close() throws IOException { + try { + writer.close(); + } catch (Exception e) { + throw new IOException("Couldn't close writer.", e); + } + } + + private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + ObjectMapper mapper = new ObjectMapper(); + mapper.writeValue(out, pipelineOptions); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + ObjectMapper mapper = new ObjectMapper(); + pipelineOptions = mapper.readValue(in, PipelineOptions.class); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java index 64dc072..5981618 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java @@ -41,124 +41,124 @@ import java.util.List; * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}. */ public class SourceInputFormat implements InputFormat> { - private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); - - private final BoundedSource initialSource; - private transient PipelineOptions options; - - private BoundedSource.BoundedReader reader = null; - private boolean reachedEnd = true; - - public SourceInputFormat(BoundedSource initialSource, PipelineOptions options) { - this.initialSource = initialSource; - this.options = options; - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, options); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - options = mapper.readValue(in, PipelineOptions.class); - } - - @Override - public void configure(Configuration configuration) {} - - @Override - public void open(SourceInputSplit sourceInputSplit) throws IOException { - reader = ((BoundedSource) sourceInputSplit.getSource()).createReader(options); - reachedEnd = false; - } - - @Override - public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { - try { - final long estimatedSize = initialSource.getEstimatedSizeBytes(options); - - return new BaseStatistics() { - @Override - public long getTotalInputSize() { - return estimatedSize; - - } - - @Override - public long getNumberOfRecords() { - return BaseStatistics.NUM_RECORDS_UNKNOWN; - } - - @Override - public float getAverageRecordWidth() { - return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN; - } - }; - } catch (Exception e) { - LOG.warn("Could not read Source statistics: {}", e); - } - - return null; - } - - @Override - @SuppressWarnings("unchecked") - public SourceInputSplit[] createInputSplits(int numSplits) throws IOException { - long desiredSizeBytes; - try { - desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; - List> shards = initialSource.splitIntoBundles(desiredSizeBytes, - options); - List> splits = new ArrayList<>(); - int splitCount = 0; - for (Source shard: shards) { - splits.add(new SourceInputSplit<>(shard, splitCount++)); - } - return splits.toArray(new SourceInputSplit[splits.size()]); - } catch (Exception e) { - throw new IOException("Could not create input splits from Source.", e); - } - } - - @Override - public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) { - return new InputSplitAssigner() { - private int index = 0; - private final SourceInputSplit[] splits = sourceInputSplits; - @Override - public InputSplit getNextInputSplit(String host, int taskId) { - if (index < splits.length) { - return splits[index++]; - } else { - return null; - } - } - }; - } - - - @Override - public boolean reachedEnd() throws IOException { - return reachedEnd; - } - - @Override - public T nextRecord(T t) throws IOException { - - reachedEnd = !reader.advance(); - if (!reachedEnd) { - return reader.getCurrent(); - } - return null; - } - - @Override - public void close() throws IOException { - reader.close(); - } + private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); + + private final BoundedSource initialSource; + private transient PipelineOptions options; + + private BoundedSource.BoundedReader reader = null; + private boolean reachedEnd = true; + + public SourceInputFormat(BoundedSource initialSource, PipelineOptions options) { + this.initialSource = initialSource; + this.options = options; + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + ObjectMapper mapper = new ObjectMapper(); + mapper.writeValue(out, options); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + ObjectMapper mapper = new ObjectMapper(); + options = mapper.readValue(in, PipelineOptions.class); + } + + @Override + public void configure(Configuration configuration) {} + + @Override + public void open(SourceInputSplit sourceInputSplit) throws IOException { + reader = ((BoundedSource) sourceInputSplit.getSource()).createReader(options); + reachedEnd = false; + } + + @Override + public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { + try { + final long estimatedSize = initialSource.getEstimatedSizeBytes(options); + + return new BaseStatistics() { + @Override + public long getTotalInputSize() { + return estimatedSize; + + } + + @Override + public long getNumberOfRecords() { + return BaseStatistics.NUM_RECORDS_UNKNOWN; + } + + @Override + public float getAverageRecordWidth() { + return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN; + } + }; + } catch (Exception e) { + LOG.warn("Could not read Source statistics: {}", e); + } + + return null; + } + + @Override + @SuppressWarnings("unchecked") + public SourceInputSplit[] createInputSplits(int numSplits) throws IOException { + long desiredSizeBytes; + try { + desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; + List> shards = initialSource.splitIntoBundles(desiredSizeBytes, + options); + List> splits = new ArrayList<>(); + int splitCount = 0; + for (Source shard: shards) { + splits.add(new SourceInputSplit<>(shard, splitCount++)); + } + return splits.toArray(new SourceInputSplit[splits.size()]); + } catch (Exception e) { + throw new IOException("Could not create input splits from Source.", e); + } + } + + @Override + public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) { + return new InputSplitAssigner() { + private int index = 0; + private final SourceInputSplit[] splits = sourceInputSplits; + @Override + public InputSplit getNextInputSplit(String host, int taskId) { + if (index < splits.length) { + return splits[index++]; + } else { + return null; + } + } + }; + } + + + @Override + public boolean reachedEnd() throws IOException { + return reachedEnd; + } + + @Override + public T nextRecord(T t) throws IOException { + + reachedEnd = !reader.advance(); + if (!reachedEnd) { + return reader.getCurrent(); + } + return null; + } + + @Override + public void close() throws IOException { + reader.close(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java index 2b93ab7..86fdada 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java @@ -29,24 +29,24 @@ import org.apache.flink.core.io.InputSplit; */ public class SourceInputSplit implements InputSplit { - private Source source; - private int splitNumber; + private Source source; + private int splitNumber; - public SourceInputSplit() { - } + public SourceInputSplit() { + } - public SourceInputSplit(Source source, int splitNumber) { - this.source = source; - this.splitNumber = splitNumber; - } + public SourceInputSplit(Source source, int splitNumber) { + this.source = source; + this.splitNumber = splitNumber; + } - @Override - public int getSplitNumber() { - return splitNumber; - } + @Override + public int getSplitNumber() { + return splitNumber; + } - public Source getSource() { - return source; - } + public Source getSource() { + return source; + } }