beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [44/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces
Date Fri, 04 Mar 2016 18:11:39 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
index 0befa88..3cc5c24 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
@@ -38,113 +38,113 @@ import java.util.List;
  */
 @SuppressWarnings("serial")
 public class UnionCoder extends StandardCoder<RawUnionValue> {
-	// TODO: Think about how to integrate this with a schema object (i.e.
-	// a tuple of tuple tags).
-	/**
-	 * Builds a union coder with the given list of element coders.  This list
-	 * corresponds to a mapping of union tag to Coder.  Union tags start at 0.
-	 */
-	public static UnionCoder of(List<Coder<?>> elementCoders) {
-		return new UnionCoder(elementCoders);
-	}
-
-	@JsonCreator
-	public static UnionCoder jsonOf(
-			@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-			List<Coder<?>> elements) {
-		return UnionCoder.of(elements);
-	}
-
-	private int getIndexForEncoding(RawUnionValue union) {
-		if (union == null) {
-			throw new IllegalArgumentException("cannot encode a null tagged union");
-		}
-		int index = union.getUnionTag();
-		if (index < 0 || index >= elementCoders.size()) {
-			throw new IllegalArgumentException(
-					"union value index " + index + " not in range [0.." +
-							(elementCoders.size() - 1) + "]");
-		}
-		return index;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void encode(
-			RawUnionValue union,
-			OutputStream outStream,
-			Context context)
-			throws IOException  {
-		int index = getIndexForEncoding(union);
-		// Write out the union tag.
-		VarInt.encode(index, outStream);
-
-		// Write out the actual value.
-		Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-		coder.encode(
-				union.getValue(),
-				outStream,
-				context);
-	}
-
-	@Override
-	public RawUnionValue decode(InputStream inStream, Context context)
-			throws IOException {
-		int index = VarInt.decodeInt(inStream);
-		Object value = elementCoders.get(index).decode(inStream, context);
-		return new RawUnionValue(index, value);
-	}
-
-	@Override
-	public List<? extends Coder<?>> getCoderArguments() {
-		return null;
-	}
-
-	@Override
-	public List<? extends Coder<?>> getComponents() {
-		return elementCoders;
-	}
-
-	/**
-	 * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
-	 * time, we defer the return value to that coder.
-	 */
-	@Override
-	public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
-		int index = getIndexForEncoding(union);
-		@SuppressWarnings("unchecked")
-		Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-		return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
-	}
-
-	/**
-	 * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
-	 */
-	@Override
-	public void registerByteSizeObserver(
-			RawUnionValue union, ElementByteSizeObserver observer, Context context)
-			throws Exception {
-		int index = getIndexForEncoding(union);
-		// Write out the union tag.
-		observer.update(VarInt.getLength(index));
-		// Write out the actual value.
-		@SuppressWarnings("unchecked")
-		Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-		coder.registerByteSizeObserver(union.getValue(), observer, context);
-	}
-
-	/////////////////////////////////////////////////////////////////////////////
-
-	private final List<Coder<?>> elementCoders;
-
-	private UnionCoder(List<Coder<?>> elementCoders) {
-		this.elementCoders = elementCoders;
-	}
-
-	@Override
-	public void verifyDeterministic() throws NonDeterministicException {
-		verifyDeterministic(
-				"UnionCoder is only deterministic if all element coders are",
-				elementCoders);
-	}
+  // TODO: Think about how to integrate this with a schema object (i.e.
+  // a tuple of tuple tags).
+  /**
+   * Builds a union coder with the given list of element coders.  This list
+   * corresponds to a mapping of union tag to Coder.  Union tags start at 0.
+   */
+  public static UnionCoder of(List<Coder<?>> elementCoders) {
+    return new UnionCoder(elementCoders);
+  }
+
+  @JsonCreator
+  public static UnionCoder jsonOf(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+      List<Coder<?>> elements) {
+    return UnionCoder.of(elements);
+  }
+
+  private int getIndexForEncoding(RawUnionValue union) {
+    if (union == null) {
+      throw new IllegalArgumentException("cannot encode a null tagged union");
+    }
+    int index = union.getUnionTag();
+    if (index < 0 || index >= elementCoders.size()) {
+      throw new IllegalArgumentException(
+          "union value index " + index + " not in range [0.." +
+              (elementCoders.size() - 1) + "]");
+    }
+    return index;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void encode(
+      RawUnionValue union,
+      OutputStream outStream,
+      Context context)
+      throws IOException  {
+    int index = getIndexForEncoding(union);
+    // Write out the union tag.
+    VarInt.encode(index, outStream);
+
+    // Write out the actual value.
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    coder.encode(
+        union.getValue(),
+        outStream,
+        context);
+  }
+
+  @Override
+  public RawUnionValue decode(InputStream inStream, Context context)
+      throws IOException {
+    int index = VarInt.decodeInt(inStream);
+    Object value = elementCoders.get(index).decode(inStream, context);
+    return new RawUnionValue(index, value);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return null;
+  }
+
+  @Override
+  public List<? extends Coder<?>> getComponents() {
+    return elementCoders;
+  }
+
+  /**
+   * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
+   * time, we defer the return value to that coder.
+   */
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
+    int index = getIndexForEncoding(union);
+    @SuppressWarnings("unchecked")
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
+  }
+
+  /**
+   * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
+   */
+  @Override
+  public void registerByteSizeObserver(
+      RawUnionValue union, ElementByteSizeObserver observer, Context context)
+      throws Exception {
+    int index = getIndexForEncoding(union);
+    // Write out the union tag.
+    observer.update(VarInt.getLength(index));
+    // Write out the actual value.
+    @SuppressWarnings("unchecked")
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    coder.registerByteSizeObserver(union.getValue(), observer, context);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private final List<Coder<?>> elementCoders;
+
+  private UnionCoder(List<Coder<?>> elementCoders) {
+    this.elementCoders = elementCoders;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    verifyDeterministic(
+        "UnionCoder is only deterministic if all element coders are",
+        elementCoders);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
index e433589..b402f7c 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
@@ -32,185 +32,185 @@ import java.io.ObjectInputStream;
  */
 public class CoderComparator<T> extends TypeComparator<T> {
 
-	private Coder<T> coder;
-
-	// We use these for internal encoding/decoding for creating copies and comparing
-	// serialized forms using a Coder
-	private transient InspectableByteArrayOutputStream buffer1;
-	private transient InspectableByteArrayOutputStream buffer2;
-
-	// For storing the Reference in encoded form
-	private transient InspectableByteArrayOutputStream referenceBuffer;
-
-	public CoderComparator(Coder<T> coder) {
-		this.coder = coder;
-		buffer1 = new InspectableByteArrayOutputStream();
-		buffer2 = new InspectableByteArrayOutputStream();
-		referenceBuffer = new InspectableByteArrayOutputStream();
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		buffer1 = new InspectableByteArrayOutputStream();
-		buffer2 = new InspectableByteArrayOutputStream();
-		referenceBuffer = new InspectableByteArrayOutputStream();
-	}
-
-	@Override
-	public int hash(T record) {
-		return record.hashCode();
-	}
-
-	@Override
-	public void setReference(T toCompare) {
-		referenceBuffer.reset();
-		try {
-			coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
-		}
-	}
-
-	@Override
-	public boolean equalToReference(T candidate) {
-		try {
-			buffer2.reset();
-			coder.encode(candidate, buffer2, Coder.Context.OUTER);
-			byte[] arr = referenceBuffer.getBuffer();
-			byte[] arrOther = buffer2.getBuffer();
-			if (referenceBuffer.size() != buffer2.size()) {
-				return false;
-			}
-			int len = buffer2.size();
-			for(int i = 0; i < len; i++ ) {
-				if (arr[i] != arrOther[i]) {
-					return false;
-				}
-			}
-			return true;
-		} catch (IOException e) {
-			throw new RuntimeException("Could not compare reference.", e);
-		}
-	}
-
-	@Override
-	public int compareToReference(TypeComparator<T> other) {
-		InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer;
-
-		byte[] arr = referenceBuffer.getBuffer();
-		byte[] arrOther = otherReferenceBuffer.getBuffer();
-		if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-			return referenceBuffer.size() - otherReferenceBuffer.size();
-		}
-		int len = referenceBuffer.size();
-		for (int i = 0; i < len; i++) {
-			if (arr[i] != arrOther[i]) {
-				return arr[i] - arrOther[i];
-			}
-		}
-		return 0;
-	}
-
-	@Override
-	public int compare(T first, T second) {
-		try {
-			buffer1.reset();
-			buffer2.reset();
-			coder.encode(first, buffer1, Coder.Context.OUTER);
-			coder.encode(second, buffer2, Coder.Context.OUTER);
-			byte[] arr = buffer1.getBuffer();
-			byte[] arrOther = buffer2.getBuffer();
-			if (buffer1.size() != buffer2.size()) {
-				return buffer1.size() - buffer2.size();
-			}
-			int len = buffer1.size();
-			for(int i = 0; i < len; i++ ) {
-				if (arr[i] != arrOther[i]) {
-					return arr[i] - arrOther[i];
-				}
-			}
-			return 0;
-		} catch (IOException e) {
-			throw new RuntimeException("Could not compare: ", e);
-		}
-	}
-
-	@Override
-	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder);
-		T first = serializer.deserialize(firstSource);
-		T second = serializer.deserialize(secondSource);
-		return compare(first, second);
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return true;
-	}
-
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return Integer.MAX_VALUE;
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return true;
-	}
-
-	@Override
-	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		buffer1.reset();
-		try {
-			coder.encode(record, buffer1, Coder.Context.OUTER);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
-		}
-		final byte[] data = buffer1.getBuffer();
-		final int limit = offset + numBytes;
-
-		target.put(offset, data, 0, Math.min(numBytes, buffer1.size()));
-
-		offset += buffer1.size();
-
-		while (offset < limit) {
-			target.put(offset++, (byte) 0);
-		}
-	}
-
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return false;
-	}
-
-	@Override
-	public TypeComparator<T> duplicate() {
-		return new CoderComparator<>(coder);
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = record;
-		return 1;
-	}
-
-	@Override
-	public TypeComparator[] getFlatComparators() {
-		return new TypeComparator[] { this.duplicate() };
-	}
+  private Coder<T> coder;
+
+  // We use these for internal encoding/decoding for creating copies and comparing
+  // serialized forms using a Coder
+  private transient InspectableByteArrayOutputStream buffer1;
+  private transient InspectableByteArrayOutputStream buffer2;
+
+  // For storing the Reference in encoded form
+  private transient InspectableByteArrayOutputStream referenceBuffer;
+
+  public CoderComparator(Coder<T> coder) {
+    this.coder = coder;
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+  }
+
+  @Override
+  public int hash(T record) {
+    return record.hashCode();
+  }
+
+  @Override
+  public void setReference(T toCompare) {
+    referenceBuffer.reset();
+    try {
+      coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
+    }
+  }
+
+  @Override
+  public boolean equalToReference(T candidate) {
+    try {
+      buffer2.reset();
+      coder.encode(candidate, buffer2, Coder.Context.OUTER);
+      byte[] arr = referenceBuffer.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (referenceBuffer.size() != buffer2.size()) {
+        return false;
+      }
+      int len = buffer2.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return false;
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareToReference(TypeComparator<T> other) {
+    InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer;
+
+    byte[] arr = referenceBuffer.getBuffer();
+    byte[] arrOther = otherReferenceBuffer.getBuffer();
+    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
+      return referenceBuffer.size() - otherReferenceBuffer.size();
+    }
+    int len = referenceBuffer.size();
+    for (int i = 0; i < len; i++) {
+      if (arr[i] != arrOther[i]) {
+        return arr[i] - arrOther[i];
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public int compare(T first, T second) {
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      coder.encode(first, buffer1, Coder.Context.OUTER);
+      coder.encode(second, buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare: ", e);
+    }
+  }
+
+  @Override
+  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+    CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder);
+    T first = serializer.deserialize(firstSource);
+    T second = serializer.deserialize(secondSource);
+    return compare(first, second);
+  }
+
+  @Override
+  public boolean supportsNormalizedKey() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsSerializationWithKeyNormalization() {
+    return false;
+  }
+
+  @Override
+  public int getNormalizeKeyLen() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+    return true;
+  }
+
+  @Override
+  public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+    buffer1.reset();
+    try {
+      coder.encode(record, buffer1, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
+    }
+    final byte[] data = buffer1.getBuffer();
+    final int limit = offset + numBytes;
+
+    target.put(offset, data, 0, Math.min(numBytes, buffer1.size()));
+
+    offset += buffer1.size();
+
+    while (offset < limit) {
+      target.put(offset++, (byte) 0);
+    }
+  }
+
+  @Override
+  public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean invertNormalizedKey() {
+    return false;
+  }
+
+  @Override
+  public TypeComparator<T> duplicate() {
+    return new CoderComparator<>(coder);
+  }
+
+  @Override
+  public int extractKeys(Object record, Object[] target, int index) {
+    target[index] = record;
+    return 1;
+  }
+
+  @Override
+  public TypeComparator[] getFlatComparators() {
+    return new TypeComparator[] { this.duplicate() };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
index dd9c5f6..ae4309e 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
@@ -32,85 +32,85 @@ import com.google.common.base.Preconditions;
  */
 public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
 
-	private final Coder<T> coder;
-
-	public CoderTypeInformation(Coder<T> coder) {
-		Preconditions.checkNotNull(coder);
-		this.coder = coder;
-	}
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public Class<T> getTypeClass() {
-		// We don't have the Class, so we have to pass null here. What a shame...
-		return (Class<T>) Object.class;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return true;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-		if (coder instanceof VoidCoder) {
-			return (TypeSerializer<T>) new VoidCoderTypeSerializer();
-		}
-		return new CoderTypeSerializer<>(coder);
-	}
-
-	@Override
-	public int getTotalFields() {
-		return 2;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) return true;
-		if (o == null || getClass() != o.getClass()) return false;
-
-		CoderTypeInformation that = (CoderTypeInformation) o;
-
-		return coder.equals(that.coder);
-
-	}
-
-	@Override
-	public int hashCode() {
-		return coder.hashCode();
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof CoderTypeInformation;
-	}
-
-	@Override
-	public String toString() {
-		return "CoderTypeInformation{" +
-				"coder=" + coder +
-				'}';
-	}
-
-	@Override
-	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
-			executionConfig) {
-		return new CoderComparator<>(coder);
-	}
+  private final Coder<T> coder;
+
+  public CoderTypeInformation(Coder<T> coder) {
+    Preconditions.checkNotNull(coder);
+    this.coder = coder;
+  }
+
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  public int getArity() {
+    return 1;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<T> getTypeClass() {
+    // We don't have the Class, so we have to pass null here. What a shame...
+    return (Class<T>) Object.class;
+  }
+
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+    if (coder instanceof VoidCoder) {
+      return (TypeSerializer<T>) new VoidCoderTypeSerializer();
+    }
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public int getTotalFields() {
+    return 2;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CoderTypeInformation that = (CoderTypeInformation) o;
+
+    return coder.equals(that.coder);
+
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof CoderTypeInformation;
+  }
+
+  @Override
+  public String toString() {
+    return "CoderTypeInformation{" +
+        "coder=" + coder +
+        '}';
+  }
+
+  @Override
+  public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
+      executionConfig) {
+    return new CoderComparator<>(coder);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
index f739397..6ed661c 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
@@ -35,118 +35,118 @@ import java.io.ObjectInputStream;
  * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s
  */
 public class CoderTypeSerializer<T> extends TypeSerializer<T> {
-	
-	private Coder<T> coder;
-	private transient DataInputViewWrapper inputWrapper;
-	private transient DataOutputViewWrapper outputWrapper;
-
-	// We use this for internal encoding/decoding for creating copies using the Coder.
-	private transient InspectableByteArrayOutputStream buffer;
-
-	public CoderTypeSerializer(Coder<T> coder) {
-		this.coder = coder;
-		this.inputWrapper = new DataInputViewWrapper(null);
-		this.outputWrapper = new DataOutputViewWrapper(null);
-
-		buffer = new InspectableByteArrayOutputStream();
-	}
-	
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		this.inputWrapper = new DataInputViewWrapper(null);
-		this.outputWrapper = new DataOutputViewWrapper(null);
-
-		buffer = new InspectableByteArrayOutputStream();
-	}
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public CoderTypeSerializer<T> duplicate() {
-		return new CoderTypeSerializer<>(coder);
-	}
-
-	@Override
-	public T createInstance() {
-		return null;
-	}
-
-	@Override
-	public T copy(T t) {
-		buffer.reset();
-		try {
-			coder.encode(t, buffer, Coder.Context.OUTER);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not copy.", e);
-		}
-		try {
-			return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer
-					.size()), Coder.Context.OUTER);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not copy.", e);
-		}
-	}
-
-	@Override
-	public T copy(T t, T reuse) {
-		return copy(t);
-	}
-
-	@Override
-	public int getLength() {
-		return 0;
-	}
-
-	@Override
-	public void serialize(T t, DataOutputView dataOutputView) throws IOException {
-		outputWrapper.setOutputView(dataOutputView);
-		coder.encode(t, outputWrapper, Coder.Context.NESTED);
-	}
-
-	@Override
-	public T deserialize(DataInputView dataInputView) throws IOException {
-		try {
-			inputWrapper.setInputView(dataInputView);
-			return coder.decode(inputWrapper, Coder.Context.NESTED);
-		} catch (CoderException e) {
-			Throwable cause = e.getCause();
-			if (cause instanceof EOFException) {
-				throw (EOFException) cause;
-			} else {
-				throw e;
-			}
-		}
-	}
-
-	@Override
-	public T deserialize(T t, DataInputView dataInputView) throws IOException {
-		return deserialize(dataInputView);
-	}
-
-	@Override
-	public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
-		serialize(deserialize(dataInputView), dataOutputView);
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) return true;
-		if (o == null || getClass() != o.getClass()) return false;
-
-		CoderTypeSerializer that = (CoderTypeSerializer) o;
-		return coder.equals(that.coder);
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof CoderTypeSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return coder.hashCode();
-	}
+  
+  private Coder<T> coder;
+  private transient DataInputViewWrapper inputWrapper;
+  private transient DataOutputViewWrapper outputWrapper;
+
+  // We use this for internal encoding/decoding for creating copies using the Coder.
+  private transient InspectableByteArrayOutputStream buffer;
+
+  public CoderTypeSerializer(Coder<T> coder) {
+    this.coder = coder;
+    this.inputWrapper = new DataInputViewWrapper(null);
+    this.outputWrapper = new DataOutputViewWrapper(null);
+
+    buffer = new InspectableByteArrayOutputStream();
+  }
+  
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    this.inputWrapper = new DataInputViewWrapper(null);
+    this.outputWrapper = new DataOutputViewWrapper(null);
+
+    buffer = new InspectableByteArrayOutputStream();
+  }
+  
+  @Override
+  public boolean isImmutableType() {
+    return false;
+  }
+
+  @Override
+  public CoderTypeSerializer<T> duplicate() {
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public T createInstance() {
+    return null;
+  }
+
+  @Override
+  public T copy(T t) {
+    buffer.reset();
+    try {
+      coder.encode(t, buffer, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not copy.", e);
+    }
+    try {
+      return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer
+          .size()), Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not copy.", e);
+    }
+  }
+
+  @Override
+  public T copy(T t, T reuse) {
+    return copy(t);
+  }
+
+  @Override
+  public int getLength() {
+    return 0;
+  }
+
+  @Override
+  public void serialize(T t, DataOutputView dataOutputView) throws IOException {
+    outputWrapper.setOutputView(dataOutputView);
+    coder.encode(t, outputWrapper, Coder.Context.NESTED);
+  }
+
+  @Override
+  public T deserialize(DataInputView dataInputView) throws IOException {
+    try {
+      inputWrapper.setInputView(dataInputView);
+      return coder.decode(inputWrapper, Coder.Context.NESTED);
+    } catch (CoderException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof EOFException) {
+        throw (EOFException) cause;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public T deserialize(T t, DataInputView dataInputView) throws IOException {
+    return deserialize(dataInputView);
+  }
+
+  @Override
+  public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
+    serialize(deserialize(dataInputView), dataOutputView);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CoderTypeSerializer that = (CoderTypeSerializer) o;
+    return coder.equals(that.coder);
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof CoderTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
index 5d918cc..be6eadd 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
@@ -25,10 +25,10 @@ import java.io.ByteArrayOutputStream;
  */
 public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
 
-	/**
-	 * Get the underlying byte array.
-	 */
-	public byte[] getBuffer() {
-		return buf;
-	}
+  /**
+   * Get the underlying byte array.
+   */
+  public byte[] getBuffer() {
+    return buf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
index 815569d..ba09ea9 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
@@ -35,230 +35,230 @@ import java.io.ObjectInputStream;
  * for {@link KV} that always compares on the key only.
  */
 public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
-	
-	private KvCoder<K, V> coder;
-	private Coder<K> keyCoder;
-
-	// We use these for internal encoding/decoding for creating copies and comparing
-	// serialized forms using a Coder
-	private transient InspectableByteArrayOutputStream buffer1;
-	private transient InspectableByteArrayOutputStream buffer2;
-
-	// For storing the Reference in encoded form
-	private transient InspectableByteArrayOutputStream referenceBuffer;
-
-
-	// For deserializing the key
-	private transient DataInputViewWrapper inputWrapper;
-
-	public KvCoderComperator(KvCoder<K, V> coder) {
-		this.coder = coder;
-		this.keyCoder = coder.getKeyCoder();
-
-		buffer1 = new InspectableByteArrayOutputStream();
-		buffer2 = new InspectableByteArrayOutputStream();
-		referenceBuffer = new InspectableByteArrayOutputStream();
-
-		inputWrapper = new DataInputViewWrapper(null);
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-
-		buffer1 = new InspectableByteArrayOutputStream();
-		buffer2 = new InspectableByteArrayOutputStream();
-		referenceBuffer = new InspectableByteArrayOutputStream();
-
-		inputWrapper = new DataInputViewWrapper(null);
-	}
-
-	@Override
-	public int hash(KV<K, V> record) {
-		K key = record.getKey();
-		if (key != null) {
-			return key.hashCode();
-		} else {
-			return 0;
-		}
-	}
-
-	@Override
-	public void setReference(KV<K, V> toCompare) {
-		referenceBuffer.reset();
-		try {
-			keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
-		}
-	}
-
-	@Override
-	public boolean equalToReference(KV<K, V> candidate) {
-		try {
-			buffer2.reset();
-			keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER);
-			byte[] arr = referenceBuffer.getBuffer();
-			byte[] arrOther = buffer2.getBuffer();
-			if (referenceBuffer.size() != buffer2.size()) {
-				return false;
-			}
-			int len = buffer2.size();
-			for(int i = 0; i < len; i++ ) {
-				if (arr[i] != arrOther[i]) {
-					return false;
-				}
-			}
-			return true;
-		} catch (IOException e) {
-			throw new RuntimeException("Could not compare reference.", e);
-		}
-	}
-
-	@Override
-	public int compareToReference(TypeComparator<KV<K, V>> other) {
-		InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator<K, V>) other).referenceBuffer;
-
-		byte[] arr = referenceBuffer.getBuffer();
-		byte[] arrOther = otherReferenceBuffer.getBuffer();
-		if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-			return referenceBuffer.size() - otherReferenceBuffer.size();
-		}
-		int len = referenceBuffer.size();
-		for (int i = 0; i < len; i++) {
-			if (arr[i] != arrOther[i]) {
-				return arr[i] - arrOther[i];
-			}
-		}
-		return 0;
-	}
-
-
-	@Override
-	public int compare(KV<K, V> first, KV<K, V> second) {
-		try {
-			buffer1.reset();
-			buffer2.reset();
-			keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER);
-			keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER);
-			byte[] arr = buffer1.getBuffer();
-			byte[] arrOther = buffer2.getBuffer();
-			if (buffer1.size() != buffer2.size()) {
-				return buffer1.size() - buffer2.size();
-			}
-			int len = buffer1.size();
-			for(int i = 0; i < len; i++ ) {
-				if (arr[i] != arrOther[i]) {
-					return arr[i] - arrOther[i];
-				}
-			}
-			return 0;
-		} catch (IOException e) {
-			throw new RuntimeException("Could not compare reference.", e);
-		}
-	}
-
-	@Override
-	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-
-		inputWrapper.setInputView(firstSource);
-		K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
-		inputWrapper.setInputView(secondSource);
-		K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
-
-		try {
-			buffer1.reset();
-			buffer2.reset();
-			keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER);
-			keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER);
-			byte[] arr = buffer1.getBuffer();
-			byte[] arrOther = buffer2.getBuffer();
-			if (buffer1.size() != buffer2.size()) {
-				return buffer1.size() - buffer2.size();
-			}
-			int len = buffer1.size();
-			for(int i = 0; i < len; i++ ) {
-				if (arr[i] != arrOther[i]) {
-					return arr[i] - arrOther[i];
-				}
-			}
-			return 0;
-		} catch (IOException e) {
-			throw new RuntimeException("Could not compare reference.", e);
-		}
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return true;
-	}
-
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return Integer.MAX_VALUE;
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return true;
-	}
-
-	@Override
-	public void putNormalizedKey(KV<K, V> record, MemorySegment target, int offset, int numBytes) {
-		buffer1.reset();
-		try {
-			keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
-		}
-		final byte[] data = buffer1.getBuffer();
-		final int limit = offset + numBytes;
-
-		int numBytesPut = Math.min(numBytes, buffer1.size());
-
-		target.put(offset, data, 0, numBytesPut);
-
-		offset += numBytesPut;
-
-		while (offset < limit) {
-			target.put(offset++, (byte) 0);
-		}
-	}
-
-	@Override
-	public void writeWithKeyNormalization(KV<K, V> record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return false;
-	}
-
-	@Override
-	public TypeComparator<KV<K, V>> duplicate() {
-		return new KvCoderComperator<>(coder);
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		KV<K, V> kv = (KV<K, V>) record;
-		K k = kv.getKey();
-		target[index] = k;
-		return 1;
-	}
-
-	@Override
-	public TypeComparator[] getFlatComparators() {
-		return new TypeComparator[] {new CoderComparator<>(keyCoder)};
-	}
+  
+  private KvCoder<K, V> coder;
+  private Coder<K> keyCoder;
+
+  // We use these for internal encoding/decoding for creating copies and comparing
+  // serialized forms using a Coder
+  private transient InspectableByteArrayOutputStream buffer1;
+  private transient InspectableByteArrayOutputStream buffer2;
+
+  // For storing the Reference in encoded form
+  private transient InspectableByteArrayOutputStream referenceBuffer;
+
+
+  // For deserializing the key
+  private transient DataInputViewWrapper inputWrapper;
+
+  public KvCoderComperator(KvCoder<K, V> coder) {
+    this.coder = coder;
+    this.keyCoder = coder.getKeyCoder();
+
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+
+    inputWrapper = new DataInputViewWrapper(null);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+
+    inputWrapper = new DataInputViewWrapper(null);
+  }
+
+  @Override
+  public int hash(KV<K, V> record) {
+    K key = record.getKey();
+    if (key != null) {
+      return key.hashCode();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public void setReference(KV<K, V> toCompare) {
+    referenceBuffer.reset();
+    try {
+      keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
+    }
+  }
+
+  @Override
+  public boolean equalToReference(KV<K, V> candidate) {
+    try {
+      buffer2.reset();
+      keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER);
+      byte[] arr = referenceBuffer.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (referenceBuffer.size() != buffer2.size()) {
+        return false;
+      }
+      int len = buffer2.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return false;
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareToReference(TypeComparator<KV<K, V>> other) {
+    InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator<K, V>) other).referenceBuffer;
+
+    byte[] arr = referenceBuffer.getBuffer();
+    byte[] arrOther = otherReferenceBuffer.getBuffer();
+    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
+      return referenceBuffer.size() - otherReferenceBuffer.size();
+    }
+    int len = referenceBuffer.size();
+    for (int i = 0; i < len; i++) {
+      if (arr[i] != arrOther[i]) {
+        return arr[i] - arrOther[i];
+      }
+    }
+    return 0;
+  }
+
+
+  @Override
+  public int compare(KV<K, V> first, KV<K, V> second) {
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER);
+      keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+
+    inputWrapper.setInputView(firstSource);
+    K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
+    inputWrapper.setInputView(secondSource);
+    K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
+
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER);
+      keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public boolean supportsNormalizedKey() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsSerializationWithKeyNormalization() {
+    return false;
+  }
+
+  @Override
+  public int getNormalizeKeyLen() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+    return true;
+  }
+
+  @Override
+  public void putNormalizedKey(KV<K, V> record, MemorySegment target, int offset, int numBytes) {
+    buffer1.reset();
+    try {
+      keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
+    }
+    final byte[] data = buffer1.getBuffer();
+    final int limit = offset + numBytes;
+
+    int numBytesPut = Math.min(numBytes, buffer1.size());
+
+    target.put(offset, data, 0, numBytesPut);
+
+    offset += numBytesPut;
+
+    while (offset < limit) {
+      target.put(offset++, (byte) 0);
+    }
+  }
+
+  @Override
+  public void writeWithKeyNormalization(KV<K, V> record, DataOutputView target) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView source) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean invertNormalizedKey() {
+    return false;
+  }
+
+  @Override
+  public TypeComparator<KV<K, V>> duplicate() {
+    return new KvCoderComperator<>(coder);
+  }
+
+  @Override
+  public int extractKeys(Object record, Object[] target, int index) {
+    KV<K, V> kv = (KV<K, V>) record;
+    K k = kv.getKey();
+    target[index] = k;
+    return 1;
+  }
+
+  @Override
+  public TypeComparator[] getFlatComparators() {
+    return new TypeComparator[] {new CoderComparator<>(keyCoder)};
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
index 090f79d..be11918 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
@@ -34,153 +34,153 @@ import java.util.List;
  */
 public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
 
-	private KvCoder<K, V> coder;
-
-	// We don't have the Class, so we have to pass null here. What a shame...
-	private static Object DUMMY = new Object();
-
-	@SuppressWarnings("unchecked")
-	public KvCoderTypeInformation(KvCoder<K, V> coder) {
-		super(((Class<KV<K,V>>) DUMMY.getClass()));
-		this.coder = coder;
-		Preconditions.checkNotNull(coder);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
-		return new KvCoderComperator((KvCoder) coder);
-	}
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 2;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public Class<KV<K, V>> getTypeClass() {
-		return privateGetTypeClass();
-	}
-
-	@SuppressWarnings("unchecked")
-	private static <X> Class<X> privateGetTypeClass() {
-		return (Class<X>) Object.class;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return true;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) {
-		return new CoderTypeSerializer<>(coder);
-	}
-
-	@Override
-	public int getTotalFields() {
-		return 2;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) return true;
-		if (o == null || getClass() != o.getClass()) return false;
-
-		KvCoderTypeInformation that = (KvCoderTypeInformation) o;
-
-		return coder.equals(that.coder);
-
-	}
-
-	@Override
-	public int hashCode() {
-		return coder.hashCode();
-	}
-
-	@Override
-	public String toString() {
-		return "CoderTypeInformation{" +
-				"coder=" + coder +
-				'}';
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public <X> TypeInformation<X> getTypeAt(int pos) {
-		if (pos == 0) {
-			return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
-		} else if (pos == 1) {
-			return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
-		} else {
-			throw new RuntimeException("Invalid field position " + pos);
-		}
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
-		switch (fieldExpression) {
-			case "key":
-				return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
-			case "value":
-				return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
-			default:
-				throw new UnsupportedOperationException("Only KvCoder has fields.");
-		}
-	}
-
-	@Override
-	public String[] getFieldNames() {
-		return new String[]{"key", "value"};
-	}
-
-	@Override
-	public int getFieldIndex(String fieldName) {
-		switch (fieldName) {
-			case "key":
-				return 0;
-			case "value":
-				return 1;
-			default:
-				return -1;
-		}
-	}
-
-	@Override
-	public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
-			CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder());
-			result.add(new FlatFieldDescriptor(0, keyTypeInfo));
-	}
-
-	@Override
-	protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() {
-		return new KvCoderTypeComparatorBuilder();
-	}
-
-	private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> {
-
-		@Override
-		public void initializeTypeComparatorBuilder(int size) {}
-
-		@Override
-		public void addComparatorField(int fieldId, TypeComparator<?> comparator) {}
-
-		@Override
-		public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) {
-			return new KvCoderComperator<>(coder);
-		}
-	}
+  private KvCoder<K, V> coder;
+
+  // We don't have the Class, so we have to pass null here. What a shame...
+  private static Object DUMMY = new Object();
+
+  @SuppressWarnings("unchecked")
+  public KvCoderTypeInformation(KvCoder<K, V> coder) {
+    super(((Class<KV<K,V>>) DUMMY.getClass()));
+    this.coder = coder;
+    Preconditions.checkNotNull(coder);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
+    return new KvCoderComperator((KvCoder) coder);
+  }
+
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  public int getArity() {
+    return 2;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<KV<K, V>> getTypeClass() {
+    return privateGetTypeClass();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <X> Class<X> privateGetTypeClass() {
+    return (Class<X>) Object.class;
+  }
+
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) {
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public int getTotalFields() {
+    return 2;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    KvCoderTypeInformation that = (KvCoderTypeInformation) o;
+
+    return coder.equals(that.coder);
+
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "CoderTypeInformation{" +
+        "coder=" + coder +
+        '}';
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <X> TypeInformation<X> getTypeAt(int pos) {
+    if (pos == 0) {
+      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
+    } else if (pos == 1) {
+      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
+    } else {
+      throw new RuntimeException("Invalid field position " + pos);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+    switch (fieldExpression) {
+      case "key":
+        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
+      case "value":
+        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
+      default:
+        throw new UnsupportedOperationException("Only KvCoder has fields.");
+    }
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return new String[]{"key", "value"};
+  }
+
+  @Override
+  public int getFieldIndex(String fieldName) {
+    switch (fieldName) {
+      case "key":
+        return 0;
+      case "value":
+        return 1;
+      default:
+        return -1;
+    }
+  }
+
+  @Override
+  public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
+      CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder());
+      result.add(new FlatFieldDescriptor(0, keyTypeInfo));
+  }
+
+  @Override
+  protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() {
+    return new KvCoderTypeComparatorBuilder();
+  }
+
+  private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> {
+
+    @Override
+    public void initializeTypeComparatorBuilder(int size) {}
+
+    @Override
+    public void addComparatorField(int fieldId, TypeComparator<?> comparator) {}
+
+    @Override
+    public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) {
+      return new KvCoderComperator<>(coder);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
index 7ce484a..190d898 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
@@ -31,82 +31,82 @@ import java.io.IOException;
  */
 public class VoidCoderTypeSerializer extends TypeSerializer<VoidCoderTypeSerializer.VoidValue> {
 
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public VoidCoderTypeSerializer duplicate() {
-		return this;
-	}
-
-	@Override
-	public VoidValue createInstance() {
-		return VoidValue.INSTANCE;
-	}
-
-	@Override
-	public VoidValue copy(VoidValue from) {
-		return from;
-	}
-
-	@Override
-	public VoidValue copy(VoidValue from, VoidValue reuse) {
-		return from;
-	}
-
-	@Override
-	public int getLength() {
-		return 0;
-	}
-
-	@Override
-	public void serialize(VoidValue record, DataOutputView target) throws IOException {
-		target.writeByte(1);
-	}
-
-	@Override
-	public VoidValue deserialize(DataInputView source) throws IOException {
-		source.readByte();
-		return VoidValue.INSTANCE;
-	}
-
-	@Override
-	public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException {
-		return deserialize(source);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		source.readByte();
-		target.writeByte(1);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof VoidCoderTypeSerializer) {
-			VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj;
-			return other.canEqual(this);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof VoidCoderTypeSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return 0;
-	}
-
-	public static class VoidValue {
-		private VoidValue() {}
-		
-		public static VoidValue INSTANCE = new VoidValue();
-	}
+  @Override
+  public boolean isImmutableType() {
+    return false;
+  }
+
+  @Override
+  public VoidCoderTypeSerializer duplicate() {
+    return this;
+  }
+
+  @Override
+  public VoidValue createInstance() {
+    return VoidValue.INSTANCE;
+  }
+
+  @Override
+  public VoidValue copy(VoidValue from) {
+    return from;
+  }
+
+  @Override
+  public VoidValue copy(VoidValue from, VoidValue reuse) {
+    return from;
+  }
+
+  @Override
+  public int getLength() {
+    return 0;
+  }
+
+  @Override
+  public void serialize(VoidValue record, DataOutputView target) throws IOException {
+    target.writeByte(1);
+  }
+
+  @Override
+  public VoidValue deserialize(DataInputView source) throws IOException {
+    source.readByte();
+    return VoidValue.INSTANCE;
+  }
+
+  @Override
+  public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException {
+    return deserialize(source);
+  }
+
+  @Override
+  public void copy(DataInputView source, DataOutputView target) throws IOException {
+    source.readByte();
+    target.writeByte(1);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof VoidCoderTypeSerializer) {
+      VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj;
+      return other.canEqual(this);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof VoidCoderTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  public static class VoidValue {
+    private VoidValue() {}
+    
+    public static VoidValue INSTANCE = new VoidValue();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
index 924b297..8f6d67c 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
@@ -31,62 +31,62 @@ import java.io.Serializable;
  * operation.
  */
 public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, AR>, Accumulator<AI, Serializable> {
-	
-	private AA aa;
-	private Combine.CombineFn<? super AI, AA, AR> combiner;
+  
+  private AA aa;
+  private Combine.CombineFn<? super AI, AA, AR> combiner;
 
-	public CombineFnAggregatorWrapper() {
-	}
+  public CombineFnAggregatorWrapper() {
+  }
 
-	public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> combiner) {
-		this.combiner = combiner;
-		this.aa = combiner.createAccumulator();
-	}
+  public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> combiner) {
+    this.combiner = combiner;
+    this.aa = combiner.createAccumulator();
+  }
 
-	@Override
-	public void add(AI value) {
-		combiner.addInput(aa, value);
-	}
+  @Override
+  public void add(AI value) {
+    combiner.addInput(aa, value);
+  }
 
-	@Override
-	public Serializable getLocalValue() {
-		return (Serializable) combiner.extractOutput(aa);
-	}
+  @Override
+  public Serializable getLocalValue() {
+    return (Serializable) combiner.extractOutput(aa);
+  }
 
-	@Override
-	public void resetLocal() {
-		aa = combiner.createAccumulator();
-	}
+  @Override
+  public void resetLocal() {
+    aa = combiner.createAccumulator();
+  }
 
-	@Override
-	@SuppressWarnings("unchecked")
-	public void merge(Accumulator<AI, Serializable> other) {
-		aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa));
-	}
+  @Override
+  @SuppressWarnings("unchecked")
+  public void merge(Accumulator<AI, Serializable> other) {
+    aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa));
+  }
 
-	@Override
-	public Accumulator<AI, Serializable> clone() {
-		// copy it by merging
-		AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa));
-		CombineFnAggregatorWrapper<AI, AA, AR> result = new
-				CombineFnAggregatorWrapper<>(combiner);
-		result.aa = aaCopy;
-		return result;
-	}
+  @Override
+  public Accumulator<AI, Serializable> clone() {
+    // copy it by merging
+    AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa));
+    CombineFnAggregatorWrapper<AI, AA, AR> result = new
+        CombineFnAggregatorWrapper<>(combiner);
+    result.aa = aaCopy;
+    return result;
+  }
 
-	@Override
-	public void addValue(AI value) {
-		add(value);
-	}
+  @Override
+  public void addValue(AI value) {
+    add(value);
+  }
 
-	@Override
-	public String getName() {
-		return "CombineFn: " + combiner.toString();
-	}
+  @Override
+  public String getName() {
+    return "CombineFn: " + combiner.toString();
+  }
 
-	@Override
-	public Combine.CombineFn getCombineFn() {
-		return combiner;
-	}
+  @Override
+  public Combine.CombineFn getCombineFn() {
+    return combiner;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
index 90582b0..3c96939 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
@@ -31,29 +31,29 @@ import java.io.InputStream;
  */
 public class DataInputViewWrapper extends InputStream {
 
-	private DataInputView inputView;
-
-	public DataInputViewWrapper(DataInputView inputView) {
-		this.inputView = inputView;
-	}
-
-	public void setInputView(DataInputView inputView) {
-		this.inputView = inputView;
-	}
-
-	@Override
-	public int read() throws IOException {
-		try {
-			return inputView.readUnsignedByte();
-		} catch (EOFException e) {
-			// translate between DataInput and InputStream,
-			// DataInput signals EOF by exception, InputStream does it by returning -1
-			return -1;
-		}
-	}
-
-	@Override
-	public int read(byte[] b, int off, int len) throws IOException {
-		return inputView.read(b, off, len);
-	}
+  private DataInputView inputView;
+
+  public DataInputViewWrapper(DataInputView inputView) {
+    this.inputView = inputView;
+  }
+
+  public void setInputView(DataInputView inputView) {
+    this.inputView = inputView;
+  }
+
+  @Override
+  public int read() throws IOException {
+    try {
+      return inputView.readUnsignedByte();
+    } catch (EOFException e) {
+      // translate between DataInput and InputStream,
+      // DataInput signals EOF by exception, InputStream does it by returning -1
+      return -1;
+    }
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return inputView.read(b, off, len);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
index 46df8e5..a222cdd 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
@@ -29,24 +29,24 @@ import java.io.OutputStream;
  * {@link java.io.OutputStream}.
  */
 public class DataOutputViewWrapper extends OutputStream {
-	
-	private DataOutputView outputView;
+  
+  private DataOutputView outputView;
 
-	public DataOutputViewWrapper(DataOutputView outputView) {
-		this.outputView = outputView;
-	}
+  public DataOutputViewWrapper(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
 
-	public void setOutputView(DataOutputView outputView) {
-		this.outputView = outputView;
-	}
+  public void setOutputView(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
 
-	@Override
-	public void write(int b) throws IOException {
-		outputView.write(b);
-	}
+  @Override
+  public void write(int b) throws IOException {
+    outputView.write(b);
+  }
 
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		outputView.write(b, off, len);
-	}
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputView.write(b, off, len);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
index 1c0dae4..c193a4d 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
@@ -33,59 +33,59 @@ import java.io.Serializable;
  */
 public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, AO>, Accumulator<AI, Serializable> {
 
-	private AO aa;
-	private Combine.CombineFn<AI, ?, AO> combiner;
+  private AO aa;
+  private Combine.CombineFn<AI, ?, AO> combiner;
 
-	public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) {
-		this.combiner = combiner;
-		resetLocal();
-	}
-	
-	@Override
-	@SuppressWarnings("unchecked")
-	public void add(AI value) {
-		this.aa = combiner.apply(ImmutableList.of((AI) aa, value));
-	}
+  public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) {
+    this.combiner = combiner;
+    resetLocal();
+  }
+  
+  @Override
+  @SuppressWarnings("unchecked")
+  public void add(AI value) {
+    this.aa = combiner.apply(ImmutableList.of((AI) aa, value));
+  }
 
-	@Override
-	public Serializable getLocalValue() {
-		return (Serializable) aa;
-	}
+  @Override
+  public Serializable getLocalValue() {
+    return (Serializable) aa;
+  }
 
-	@Override
-	public void resetLocal() {
-		this.aa = combiner.apply(ImmutableList.<AI>of());
-	}
+  @Override
+  public void resetLocal() {
+    this.aa = combiner.apply(ImmutableList.<AI>of());
+  }
 
-	@Override
-	@SuppressWarnings("unchecked")
-	public void merge(Accumulator<AI, Serializable> other) {
-		this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue()));
-	}
+  @Override
+  @SuppressWarnings("unchecked")
+  public void merge(Accumulator<AI, Serializable> other) {
+    this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue()));
+  }
 
-	@Override
-	public void addValue(AI value) {
-		add(value);
-	}
+  @Override
+  public void addValue(AI value) {
+    add(value);
+  }
 
-	@Override
-	public String getName() {
-		return "Aggregator :" + combiner.toString();
-	}
+  @Override
+  public String getName() {
+    return "Aggregator :" + combiner.toString();
+  }
 
-	@Override
-	public Combine.CombineFn<AI, ?, AO> getCombineFn() {
-		return combiner;
-	}
+  @Override
+  public Combine.CombineFn<AI, ?, AO> getCombineFn() {
+    return combiner;
+  }
 
-	@Override
-	public Accumulator<AI, Serializable> clone() {
-		// copy it by merging
-		AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa));
-		SerializableFnAggregatorWrapper<AI, AO> result = new
-				SerializableFnAggregatorWrapper<>(combiner);
+  @Override
+  public Accumulator<AI, Serializable> clone() {
+    // copy it by merging
+    AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa));
+    SerializableFnAggregatorWrapper<AI, AO> result = new
+        SerializableFnAggregatorWrapper<>(combiner);
 
-		result.aa = resultCopy;
-		return result;
-	}
+    result.aa = resultCopy;
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
index 8be9abf..3f28c16 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
@@ -38,84 +38,84 @@ import java.lang.reflect.Field;
  */
 public class SinkOutputFormat<T> implements OutputFormat<T> {
 
-	private final Sink<T> sink;
-
-	private transient PipelineOptions pipelineOptions;
-
-	private Sink.WriteOperation<T, ?> writeOperation;
-	private Sink.Writer<T, ?> writer;
-
-	private AbstractID uid = new AbstractID();
-
-	public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
-		this.sink = extractSink(transform);
-		this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
-	}
-
-	private Sink<T> extractSink(Write.Bound<T> transform) {
-		// TODO possibly add a getter in the upstream
-		try {
-			Field sinkField = transform.getClass().getDeclaredField("sink");
-			sinkField.setAccessible(true);
-			@SuppressWarnings("unchecked")
-			Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
-			return extractedSink;
-		} catch (NoSuchFieldException | IllegalAccessException e) {
-			throw new RuntimeException("Could not acquire custom sink field.", e);
-		}
-	}
-
-	@Override
-	public void configure(Configuration configuration) {
-		writeOperation = sink.createWriteOperation(pipelineOptions);
-		try {
-			writeOperation.initialize(pipelineOptions);
-		} catch (Exception e) {
-			throw new RuntimeException("Failed to initialize the write operation.", e);
-		}
-	}
-
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		try {
-			writer = writeOperation.createWriter(pipelineOptions);
-		} catch (Exception e) {
-			throw new IOException("Couldn't create writer.", e);
-		}
-		try {
-			writer.open(uid + "-" + String.valueOf(taskNumber));
-		} catch (Exception e) {
-			throw new IOException("Couldn't open writer.", e);
-		}
-	}
-
-	@Override
-	public void writeRecord(T record) throws IOException {
-		try {
-			writer.write(record);
-		} catch (Exception e) {
-			throw new IOException("Couldn't write record.", e);
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		try {
-			writer.close();
-		} catch (Exception e) {
-			throw new IOException("Couldn't close writer.", e);
-		}
-	}
-
-	private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		ObjectMapper mapper = new ObjectMapper();
-		mapper.writeValue(out, pipelineOptions);
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		ObjectMapper mapper = new ObjectMapper();
-		pipelineOptions = mapper.readValue(in, PipelineOptions.class);
-	}
+  private final Sink<T> sink;
+
+  private transient PipelineOptions pipelineOptions;
+
+  private Sink.WriteOperation<T, ?> writeOperation;
+  private Sink.Writer<T, ?> writer;
+
+  private AbstractID uid = new AbstractID();
+
+  public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
+    this.sink = extractSink(transform);
+    this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
+  }
+
+  private Sink<T> extractSink(Write.Bound<T> transform) {
+    // TODO possibly add a getter in the upstream
+    try {
+      Field sinkField = transform.getClass().getDeclaredField("sink");
+      sinkField.setAccessible(true);
+      @SuppressWarnings("unchecked")
+      Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
+      return extractedSink;
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new RuntimeException("Could not acquire custom sink field.", e);
+    }
+  }
+
+  @Override
+  public void configure(Configuration configuration) {
+    writeOperation = sink.createWriteOperation(pipelineOptions);
+    try {
+      writeOperation.initialize(pipelineOptions);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize the write operation.", e);
+    }
+  }
+
+  @Override
+  public void open(int taskNumber, int numTasks) throws IOException {
+    try {
+      writer = writeOperation.createWriter(pipelineOptions);
+    } catch (Exception e) {
+      throw new IOException("Couldn't create writer.", e);
+    }
+    try {
+      writer.open(uid + "-" + String.valueOf(taskNumber));
+    } catch (Exception e) {
+      throw new IOException("Couldn't open writer.", e);
+    }
+  }
+
+  @Override
+  public void writeRecord(T record) throws IOException {
+    try {
+      writer.write(record);
+    } catch (Exception e) {
+      throw new IOException("Couldn't write record.", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      writer.close();
+    } catch (Exception e) {
+      throw new IOException("Couldn't close writer.", e);
+    }
+  }
+
+  private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, pipelineOptions);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    pipelineOptions = mapper.readValue(in, PipelineOptions.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
index 64dc072..5981618 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
@@ -41,124 +41,124 @@ import java.util.List;
  * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}.
  */
 public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> {
-	private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
-
-	private final BoundedSource<T> initialSource;
-	private transient PipelineOptions options;
-
-	private BoundedSource.BoundedReader<T> reader = null;
-	private boolean reachedEnd = true;
-
-	public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
-		this.initialSource = initialSource;
-		this.options = options;
-	}
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		ObjectMapper mapper = new ObjectMapper();
-		mapper.writeValue(out, options);
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		ObjectMapper mapper = new ObjectMapper();
-		options = mapper.readValue(in, PipelineOptions.class);
-	}
-
-	@Override
-	public void configure(Configuration configuration) {}
-
-	@Override
-	public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
-		reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
-		reachedEnd = false;
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
-		try {
-			final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
-
-			return new BaseStatistics() {
-				@Override
-				public long getTotalInputSize() {
-					return estimatedSize;
-
-				}
-
-				@Override
-				public long getNumberOfRecords() {
-					return BaseStatistics.NUM_RECORDS_UNKNOWN;
-				}
-
-				@Override
-				public float getAverageRecordWidth() {
-					return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
-				}
-			};
-		} catch (Exception e) {
-			LOG.warn("Could not read Source statistics: {}", e);
-		}
-
-		return null;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
-		long desiredSizeBytes;
-		try {
-			desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
-			List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes,
-					options);
-			List<SourceInputSplit<T>> splits = new ArrayList<>();
-			int splitCount = 0;
-			for (Source<T> shard: shards) {
-				splits.add(new SourceInputSplit<>(shard, splitCount++));
-			}
-			return splits.toArray(new SourceInputSplit[splits.size()]);
-		} catch (Exception e) {
-			throw new IOException("Could not create input splits from Source.", e);
-		}
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
-		return new InputSplitAssigner() {
-			private int index = 0;
-			private final SourceInputSplit[] splits = sourceInputSplits;
-			@Override
-			public InputSplit getNextInputSplit(String host, int taskId) {
-				if (index < splits.length) {
-					return splits[index++];
-				} else {
-					return null;
-				}
-			}
-		};
-	}
-
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return reachedEnd;
-	}
-
-	@Override
-	public T nextRecord(T t) throws IOException {
-
-		reachedEnd = !reader.advance();
-		if (!reachedEnd) {
-			return reader.getCurrent();
-		}
-		return null;
-	}
-
-	@Override
-	public void close() throws IOException {
-		reader.close();
-	}
+  private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
+
+  private final BoundedSource<T> initialSource;
+  private transient PipelineOptions options;
+
+  private BoundedSource.BoundedReader<T> reader = null;
+  private boolean reachedEnd = true;
+
+  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
+    this.initialSource = initialSource;
+    this.options = options;
+  }
+
+  private void writeObject(ObjectOutputStream out)
+      throws IOException, ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, options);
+  }
+
+  private void readObject(ObjectInputStream in)
+      throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    options = mapper.readValue(in, PipelineOptions.class);
+  }
+
+  @Override
+  public void configure(Configuration configuration) {}
+
+  @Override
+  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
+    reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
+    reachedEnd = false;
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
+    try {
+      final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+
+      return new BaseStatistics() {
+        @Override
+        public long getTotalInputSize() {
+          return estimatedSize;
+
+        }
+
+        @Override
+        public long getNumberOfRecords() {
+          return BaseStatistics.NUM_RECORDS_UNKNOWN;
+        }
+
+        @Override
+        public float getAverageRecordWidth() {
+          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
+        }
+      };
+    } catch (Exception e) {
+      LOG.warn("Could not read Source statistics: {}", e);
+    }
+
+    return null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
+    long desiredSizeBytes;
+    try {
+      desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
+      List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes,
+          options);
+      List<SourceInputSplit<T>> splits = new ArrayList<>();
+      int splitCount = 0;
+      for (Source<T> shard: shards) {
+        splits.add(new SourceInputSplit<>(shard, splitCount++));
+      }
+      return splits.toArray(new SourceInputSplit[splits.size()]);
+    } catch (Exception e) {
+      throw new IOException("Could not create input splits from Source.", e);
+    }
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
+    return new InputSplitAssigner() {
+      private int index = 0;
+      private final SourceInputSplit[] splits = sourceInputSplits;
+      @Override
+      public InputSplit getNextInputSplit(String host, int taskId) {
+        if (index < splits.length) {
+          return splits[index++];
+        } else {
+          return null;
+        }
+      }
+    };
+  }
+
+
+  @Override
+  public boolean reachedEnd() throws IOException {
+    return reachedEnd;
+  }
+
+  @Override
+  public T nextRecord(T t) throws IOException {
+
+    reachedEnd = !reader.advance();
+    if (!reachedEnd) {
+      return reader.getCurrent();
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
index 2b93ab7..86fdada 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
@@ -29,24 +29,24 @@ import org.apache.flink.core.io.InputSplit;
  */
 public class SourceInputSplit<T> implements InputSplit {
 
-	private Source<T> source;
-	private int splitNumber;
+  private Source<T> source;
+  private int splitNumber;
 
-	public SourceInputSplit() {
-	}
+  public SourceInputSplit() {
+  }
 
-	public SourceInputSplit(Source<T> source, int splitNumber) {
-		this.source = source;
-		this.splitNumber = splitNumber;
-	}
+  public SourceInputSplit(Source<T> source, int splitNumber) {
+    this.source = source;
+    this.splitNumber = splitNumber;
+  }
 
-	@Override
-	public int getSplitNumber() {
-		return splitNumber;
-	}
+  @Override
+  public int getSplitNumber() {
+    return splitNumber;
+  }
 
-	public Source<T> getSource() {
-		return source;
-	}
+  public Source<T> getSource() {
+    return source;
+  }
 
 }


Mime
View raw message