beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [07/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
Date Thu, 20 Jul 2017 17:09:34 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 482f5a2..b69bc83 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -83,7 +83,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -365,35 +364,6 @@ public class KafkaIOTest {
   }
 
   @Test
-  public void testUnreachableKafkaBrokers() {
-    // Expect an exception when the Kafka brokers are not reachable on the workers.
-    // We specify partitions explicitly so that splitting does not involve server interaction.
-    // Set request timeout to 10ms so that test does not take long.
-
-    thrown.expect(Exception.class);
-    thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");
-
-    int numElements = 1000;
-    PCollection<Long> input = p
-        .apply(KafkaIO.<Integer, Long>read()
-            .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
-            .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
-            .withKeyDeserializer(IntegerDeserializer.class)
-            .withValueDeserializer(LongDeserializer.class)
-            .updateConsumerProperties(ImmutableMap.<String, Object>of(
-                ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
-                ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
-                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
-                ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
-            .withMaxNumRecords(10)
-            .withoutMetadata())
-        .apply(Values.<Long>create());
-
-    addCountingAsserts(input, numElements);
-    p.run();
-  }
-
-  @Test
   public void testUnboundedSourceWithSingleTopic() {
     // same as testUnboundedSource, but with single topic
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
index 46d5e26..cb7064b 100644
--- a/sdks/java/io/kinesis/pom.xml
+++ b/sdks/java/io/kinesis/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
index 2629c57..919d85a 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+
 import java.io.Serializable;
 
 /**
@@ -24,7 +25,6 @@ import java.io.Serializable;
  * How exactly the checkpoint is generated is up to implementing class.
  */
 interface CheckpointGenerator extends Serializable {
-
-  KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
-      throws TransientKinesisException;
+    KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
+            throws TransientKinesisException;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
index 5a28214..4bed0e3 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -24,79 +24,76 @@ import java.util.Objects;
  * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
  */
 abstract class CustomOptional<T> {
+    @SuppressWarnings("unchecked")
+    public static <T> CustomOptional<T> absent() {
+        return (Absent<T>) Absent.INSTANCE;
+    }
 
-  @SuppressWarnings("unchecked")
-  public static <T> CustomOptional<T> absent() {
-    return (Absent<T>) Absent.INSTANCE;
-  }
+    public static <T> CustomOptional<T> of(T v) {
+        return new Present<>(v);
+    }
 
-  public static <T> CustomOptional<T> of(T v) {
-    return new Present<>(v);
-  }
+    public abstract boolean isPresent();
 
-  public abstract boolean isPresent();
+    public abstract T get();
 
-  public abstract T get();
+    private static class Present<T> extends CustomOptional<T> {
+        private final T value;
 
-  private static class Present<T> extends CustomOptional<T> {
+        private Present(T value) {
+            this.value = value;
+        }
 
-    private final T value;
+        @Override
+        public boolean isPresent() {
+            return true;
+        }
 
-    private Present(T value) {
-      this.value = value;
-    }
+        @Override
+        public T get() {
+            return value;
+        }
 
-    @Override
-    public boolean isPresent() {
-      return true;
-    }
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Present)) {
+                return false;
+            }
 
-    @Override
-    public T get() {
-      return value;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof Present)) {
-        return false;
-      }
+            Present<?> present = (Present<?>) o;
+            return Objects.equals(value, present.value);
+        }
 
-      Present<?> present = (Present<?>) o;
-      return Objects.equals(value, present.value);
+        @Override
+        public int hashCode() {
+            return Objects.hash(value);
+        }
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(value);
-    }
-  }
+    private static class Absent<T> extends CustomOptional<T> {
+        private static final Absent<Object> INSTANCE = new Absent<>();
 
-  private static class Absent<T> extends CustomOptional<T> {
+        private Absent() {
+        }
 
-    private static final Absent<Object> INSTANCE = new Absent<>();
+        @Override
+        public boolean isPresent() {
+            return false;
+        }
 
-    private Absent() {
-    }
-
-    @Override
-    public boolean isPresent() {
-      return false;
-    }
+        @Override
+        public T get() {
+            throw new NoSuchElementException();
+        }
 
-    @Override
-    public T get() {
-      throw new NoSuchElementException();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      return o instanceof Absent;
-    }
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof Absent;
+        }
 
-    @Override
-    public int hashCode() {
-      return 0;
+        @Override
+        public int hashCode() {
+            return 0;
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index 9933019..2ec293c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -28,31 +28,29 @@ import com.google.common.base.Function;
  * List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
  */
 class DynamicCheckpointGenerator implements CheckpointGenerator {
-
-  private final String streamName;
-  private final StartingPoint startingPoint;
-
-  public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
-    this.streamName = checkNotNull(streamName, "streamName");
-    this.startingPoint = checkNotNull(startingPoint, "startingPoint");
-  }
-
-  @Override
-  public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
-      throws TransientKinesisException {
-    return new KinesisReaderCheckpoint(
-        transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
-
-          @Override
-          public ShardCheckpoint apply(Shard shard) {
-            return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
-          }
-        })
-    );
-  }
-
-  @Override
-  public String toString() {
-    return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
-  }
+    private final String streamName;
+    private final StartingPoint startingPoint;
+
+    public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
+        this.streamName = checkNotNull(streamName, "streamName");
+        this.startingPoint = checkNotNull(startingPoint, "startingPoint");
+    }
+
+    @Override
+    public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
+            throws TransientKinesisException {
+        return new KinesisReaderCheckpoint(
+                transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
+                    @Override
+                    public ShardCheckpoint apply(Shard shard) {
+                        return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
+                    }
+                })
+        );
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index f605f55..5a34d7d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -21,7 +21,6 @@ import static com.google.common.collect.Lists.transform;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import com.google.common.base.Function;
-
 import java.util.List;
 import javax.annotation.Nullable;
 
@@ -29,29 +28,27 @@ import javax.annotation.Nullable;
  * Represents the output of 'get' operation on Kinesis stream.
  */
 class GetKinesisRecordsResult {
-
-  private final List<KinesisRecord> records;
-  private final String nextShardIterator;
-
-  public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
-      final String streamName, final String shardId) {
-    this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
-
-      @Nullable
-      @Override
-      public KinesisRecord apply(@Nullable UserRecord input) {
-        assert input != null;  // to make FindBugs happy
-        return new KinesisRecord(input, streamName, shardId);
-      }
-    });
-    this.nextShardIterator = nextShardIterator;
-  }
-
-  public List<KinesisRecord> getRecords() {
-    return records;
-  }
-
-  public String getNextShardIterator() {
-    return nextShardIterator;
-  }
+    private final List<KinesisRecord> records;
+    private final String nextShardIterator;
+
+    public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
+                                   final String streamName, final String shardId) {
+        this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
+            @Nullable
+            @Override
+            public KinesisRecord apply(@Nullable UserRecord input) {
+                assert input != null;  // to make FindBugs happy
+                return new KinesisRecord(input, streamName, shardId);
+            }
+        });
+        this.nextShardIterator = nextShardIterator;
+    }
+
+    public List<KinesisRecord> getRecords() {
+        return records;
+    }
+
+    public String getNextShardIterator() {
+        return nextShardIterator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
index b5b721e..c7fd7f6 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
-
 import java.io.Serializable;
 
 /**
@@ -28,6 +27,5 @@ import java.io.Serializable;
  * {@link Serializable} to ensure it can be sent to worker machines.
  */
 interface KinesisClientProvider extends Serializable {
-
-  AmazonKinesis get();
+    AmazonKinesis get();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index bc8ada1..c97316d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -28,9 +29,7 @@ import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.auto.value.AutoValue;
-
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -101,150 +100,144 @@ import org.joda.time.Instant;
  * }</pre>
  *
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public final class KinesisIO {
-
-  /** Returns a new {@link Read} transform for reading from Kinesis. */
-  public static Read read() {
-    return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
-  }
-
-  /** Implementation of {@link #read}. */
-  @AutoValue
-  public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
-
-    @Nullable
-    abstract String getStreamName();
-
-    @Nullable
-    abstract StartingPoint getInitialPosition();
-
-    @Nullable
-    abstract KinesisClientProvider getClientProvider();
-
-    abstract int getMaxNumRecords();
-
-    @Nullable
-    abstract Duration getMaxReadTime();
-
-    abstract Builder toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder {
-
-      abstract Builder setStreamName(String streamName);
-
-      abstract Builder setInitialPosition(StartingPoint startingPoint);
-
-      abstract Builder setClientProvider(KinesisClientProvider clientProvider);
-
-      abstract Builder setMaxNumRecords(int maxNumRecords);
-
-      abstract Builder setMaxReadTime(Duration maxReadTime);
-
-      abstract Read build();
+    /** Returns a new {@link Read} transform for reading from Kinesis. */
+    public static Read read() {
+        return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
     }
 
-    /**
-     * Specify reading from streamName at some initial position.
-     */
-    public Read from(String streamName, InitialPositionInStream initialPosition) {
-      return toBuilder()
-          .setStreamName(streamName)
-          .setInitialPosition(
-              new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
-          .build();
-    }
-
-    /**
-     * Specify reading from streamName beginning at given {@link Instant}.
-     * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
-     */
-    public Read from(String streamName, Instant initialTimestamp) {
-      return toBuilder()
-          .setStreamName(streamName)
-          .setInitialPosition(
-              new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
-          .build();
-    }
-
-    /**
-     * Allows to specify custom {@link KinesisClientProvider}.
-     * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
-     * used for communication with Kinesis.
-     * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
-     * does not suit your needs.
-     */
-    public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
-      return toBuilder().setClientProvider(kinesisClientProvider).build();
-    }
-
-    /**
-     * Specify credential details and region to be used to read from Kinesis.
-     * If you need more sophisticated credential protocol, then you should look at
-     * {@link Read#withClientProvider(KinesisClientProvider)}.
-     */
-    public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
-      return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
-    }
-
-    /** Specifies to read at most a given number of records. */
-    public Read withMaxNumRecords(int maxNumRecords) {
-      checkArgument(
-          maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
-      return toBuilder().setMaxNumRecords(maxNumRecords).build();
-    }
-
-    /** Specifies to read at most a given number of records. */
-    public Read withMaxReadTime(Duration maxReadTime) {
-      checkNotNull(maxReadTime, "maxReadTime");
-      return toBuilder().setMaxReadTime(maxReadTime).build();
-    }
-
-    @Override
-    public PCollection<KinesisRecord> expand(PBegin input) {
-      org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
-          org.apache.beam.sdk.io.Read.from(
-              new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
-      if (getMaxNumRecords() > 0) {
-        BoundedReadFromUnboundedSource<KinesisRecord> bounded =
-            read.withMaxNumRecords(getMaxNumRecords());
-        return getMaxReadTime() == null
-            ? input.apply(bounded)
-            : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
-      } else {
-        return getMaxReadTime() == null
-            ? input.apply(read)
-            : input.apply(read.withMaxReadTime(getMaxReadTime()));
-      }
-    }
-
-    private static final class BasicKinesisProvider implements KinesisClientProvider {
-
-      private final String accessKey;
-      private final String secretKey;
-      private final Regions region;
-
-      private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
-        this.accessKey = checkNotNull(accessKey, "accessKey");
-        this.secretKey = checkNotNull(secretKey, "secretKey");
-        this.region = checkNotNull(region, "region");
-      }
-
-      private AWSCredentialsProvider getCredentialsProvider() {
-        return new StaticCredentialsProvider(new BasicAWSCredentials(
-            accessKey,
-            secretKey
-        ));
-
-      }
-
-      @Override
-      public AmazonKinesis get() {
-        AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
-        client.withRegion(region);
-        return client;
-      }
+    /** Implementation of {@link #read}. */
+    @AutoValue
+    public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
+        @Nullable
+        abstract String getStreamName();
+
+        @Nullable
+        abstract StartingPoint getInitialPosition();
+
+        @Nullable
+        abstract KinesisClientProvider getClientProvider();
+
+        abstract int getMaxNumRecords();
+
+        @Nullable
+        abstract Duration getMaxReadTime();
+
+        abstract Builder toBuilder();
+
+        @AutoValue.Builder
+        abstract static class Builder {
+            abstract Builder setStreamName(String streamName);
+            abstract Builder setInitialPosition(StartingPoint startingPoint);
+            abstract Builder setClientProvider(KinesisClientProvider clientProvider);
+            abstract Builder setMaxNumRecords(int maxNumRecords);
+            abstract Builder setMaxReadTime(Duration maxReadTime);
+
+            abstract Read build();
+        }
+
+        /**
+         * Specify reading from streamName at some initial position.
+         */
+        public Read from(String streamName, InitialPositionInStream initialPosition) {
+            return toBuilder()
+                .setStreamName(streamName)
+                .setInitialPosition(
+                    new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
+                .build();
+        }
+
+        /**
+         * Specify reading from streamName beginning at given {@link Instant}.
+         * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
+         */
+        public Read from(String streamName, Instant initialTimestamp) {
+            return toBuilder()
+                .setStreamName(streamName)
+                .setInitialPosition(
+                    new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
+                .build();
+        }
+
+        /**
+         * Allows to specify custom {@link KinesisClientProvider}.
+         * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
+         * used for communication with Kinesis.
+         * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
+         * does not suit your needs.
+         */
+        public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
+            return toBuilder().setClientProvider(kinesisClientProvider).build();
+        }
+
+        /**
+         * Specify credential details and region to be used to read from Kinesis.
+         * If you need more sophisticated credential protocol, then you should look at
+         * {@link Read#withClientProvider(KinesisClientProvider)}.
+         */
+        public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
+            return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
+        }
+
+        /** Specifies to read at most a given number of records. */
+        public Read withMaxNumRecords(int maxNumRecords) {
+            checkArgument(
+                maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
+            return toBuilder().setMaxNumRecords(maxNumRecords).build();
+        }
+
+        /** Specifies to read at most a given number of records. */
+        public Read withMaxReadTime(Duration maxReadTime) {
+            checkNotNull(maxReadTime, "maxReadTime");
+            return toBuilder().setMaxReadTime(maxReadTime).build();
+        }
+
+        @Override
+        public PCollection<KinesisRecord> expand(PBegin input) {
+            org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
+                org.apache.beam.sdk.io.Read.from(
+                    new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
+            if (getMaxNumRecords() > 0) {
+                BoundedReadFromUnboundedSource<KinesisRecord> bounded =
+                    read.withMaxNumRecords(getMaxNumRecords());
+                return getMaxReadTime() == null
+                    ? input.apply(bounded)
+                    : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
+            } else {
+                return getMaxReadTime() == null
+                    ? input.apply(read)
+                    : input.apply(read.withMaxReadTime(getMaxReadTime()));
+            }
+        }
+
+        private static final class BasicKinesisProvider implements KinesisClientProvider {
+
+            private final String accessKey;
+            private final String secretKey;
+            private final Regions region;
+
+            private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
+                this.accessKey = checkNotNull(accessKey, "accessKey");
+                this.secretKey = checkNotNull(secretKey, "secretKey");
+                this.region = checkNotNull(region, "region");
+            }
+
+
+            private AWSCredentialsProvider getCredentialsProvider() {
+                return new StaticCredentialsProvider(new BasicAWSCredentials(
+                        accessKey,
+                        secretKey
+                ));
+
+            }
+
+            @Override
+            public AmazonKinesis get() {
+                AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
+                client.withRegion(region);
+                return client;
+            }
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index e5c32d2..2138094 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -17,129 +17,129 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
-
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Reads data from multiple kinesis shards in a single thread.
  * It uses simple round robin algorithm when fetching data from shards.
  */
 class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
+
+    private final SimplifiedKinesisClient kinesis;
+    private final UnboundedSource<KinesisRecord, ?> source;
+    private final CheckpointGenerator initialCheckpointGenerator;
+    private RoundRobin<ShardRecordsIterator> shardIterators;
+    private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
+
+    public KinesisReader(SimplifiedKinesisClient kinesis,
+                         CheckpointGenerator initialCheckpointGenerator,
+                         UnboundedSource<KinesisRecord, ?> source) {
+        this.kinesis = checkNotNull(kinesis, "kinesis");
+        this.initialCheckpointGenerator =
+                checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+        this.source = source;
+    }
+
+    /**
+     * Generates initial checkpoint and instantiates iterators for shards.
+     */
+    @Override
+    public boolean start() throws IOException {
+        LOG.info("Starting reader using {}", initialCheckpointGenerator);
+
+        try {
+            KinesisReaderCheckpoint initialCheckpoint =
+                    initialCheckpointGenerator.generate(kinesis);
+            List<ShardRecordsIterator> iterators = newArrayList();
+            for (ShardCheckpoint checkpoint : initialCheckpoint) {
+                iterators.add(checkpoint.getShardRecordsIterator(kinesis));
+            }
+            shardIterators = new RoundRobin<>(iterators);
+        } catch (TransientKinesisException e) {
+            throw new IOException(e);
+        }
 
-  private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
-
-  private final SimplifiedKinesisClient kinesis;
-  private final UnboundedSource<KinesisRecord, ?> source;
-  private final CheckpointGenerator initialCheckpointGenerator;
-  private RoundRobin<ShardRecordsIterator> shardIterators;
-  private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
-
-  public KinesisReader(SimplifiedKinesisClient kinesis,
-      CheckpointGenerator initialCheckpointGenerator,
-      UnboundedSource<KinesisRecord, ?> source) {
-    this.kinesis = checkNotNull(kinesis, "kinesis");
-    this.initialCheckpointGenerator =
-        checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
-    this.source = source;
-  }
-
-  /**
-   * Generates initial checkpoint and instantiates iterators for shards.
-   */
-  @Override
-  public boolean start() throws IOException {
-    LOG.info("Starting reader using {}", initialCheckpointGenerator);
-
-    try {
-      KinesisReaderCheckpoint initialCheckpoint =
-          initialCheckpointGenerator.generate(kinesis);
-      List<ShardRecordsIterator> iterators = newArrayList();
-      for (ShardCheckpoint checkpoint : initialCheckpoint) {
-        iterators.add(checkpoint.getShardRecordsIterator(kinesis));
-      }
-      shardIterators = new RoundRobin<>(iterators);
-    } catch (TransientKinesisException e) {
-      throw new IOException(e);
+        return advance();
     }
 
-    return advance();
-  }
-
-  /**
-   * Moves to the next record in one of the shards.
-   * If current shard iterator can be move forward (i.e. there's a record present) then we do it.
-   * If not, we iterate over shards in a round-robin manner.
-   */
-  @Override
-  public boolean advance() throws IOException {
-    try {
-      for (int i = 0; i < shardIterators.size(); ++i) {
-        currentRecord = shardIterators.getCurrent().next();
-        if (currentRecord.isPresent()) {
-          return true;
-        } else {
-          shardIterators.moveForward();
+    /**
+     * Moves to the next record in one of the shards.
+     * If current shard iterator can be move forward (i.e. there's a record present) then we do it.
+     * If not, we iterate over shards in a round-robin manner.
+     */
+    @Override
+    public boolean advance() throws IOException {
+        try {
+            for (int i = 0; i < shardIterators.size(); ++i) {
+                currentRecord = shardIterators.getCurrent().next();
+                if (currentRecord.isPresent()) {
+                    return true;
+                } else {
+                    shardIterators.moveForward();
+                }
+            }
+        } catch (TransientKinesisException e) {
+            LOG.warn("Transient exception occurred", e);
         }
-      }
-    } catch (TransientKinesisException e) {
-      LOG.warn("Transient exception occurred", e);
+        return false;
+    }
+
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+        return currentRecord.get().getUniqueId();
+    }
+
+    @Override
+    public KinesisRecord getCurrent() throws NoSuchElementException {
+        return currentRecord.get();
+    }
+
+    /**
+     * When {@link KinesisReader} was advanced to the current record.
+     * We cannot use approximate arrival timestamp given for each record by Kinesis as it
+     * is not guaranteed to be accurate - this could lead to mark some records as "late"
+     * even if they were not.
+     */
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return currentRecord.get().getReadTime();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Current time.
+     * We cannot give better approximation of the watermark with current semantics of
+     * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
+     * {@link KinesisReader#advance()} will be called.
+     */
+    @Override
+    public Instant getWatermark() {
+        return Instant.now();
+    }
+
+    @Override
+    public UnboundedSource.CheckpointMark getCheckpointMark() {
+        return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+    }
+
+    @Override
+    public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
+        return source;
     }
-    return false;
-  }
-
-  @Override
-  public byte[] getCurrentRecordId() throws NoSuchElementException {
-    return currentRecord.get().getUniqueId();
-  }
-
-  @Override
-  public KinesisRecord getCurrent() throws NoSuchElementException {
-    return currentRecord.get();
-  }
-
-  /**
-   * When {@link KinesisReader} was advanced to the current record.
-   * We cannot use approximate arrival timestamp given for each record by Kinesis as it
-   * is not guaranteed to be accurate - this could lead to mark some records as "late"
-   * even if they were not.
-   */
-  @Override
-  public Instant getCurrentTimestamp() throws NoSuchElementException {
-    return currentRecord.get().getReadTime();
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-  /**
-   * Current time.
-   * We cannot give better approximation of the watermark with current semantics of
-   * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
-   * {@link KinesisReader#advance()} will be called.
-   */
-  @Override
-  public Instant getWatermark() {
-    return Instant.now();
-  }
-
-  @Override
-  public UnboundedSource.CheckpointMark getCheckpointMark() {
-    return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
-  }
-
-  @Override
-  public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
-    return source;
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index d995e75..f0fa45d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -23,13 +23,11 @@ import static com.google.common.collect.Lists.partition;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.io.UnboundedSource;
 
 /**
@@ -39,61 +37,60 @@ import org.apache.beam.sdk.io.UnboundedSource;
  * This class is immutable.
  */
 class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource
-    .CheckpointMark, Serializable {
-
-  private final List<ShardCheckpoint> shardCheckpoints;
+        .CheckpointMark, Serializable {
+    private final List<ShardCheckpoint> shardCheckpoints;
 
-  public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
-    this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
-  }
-
-  public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
-      iterators) {
-    return new KinesisReaderCheckpoint(transform(iterators,
-        new Function<ShardRecordsIterator, ShardCheckpoint>() {
-
-          @Nullable
-          @Override
-          public ShardCheckpoint apply(@Nullable
-              ShardRecordsIterator shardRecordsIterator) {
-            assert shardRecordsIterator != null;
-            return shardRecordsIterator.getCheckpoint();
-          }
-        }));
-  }
+    public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
+        this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
+    }
 
-  /**
-   * Splits given multi-shard checkpoint into partitions of approximately equal size.
-   *
-   * @param desiredNumSplits - upper limit for number of partitions to generate.
-   * @return list of checkpoints covering consecutive partitions of current checkpoint.
-   */
-  public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
-    int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
+    public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
+                                                                   iterators) {
+        return new KinesisReaderCheckpoint(transform(iterators,
+                new Function<ShardRecordsIterator, ShardCheckpoint>() {
+
+                    @Nullable
+                    @Override
+                    public ShardCheckpoint apply(@Nullable
+                                                 ShardRecordsIterator shardRecordsIterator) {
+                        assert shardRecordsIterator != null;
+                        return shardRecordsIterator.getCheckpoint();
+                    }
+                }));
+    }
 
-    List<KinesisReaderCheckpoint> checkpoints = newArrayList();
-    for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
-      checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
+    /**
+     * Splits given multi-shard checkpoint into partitions of approximately equal size.
+     *
+     * @param desiredNumSplits - upper limit for number of partitions to generate.
+     * @return list of checkpoints covering consecutive partitions of current checkpoint.
+     */
+    public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
+        int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
+
+        List<KinesisReaderCheckpoint> checkpoints = newArrayList();
+        for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
+            checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
+        }
+        return checkpoints;
     }
-    return checkpoints;
-  }
 
-  private int divideAndRoundUp(int nominator, int denominator) {
-    return (nominator + denominator - 1) / denominator;
-  }
+    private int divideAndRoundUp(int nominator, int denominator) {
+        return (nominator + denominator - 1) / denominator;
+    }
 
-  @Override
-  public void finalizeCheckpoint() throws IOException {
+    @Override
+    public void finalizeCheckpoint() throws IOException {
 
-  }
+    }
 
-  @Override
-  public String toString() {
-    return shardCheckpoints.toString();
-  }
+    @Override
+    public String toString() {
+        return shardCheckpoints.toString();
+    }
 
-  @Override
-  public Iterator<ShardCheckpoint> iterator() {
-    return shardCheckpoints.iterator();
-  }
+    @Override
+    public Iterator<ShardCheckpoint> iterator() {
+        return shardCheckpoints.iterator();
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
index 057b7bb..02b5370 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -22,9 +22,7 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import com.google.common.base.Charsets;
-
 import java.nio.ByteBuffer;
-
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.joda.time.Instant;
 
@@ -32,92 +30,91 @@ import org.joda.time.Instant;
  * {@link UserRecord} enhanced with utility methods.
  */
 public class KinesisRecord {
-
-  private Instant readTime;
-  private String streamName;
-  private String shardId;
-  private long subSequenceNumber;
-  private String sequenceNumber;
-  private Instant approximateArrivalTimestamp;
-  private ByteBuffer data;
-  private String partitionKey;
-
-  public KinesisRecord(UserRecord record, String streamName, String shardId) {
-    this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
-        record.getPartitionKey(),
-        new Instant(record.getApproximateArrivalTimestamp()),
-        Instant.now(),
-        streamName, shardId);
-  }
-
-  public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
-      String partitionKey, Instant approximateArrivalTimestamp,
-      Instant readTime,
-      String streamName, String shardId) {
-    this.data = data;
-    this.sequenceNumber = sequenceNumber;
-    this.subSequenceNumber = subSequenceNumber;
-    this.partitionKey = partitionKey;
-    this.approximateArrivalTimestamp = approximateArrivalTimestamp;
-    this.readTime = readTime;
-    this.streamName = streamName;
-    this.shardId = shardId;
-  }
-
-  public ExtendedSequenceNumber getExtendedSequenceNumber() {
-    return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
-  }
-
-  /***
-   * @return unique id of the record based on its position in the stream
-   */
-  public byte[] getUniqueId() {
-    return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
-  }
-
-  public Instant getReadTime() {
-    return readTime;
-  }
-
-  public String getStreamName() {
-    return streamName;
-  }
-
-  public String getShardId() {
-    return shardId;
-  }
-
-  public byte[] getDataAsBytes() {
-    return getData().array();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return EqualsBuilder.reflectionEquals(this, obj);
-  }
-
-  @Override
-  public int hashCode() {
-    return reflectionHashCode(this);
-  }
-
-  public long getSubSequenceNumber() {
-    return subSequenceNumber;
-  }
-
-  public String getSequenceNumber() {
-    return sequenceNumber;
-  }
-
-  public Instant getApproximateArrivalTimestamp() {
-    return approximateArrivalTimestamp;
-  }
-
-  public ByteBuffer getData() {
-    return data;
-  }
-
-  public String getPartitionKey() {
-    return partitionKey;
-  }
+    private Instant readTime;
+    private String streamName;
+    private String shardId;
+    private long subSequenceNumber;
+    private String sequenceNumber;
+    private Instant approximateArrivalTimestamp;
+    private ByteBuffer data;
+    private String partitionKey;
+
+    public KinesisRecord(UserRecord record, String streamName, String shardId) {
+        this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
+                record.getPartitionKey(),
+                new Instant(record.getApproximateArrivalTimestamp()),
+                Instant.now(),
+                streamName, shardId);
+    }
+
+    public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
+                         String partitionKey, Instant approximateArrivalTimestamp,
+                         Instant readTime,
+                         String streamName, String shardId) {
+        this.data = data;
+        this.sequenceNumber = sequenceNumber;
+        this.subSequenceNumber = subSequenceNumber;
+        this.partitionKey = partitionKey;
+        this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+        this.readTime = readTime;
+        this.streamName = streamName;
+        this.shardId = shardId;
+    }
+
+    public ExtendedSequenceNumber getExtendedSequenceNumber() {
+        return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
+    }
+
+    /***
+     * @return unique id of the record based on its position in the stream
+     */
+    public byte[] getUniqueId() {
+        return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
+    }
+
+    public Instant getReadTime() {
+        return readTime;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public String getShardId() {
+        return shardId;
+    }
+
+    public byte[] getDataAsBytes() {
+        return getData().array();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return EqualsBuilder.reflectionEquals(this, obj);
+    }
+
+    @Override
+    public int hashCode() {
+        return reflectionHashCode(this);
+    }
+
+    public long getSubSequenceNumber() {
+        return subSequenceNumber;
+    }
+
+    public String getSequenceNumber() {
+        return sequenceNumber;
+    }
+
+    public Instant getApproximateArrivalTimestamp() {
+        return approximateArrivalTimestamp;
+    }
+
+    public ByteBuffer getData() {
+        return data;
+    }
+
+    public String getPartitionKey() {
+        return partitionKey;
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index dcf564d..f233e27 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -34,41 +33,40 @@ import org.joda.time.Instant;
  * A {@link Coder} for {@link KinesisRecord}.
  */
 class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
+    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+    private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+    private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+    private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
 
-  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
-  private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
-  private static final InstantCoder INSTANT_CODER = InstantCoder.of();
-  private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
-
-  public static KinesisRecordCoder of() {
-    return new KinesisRecordCoder();
-  }
+    public static KinesisRecordCoder of() {
+        return new KinesisRecordCoder();
+    }
 
-  @Override
-  public void encode(KinesisRecord value, OutputStream outStream) throws
-      IOException {
-    BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
-    STRING_CODER.encode(value.getSequenceNumber(), outStream);
-    STRING_CODER.encode(value.getPartitionKey(), outStream);
-    INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
-    VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
-    INSTANT_CODER.encode(value.getReadTime(), outStream);
-    STRING_CODER.encode(value.getStreamName(), outStream);
-    STRING_CODER.encode(value.getShardId(), outStream);
-  }
+    @Override
+    public void encode(KinesisRecord value, OutputStream outStream) throws
+            IOException {
+        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+        STRING_CODER.encode(value.getSequenceNumber(), outStream);
+        STRING_CODER.encode(value.getPartitionKey(), outStream);
+        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+        INSTANT_CODER.encode(value.getReadTime(), outStream);
+        STRING_CODER.encode(value.getStreamName(), outStream);
+        STRING_CODER.encode(value.getShardId(), outStream);
+    }
 
-  @Override
-  public KinesisRecord decode(InputStream inStream) throws IOException {
-    ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
-    String sequenceNumber = STRING_CODER.decode(inStream);
-    String partitionKey = STRING_CODER.decode(inStream);
-    Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
-    long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
-    Instant readTimestamp = INSTANT_CODER.decode(inStream);
-    String streamName = STRING_CODER.decode(inStream);
-    String shardId = STRING_CODER.decode(inStream);
-    return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
-        approximateArrivalTimestamp, readTimestamp, streamName, shardId
-    );
-  }
+    @Override
+    public KinesisRecord decode(InputStream inStream) throws IOException {
+        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+        String sequenceNumber = STRING_CODER.decode(inStream);
+        String partitionKey = STRING_CODER.decode(inStream);
+        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+        long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+        Instant readTimestamp = INSTANT_CODER.decode(inStream);
+        String streamName = STRING_CODER.decode(inStream);
+        String shardId = STRING_CODER.decode(inStream);
+        return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
+                approximateArrivalTimestamp, readTimestamp, streamName, shardId
+        );
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 362792b..7e67d07 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 
 import java.util.List;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -29,85 +28,85 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Represents source for single stream in Kinesis.
  */
 class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
+
+    private final KinesisClientProvider kinesis;
+    private CheckpointGenerator initialCheckpointGenerator;
 
-  private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
-
-  private final KinesisClientProvider kinesis;
-  private CheckpointGenerator initialCheckpointGenerator;
-
-  public KinesisSource(KinesisClientProvider kinesis, String streamName,
-      StartingPoint startingPoint) {
-    this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
-  }
-
-  private KinesisSource(KinesisClientProvider kinesisClientProvider,
-      CheckpointGenerator initialCheckpoint) {
-    this.kinesis = kinesisClientProvider;
-    this.initialCheckpointGenerator = initialCheckpoint;
-    validate();
-  }
-
-  /**
-   * Generate splits for reading from the stream.
-   * Basically, it'll try to evenly split set of shards in the stream into
-   * {@code desiredNumSplits} partitions. Each partition is then a split.
-   */
-  @Override
-  public List<KinesisSource> split(int desiredNumSplits,
-      PipelineOptions options) throws Exception {
-    KinesisReaderCheckpoint checkpoint =
-        initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
-
-    List<KinesisSource> sources = newArrayList();
-
-    for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
-      sources.add(new KinesisSource(
-          kinesis,
-          new StaticCheckpointGenerator(partition)));
+    public KinesisSource(KinesisClientProvider kinesis, String streamName,
+                         StartingPoint startingPoint) {
+        this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
     }
-    return sources;
-  }
-
-  /**
-   * Creates reader based on given {@link KinesisReaderCheckpoint}.
-   * If {@link KinesisReaderCheckpoint} is not given, then we use
-   * {@code initialCheckpointGenerator} to generate new checkpoint.
-   */
-  @Override
-  public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
-      KinesisReaderCheckpoint checkpointMark) {
-
-    CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
-
-    if (checkpointMark != null) {
-      checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
+
+    private KinesisSource(KinesisClientProvider kinesisClientProvider,
+                          CheckpointGenerator initialCheckpoint) {
+        this.kinesis = kinesisClientProvider;
+        this.initialCheckpointGenerator = initialCheckpoint;
+        validate();
     }
 
-    LOG.info("Creating new reader using {}", checkpointGenerator);
-
-    return new KinesisReader(
-        SimplifiedKinesisClient.from(kinesis),
-        checkpointGenerator,
-        this);
-  }
-
-  @Override
-  public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
-    return SerializableCoder.of(KinesisReaderCheckpoint.class);
-  }
-
-  @Override
-  public void validate() {
-    checkNotNull(kinesis);
-    checkNotNull(initialCheckpointGenerator);
-  }
-
-  @Override
-  public Coder<KinesisRecord> getDefaultOutputCoder() {
-    return KinesisRecordCoder.of();
-  }
+    /**
+     * Generate splits for reading from the stream.
+     * Basically, it'll try to evenly split set of shards in the stream into
+     * {@code desiredNumSplits} partitions. Each partition is then a split.
+     */
+    @Override
+    public List<KinesisSource> split(int desiredNumSplits,
+                                                     PipelineOptions options) throws Exception {
+        KinesisReaderCheckpoint checkpoint =
+                initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
+
+        List<KinesisSource> sources = newArrayList();
+
+        for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
+            sources.add(new KinesisSource(
+                    kinesis,
+                    new StaticCheckpointGenerator(partition)));
+        }
+        return sources;
+    }
+
+    /**
+     * Creates reader based on given {@link KinesisReaderCheckpoint}.
+     * If {@link KinesisReaderCheckpoint} is not given, then we use
+     * {@code initialCheckpointGenerator} to generate new checkpoint.
+     */
+    @Override
+    public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
+                                                KinesisReaderCheckpoint checkpointMark) {
+
+        CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
+
+        if (checkpointMark != null) {
+            checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
+        }
+
+        LOG.info("Creating new reader using {}", checkpointGenerator);
+
+        return new KinesisReader(
+                SimplifiedKinesisClient.from(kinesis),
+                checkpointGenerator,
+                this);
+    }
+
+    @Override
+    public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
+        return SerializableCoder.of(KinesisReaderCheckpoint.class);
+    }
+
+    @Override
+    public void validate() {
+        checkNotNull(kinesis);
+        checkNotNull(initialCheckpointGenerator);
+    }
+
+    @Override
+    public Coder<KinesisRecord> getDefaultOutputCoder() {
+        return KinesisRecordCoder.of();
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
index eca725c..40e65fc 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
@@ -21,6 +21,7 @@ import static com.google.common.collect.Lists.newArrayList;
 
 import java.util.List;
 
+
 /**
  * Filters out records, which were already processed and checkpointed.
  *
@@ -28,14 +29,13 @@ import java.util.List;
  * accuracy, not with "subSequenceNumber" accuracy.
  */
 class RecordFilter {
-
-  public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
-    List<KinesisRecord> filteredRecords = newArrayList();
-    for (KinesisRecord record : records) {
-      if (checkpoint.isBeforeOrAt(record)) {
-        filteredRecords.add(record);
-      }
+    public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
+        List<KinesisRecord> filteredRecords = newArrayList();
+        for (KinesisRecord record : records) {
+            if (checkpoint.isBeforeOrAt(record)) {
+                filteredRecords.add(record);
+            }
+        }
+        return filteredRecords;
     }
-    return filteredRecords;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
index 806d982..e4ff541 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -27,28 +27,27 @@ import java.util.Iterator;
  * Very simple implementation of round robin algorithm.
  */
 class RoundRobin<T> implements Iterable<T> {
+    private final Deque<T> deque;
 
-  private final Deque<T> deque;
+    public RoundRobin(Iterable<T> collection) {
+        this.deque = newArrayDeque(collection);
+        checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
+    }
 
-  public RoundRobin(Iterable<T> collection) {
-    this.deque = newArrayDeque(collection);
-    checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
-  }
+    public T getCurrent() {
+        return deque.getFirst();
+    }
 
-  public T getCurrent() {
-    return deque.getFirst();
-  }
+    public void moveForward() {
+        deque.addLast(deque.removeFirst());
+    }
 
-  public void moveForward() {
-    deque.addLast(deque.removeFirst());
-  }
+    public int size() {
+        return deque.size();
+    }
 
-  public int size() {
-    return deque.size();
-  }
-
-  @Override
-  public Iterator<T> iterator() {
-    return deque.iterator();
-  }
+    @Override
+    public Iterator<T> iterator() {
+        return deque.iterator();
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 95f97b8..6aa3504 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
@@ -26,11 +27,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
 import java.io.Serializable;
-
 import org.joda.time.Instant;
 
+
 /**
  * Checkpoint mark for single shard in the stream.
  * Current position in the shard is determined by either:
@@ -45,132 +45,131 @@ import org.joda.time.Instant;
  * This class is immutable.
  */
 class ShardCheckpoint implements Serializable {
+    private final String streamName;
+    private final String shardId;
+    private final String sequenceNumber;
+    private final ShardIteratorType shardIteratorType;
+    private final Long subSequenceNumber;
+    private final Instant timestamp;
+
+    public ShardCheckpoint(String streamName, String shardId, StartingPoint
+            startingPoint) {
+        this(streamName, shardId,
+                ShardIteratorType.fromValue(startingPoint.getPositionName()),
+                startingPoint.getTimestamp());
+    }
+
+    public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+            shardIteratorType, Instant timestamp) {
+        this(streamName, shardId, shardIteratorType, null, null, timestamp);
+    }
+
+    public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+            shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
+        this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
+    }
+
+    private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
+                            String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
+        this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
+        this.streamName = checkNotNull(streamName, "streamName");
+        this.shardId = checkNotNull(shardId, "shardId");
+        if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
+            checkNotNull(sequenceNumber,
+                    "You must provide sequence number for AT_SEQUENCE_NUMBER"
+                            + " or AFTER_SEQUENCE_NUMBER");
+        } else {
+            checkArgument(sequenceNumber == null,
+                    "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
+        }
+        // A timestamp is required for AT_TIMESTAMP and forbidden for every other type.
+        if (shardIteratorType == AT_TIMESTAMP) {
+            checkNotNull(timestamp,
+                    "You must provide timestamp for AT_TIMESTAMP");
+        } else {
+            checkArgument(timestamp == null,
+                    "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
+        }
+
+        this.subSequenceNumber = subSequenceNumber;
+        this.sequenceNumber = sequenceNumber;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
+     * on the underlying shardIteratorType, it will either compare the timestamp or the
+     * {@link ExtendedSequenceNumber}.
+     *
+     * @param other record to compare this checkpoint against
+     * @return {@code true} if the current checkpoint mark points before or at the given record
+     */
+    public boolean isBeforeOrAt(KinesisRecord other) {
+        if (shardIteratorType == AT_TIMESTAMP) {
+            return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
+        }
+        int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
+        if (result == 0) {
+            return shardIteratorType == AT_SEQUENCE_NUMBER;
+        }
+        return result < 0;
+    }
+
+    private ExtendedSequenceNumber extendedSequenceNumber() {
+        String fullSequenceNumber = sequenceNumber;
+        if (fullSequenceNumber == null) {
+            fullSequenceNumber = shardIteratorType.toString();
+        }
+        return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
+    }
 
-  private final String streamName;
-  private final String shardId;
-  private final String sequenceNumber;
-  private final ShardIteratorType shardIteratorType;
-  private final Long subSequenceNumber;
-  private final Instant timestamp;
-
-  public ShardCheckpoint(String streamName, String shardId, StartingPoint
-      startingPoint) {
-    this(streamName, shardId,
-        ShardIteratorType.fromValue(startingPoint.getPositionName()),
-        startingPoint.getTimestamp());
-  }
-
-  public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
-      shardIteratorType, Instant timestamp) {
-    this(streamName, shardId, shardIteratorType, null, null, timestamp);
-  }
-
-  public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
-      shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
-    this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
-  }
-
-  private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
-      String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
-    this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
-    this.streamName = checkNotNull(streamName, "streamName");
-    this.shardId = checkNotNull(shardId, "shardId");
-    if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
-      checkNotNull(sequenceNumber,
-          "You must provide sequence number for AT_SEQUENCE_NUMBER"
-              + " or AFTER_SEQUENCE_NUMBER");
-    } else {
-      checkArgument(sequenceNumber == null,
-          "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
+    @Override
+    public String toString() {
+        return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
+                streamName, shardId,
+                sequenceNumber);
     }
-    if (shardIteratorType == AT_TIMESTAMP) {
-      checkNotNull(timestamp,
-          "You must provide timestamp for AT_SEQUENCE_NUMBER"
-              + " or AFTER_SEQUENCE_NUMBER");
-    } else {
-      checkArgument(timestamp == null,
-          "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
+
+    public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
+            throws TransientKinesisException {
+        return new ShardRecordsIterator(this, kinesis);
     }
 
-    this.subSequenceNumber = subSequenceNumber;
-    this.sequenceNumber = sequenceNumber;
-    this.timestamp = timestamp;
-  }
-
-  /**
-   * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
-   * on the the underlying shardIteratorType, it will either compare the timestamp or the
-   * {@link ExtendedSequenceNumber}.
-   *
-   * @param other
-   * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
-   */
-  public boolean isBeforeOrAt(KinesisRecord other) {
-    if (shardIteratorType == AT_TIMESTAMP) {
-      return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
+    public String getShardIterator(SimplifiedKinesisClient kinesisClient)
+            throws TransientKinesisException {
+        if (checkpointIsInTheMiddleOfAUserRecord()) {
+            return kinesisClient.getShardIterator(streamName,
+                    shardId, AT_SEQUENCE_NUMBER,
+                    sequenceNumber, null);
+        }
+        return kinesisClient.getShardIterator(streamName,
+                shardId, shardIteratorType,
+                sequenceNumber, timestamp);
     }
-    int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
-    if (result == 0) {
-      return shardIteratorType == AT_SEQUENCE_NUMBER;
+
+    private boolean checkpointIsInTheMiddleOfAUserRecord() {
+        return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
     }
-    return result < 0;
-  }
 
-  private ExtendedSequenceNumber extendedSequenceNumber() {
-    String fullSequenceNumber = sequenceNumber;
-    if (fullSequenceNumber == null) {
-      fullSequenceNumber = shardIteratorType.toString();
+    /**
+     * Used to advance checkpoint mark to position after given {@link Record}.
+     *
+     * @param record record after which the new checkpoint should point
+     * @return new checkpoint object pointing directly after given {@link Record}
+     */
+    public ShardCheckpoint moveAfter(KinesisRecord record) {
+        return new ShardCheckpoint(
+                streamName, shardId,
+                AFTER_SEQUENCE_NUMBER,
+                record.getSequenceNumber(),
+                record.getSubSequenceNumber());
     }
-    return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
-  }
-
-  @Override
-  public String toString() {
-    return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
-        streamName, shardId,
-        sequenceNumber);
-  }
-
-  public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
-      throws TransientKinesisException {
-    return new ShardRecordsIterator(this, kinesis);
-  }
-
-  public String getShardIterator(SimplifiedKinesisClient kinesisClient)
-      throws TransientKinesisException {
-    if (checkpointIsInTheMiddleOfAUserRecord()) {
-      return kinesisClient.getShardIterator(streamName,
-          shardId, AT_SEQUENCE_NUMBER,
-          sequenceNumber, null);
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public String getShardId() {
+        return shardId;
     }
-    return kinesisClient.getShardIterator(streamName,
-        shardId, shardIteratorType,
-        sequenceNumber, timestamp);
-  }
-
-  private boolean checkpointIsInTheMiddleOfAUserRecord() {
-    return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
-  }
-
-  /**
-   * Used to advance checkpoint mark to position after given {@link Record}.
-   *
-   * @param record
-   * @return new checkpoint object pointing directly after given {@link Record}
-   */
-  public ShardCheckpoint moveAfter(KinesisRecord record) {
-    return new ShardCheckpoint(
-        streamName, shardId,
-        AFTER_SEQUENCE_NUMBER,
-        record.getSequenceNumber(),
-        record.getSubSequenceNumber());
-  }
-
-  public String getStreamName() {
-    return streamName;
-  }
-
-  public String getShardId() {
-    return shardId;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index a69c6c1..872f604 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -21,9 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Queues.newArrayDeque;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-
 import java.util.Deque;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,68 +31,68 @@ import org.slf4j.LoggerFactory;
  * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
  */
 class ShardRecordsIterator {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
+    private final SimplifiedKinesisClient kinesis;
+    private final RecordFilter filter;
+    private ShardCheckpoint checkpoint;
+    private String shardIterator;
+    private Deque<KinesisRecord> data = newArrayDeque();
 
-  private final SimplifiedKinesisClient kinesis;
-  private final RecordFilter filter;
-  private ShardCheckpoint checkpoint;
-  private String shardIterator;
-  private Deque<KinesisRecord> data = newArrayDeque();
+    public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+                                SimplifiedKinesisClient simplifiedKinesisClient) throws
+            TransientKinesisException {
+        this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
+    }
 
-  public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
-      SimplifiedKinesisClient simplifiedKinesisClient) throws
-      TransientKinesisException {
-    this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
-  }
+    public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+                                SimplifiedKinesisClient simplifiedKinesisClient,
+                                RecordFilter filter) throws
+            TransientKinesisException {
 
-  public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
-      SimplifiedKinesisClient simplifiedKinesisClient,
-      RecordFilter filter) throws
-      TransientKinesisException {
+        this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+        this.filter = checkNotNull(filter, "filter");
+        this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
+        shardIterator = checkpoint.getShardIterator(kinesis);
+    }
 
-    this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
-    this.filter = checkNotNull(filter, "filter");
-    this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
-    shardIterator = checkpoint.getShardIterator(kinesis);
-  }
+    /**
+     * Returns record if there's any present.
+     * Returns absent() if there are no new records at this time in the shard.
+     */
+    public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
+        readMoreIfNecessary();
 
-  /**
-   * Returns record if there's any present.
-   * Returns absent() if there are no new records at this time in the shard.
-   */
-  public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
-    readMoreIfNecessary();
+        if (data.isEmpty()) {
+            return CustomOptional.absent();
+        } else {
+            KinesisRecord record = data.removeFirst();
+            checkpoint = checkpoint.moveAfter(record);
+            return CustomOptional.of(record);
+        }
+    }
 
-    if (data.isEmpty()) {
-      return CustomOptional.absent();
-    } else {
-      KinesisRecord record = data.removeFirst();
-      checkpoint = checkpoint.moveAfter(record);
-      return CustomOptional.of(record);
+    private void readMoreIfNecessary() throws TransientKinesisException {
+        if (data.isEmpty()) {
+            GetKinesisRecordsResult response;
+            try {
+                response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+                        checkpoint.getShardId());
+            } catch (ExpiredIteratorException e) {
+                LOG.info("Refreshing expired iterator", e);
+                shardIterator = checkpoint.getShardIterator(kinesis);
+                response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+                        checkpoint.getShardId());
+            }
+            LOG.debug("Fetched {} new records", response.getRecords().size());
+            shardIterator = response.getNextShardIterator();
+            data.addAll(filter.apply(response.getRecords(), checkpoint));
+        }
     }
-  }
 
-  private void readMoreIfNecessary() throws TransientKinesisException {
-    if (data.isEmpty()) {
-      GetKinesisRecordsResult response;
-      try {
-        response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
-            checkpoint.getShardId());
-      } catch (ExpiredIteratorException e) {
-        LOG.info("Refreshing expired iterator", e);
-        shardIterator = checkpoint.getShardIterator(kinesis);
-        response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
-            checkpoint.getShardId());
-      }
-      LOG.debug("Fetched {} new records", response.getRecords().size());
-      shardIterator = response.getNextShardIterator();
-      data.addAll(filter.apply(response.getRecords(), checkpoint));
+    public ShardCheckpoint getCheckpoint() {
+        return checkpoint;
     }
-  }
 
-  public ShardCheckpoint getCheckpoint() {
-    return checkpoint;
-  }
 
 }


Mime
View raw message