beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [beam] branch master updated: [BEAM-8376] Google Cloud Firestore Connector - Add Firestore v1 Read Operations (#15005)
Date Wed, 14 Jul 2021 04:12:22 GMT
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 947cbb7  [BEAM-8376] Google Cloud Firestore Connector - Add Firestore v1 Read Operations (#15005)
947cbb7 is described below

commit 947cbb79fe4dfe4e2bc384da488043b509ffbe71
Author: BenWhitehead <BenWhitehead@users.noreply.github.com>
AuthorDate: Wed Jul 14 00:10:10 2021 -0400

    [BEAM-8376] Google Cloud Firestore Connector - Add Firestore v1 Read Operations (#15005)
    
    * [BEAM-8376] Google Cloud Firestore Connector - Add Firestore v1 Read Operations
    
    Entry point for accessing Firestore V1 read methods is `FirestoreIO.v1().read()`.
    
    Currently supported read RPC methods:
    * `PartitionQuery`
    * `RunQuery`
    * `ListCollectionIds`
    * `ListDocuments`
    * `BatchGetDocuments`
    
    ### Unit Tests
    
    No external dependencies are needed for this suite
    
    A large suite of unit tests has been added to cover most branches and error
    scenarios in the various components. Tests for input validation and bounds
    checking are also included in this suite.
    
    ### Integration Tests
    
    Integration tests for each type of RPC are present in
    `org.apache.beam.sdk.io.gcp.firestore.it.FirestoreV1IT`. All of these tests
    leverage `TestPipeline` and verify the expected Documents/Collections are all
    operated on during the test.
    
    * fix failing nullability check for cursor comparator
    
    * fix @Nullable imports
    
    * fix typo
    
    * throw exception upon failing to determine restart point for batch get
    
    * add unit test for org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.PartitionQuery.PartitionQueryResponseToRunQueryRequest.processElement
    
    * javadoc typo fixes from review
    
    * Explicitly set Client built in retry to max 1 attempt since we're taking care of all retry logic at a higher level
    
    * Clean up names of DoFn base classes to make them more accurate
    
    * rename FirestoreV1Fn -> FirestoreV1RpcAttemptContexts
    
    * restructure javadocs a bit to keep context close to code samples
    
    * decouple partition query from run query
    
    it can be advantageous to allow a customer to perform some post processing of a query before executing it. By decoupling PartitionQuery from directly outputting to RunQuery this is easily possible.
    
    * Add todo to jira issues for query integration improvements
    
    * spotless
    
    * fix incorrect nullable annotation
---
 .../beam/sdk/io/gcp/firestore/FirestoreDoFn.java   |   30 +-
 .../FirestoreStatefulComponentFactory.java         |   12 +-
 .../beam/sdk/io/gcp/firestore/FirestoreV1.java     | 1103 +++++++++++++++++++-
 .../sdk/io/gcp/firestore/FirestoreV1ReadFn.java    |  632 +++++++++++
 ...1Fn.java => FirestoreV1RpcAttemptContexts.java} |    8 +-
 .../sdk/io/gcp/firestore/FirestoreV1WriteFn.java   |   13 +-
 .../apache/beam/sdk/io/gcp/firestore/RpcQos.java   |    8 +-
 .../beam/sdk/io/gcp/firestore/RpcQosImpl.java      |  253 ++++-
 .../beam/sdk/io/gcp/firestore/RpcQosOptions.java   |    2 +-
 .../io/gcp/firestore/BaseFirestoreV1FnTest.java    |   29 +-
 .../gcp/firestore/BaseFirestoreV1ReadFnTest.java   |  149 +++
 .../gcp/firestore/BaseFirestoreV1WriteFnTest.java  |   26 +-
 .../FirestoreV1FnBatchGetDocumentsTest.java        |  246 +++++
 .../FirestoreV1FnListCollectionIdsTest.java        |  216 ++++
 .../firestore/FirestoreV1FnListDocumentsTest.java  |  263 +++++
 .../firestore/FirestoreV1FnPartitionQueryTest.java |  228 ++++
 .../gcp/firestore/FirestoreV1FnRunQueryTest.java   |  364 +++++++
 ...artitionQueryResponseToRunQueryRequestTest.java |  141 +++
 .../beam/sdk/io/gcp/firestore/RpcQosTest.java      |   71 +-
 .../sdk/io/gcp/firestore/it/BaseFirestoreIT.java   |  184 ++++
 .../sdk/io/gcp/firestore/it/FirestoreV1IT.java     |  190 ++++
 21 files changed, 4070 insertions(+), 98 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
index a777129..d7e73ae 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
@@ -46,7 +46,35 @@ abstract class FirestoreDoFn<InT, OutT> extends DoFn<InT, OutT> {
   @StartBundle
   public abstract void startBundle(DoFn<InT, OutT>.StartBundleContext context) throws Exception;
 
-  abstract static class WindowAwareDoFn<InT, OutT> extends FirestoreDoFn<InT, OutT> {
+  /**
+   * This class defines a common parent class for those DoFn which rely on the implicit window for
+   * emitting values while processing a bundle.
+   */
+  abstract static class ImplicitlyWindowedFirestoreDoFn<InT, OutT>
+      extends FirestoreDoFn<InT, OutT> {
+    /**
+     * {@link ProcessContext#element() context.element()} must be non-null, otherwise a
+     * NullPointerException will be thrown.
+     *
+     * @param context Context to source element from, and output to
+     * @see org.apache.beam.sdk.transforms.DoFn.ProcessElement
+     */
+    @ProcessElement
+    public abstract void processElement(DoFn<InT, OutT>.ProcessContext context) throws Exception;
+
+    /** @see org.apache.beam.sdk.transforms.DoFn.FinishBundle */
+    @FinishBundle
+    public abstract void finishBundle() throws Exception;
+  }
+
+  /**
+   * This class defines a common parent class for those DoFn which must explicitly track the window
+   * for emitting values while processing bundles. This is primarily necessary to support the
+   * ability to emit values during {@link #finishBundle(DoFn.FinishBundleContext)} where an output
+   * value must be explicitly correlated to a window.
+   */
+  abstract static class ExplicitlyWindowedFirestoreDoFn<InT, OutT>
+      extends FirestoreDoFn<InT, OutT> {
     /**
      * {@link ProcessContext#element() context.element()} must be non-null, otherwise a
      * NullPointerException will be thrown.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
index 5841e60..d8b8ba7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
@@ -19,13 +19,13 @@ package org.apache.beam.sdk.io.gcp.firestore;
 
 import com.google.api.gax.core.FixedCredentialsProvider;
 import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.retrying.RetrySettings;
 import com.google.api.gax.rpc.ClientContext;
 import com.google.api.gax.rpc.FixedHeaderProvider;
 import com.google.cloud.firestore.FirestoreOptions.EmulatorCredentials;
 import com.google.cloud.firestore.v1.FirestoreSettings;
 import com.google.cloud.firestore.v1.stub.FirestoreStub;
 import com.google.cloud.firestore.v1.stub.GrpcFirestoreStub;
-import java.io.IOException;
 import java.io.Serializable;
 import java.security.SecureRandom;
 import java.util.Map;
@@ -76,6 +76,14 @@ class FirestoreStatefulComponentFactory implements Serializable {
                     }
                   });
 
+      RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(1).build();
+
+      builder.applyToAllUnaryMethods(
+          b -> {
+            b.setRetrySettings(retrySettings);
+            return null;
+          });
+
       FirestoreOptions firestoreOptions = options.as(FirestoreOptions.class);
       String emulatorHostPort = firestoreOptions.getEmulatorHost();
       if (emulatorHostPort != null) {
@@ -96,7 +104,7 @@ class FirestoreStatefulComponentFactory implements Serializable {
 
       ClientContext clientContext = ClientContext.create(builder.build());
       return GrpcFirestoreStub.create(clientContext);
-    } catch (IOException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
index 7c4756e..dd5202e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
@@ -19,19 +19,49 @@ package org.apache.beam.sdk.io.gcp.firestore;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
 import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Projection;
+import com.google.firestore.v1.Value;
 import com.google.firestore.v1.WriteResult;
 import com.google.rpc.Status;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
 import javax.annotation.concurrent.Immutable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.BatchGetDocumentsFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.ListCollectionIdsFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.ListDocumentsFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.PartitionQueryFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.PartitionQueryPair;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.RunQueryFn;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithSummary;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PCollection;
@@ -59,6 +89,91 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  *
  * <h3>Operations</h3>
  *
+ * <h4>Read</h4>
+ *
+ * <p>The currently supported read operations and their execution behavior are as follows:
+ *
+ * <table>
+ *   <tbody>
+ *     <tr>
+ *       <th>RPC</th>
+ *       <th>Execution Behavior</th>
+ *       <th>Example Usage</th>
+ *     </tr>
+ *     <tr>
+ *       <td>{@link PartitionQuery}</td>
+ *       <td>Parallel Streaming</td>
+ *       <td>
+ * <pre>
+ * PCollection<{@link PartitionQueryRequest}> partitionQueryRequests = ...;
+ * PCollection<{@link RunQueryRequest}> runQueryRequests = partitionQueryRequests
+ *     .apply(FirestoreIO.v1().read().{@link Read#partitionQuery() partitionQuery()}.build());
+ * PCollection<{@link RunQueryResponse}> runQueryResponses = runQueryRequests
+ *     .apply(FirestoreIO.v1().read().{@link Read#runQuery() runQuery()}.build());
+ * </pre>
+ *       </td>
+ *     </tr>
+ *     <tr>
+ *       <td>{@link RunQuery}</td>
+ *       <td>Sequential Streaming</td>
+ *       <td>
+ * <pre>
+ * PCollection<{@link RunQueryRequest}> runQueryRequests = ...;
+ * PCollection<{@link RunQueryResponse}> runQueryResponses = runQueryRequests
+ *     .apply(FirestoreIO.v1().read().{@link Read#runQuery() runQuery()}.build());
+ * </pre>
+ *       </td>
+ *     </tr>
+ *     <tr>
+ *       <td>{@link BatchGetDocuments}</td>
+ *       <td>Sequential Streaming</td>
+ *       <td>
+ * <pre>
+ * PCollection<{@link BatchGetDocumentsRequest}> batchGetDocumentsRequests = ...;
+ * PCollection<{@link BatchGetDocumentsResponse}> batchGetDocumentsResponses = batchGetDocumentsRequests
+ *     .apply(FirestoreIO.v1().read().{@link Read#batchGetDocuments() batchGetDocuments()}.build());
+ * </pre>
+ *       </td>
+ *     </tr>
+ *     <tr>
+ *       <td>{@link ListCollectionIds}</td>
+ *       <td>Sequential Paginated</td>
+ *       <td>
+ * <pre>
+ * PCollection<{@link ListCollectionIdsRequest}> listCollectionIdsRequests = ...;
+ * PCollection<{@link ListCollectionIdsResponse}> listCollectionIdsResponses = listCollectionIdsRequests
+ *     .apply(FirestoreIO.v1().read().{@link Read#listCollectionIds() listCollectionIds()}.build());
+ * </pre>
+ *       </td>
+ *     </tr>
+ *     <tr>
+ *       <td>{@link ListDocuments}</td>
+ *       <td>Sequential Paginated</td>
+ *       <td>
+ * <pre>
+ * PCollection<{@link ListDocumentsRequest}> listDocumentsRequests = ...;
+ * PCollection<{@link ListDocumentsResponse}> listDocumentsResponses = listDocumentsRequests
+ *     .apply(FirestoreIO.v1().read().{@link Read#listDocuments() listDocuments()}.build());
+ * </pre>
+ *       </td>
+ *     </tr>
+ *   </tbody>
+ * </table>
+ *
+ * <p>PartitionQuery should be preferred over other options if at all possible, because it has the
+ * ability to parallelize execution of multiple queries for specific sub-ranges of the full results.
+ * When choosing the value to set for {@link PartitionQueryRequest.Builder#setPartitionCount(long)},
+ * ensure you are picking a value that makes sense for your data set and your max number of workers.
+ * <i>If you find that a partition query is taking an unexpectedly long time, try increasing the
+ * number of partitions.</i> Depending on how large your dataset is, increasing as much as 10x can
+ * significantly reduce total partition query wall time.
+ *
+ * <p>You should only ever use ListDocuments if the use of <a target="_blank" rel="noopener
+ * noreferrer"
+ * href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest">{@code
+ * show_missing}</a> is needed to access a document. RunQuery and PartitionQuery will always be
+ * faster if the use of {@code show_missing} is not needed.
+ *
  * <h4>Write</h4>
  *
  * To write a {@link PCollection} to Cloud Firestore use {@link FirestoreV1#write()}, picking the
@@ -68,21 +183,21 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  *
  * <p>The default behavior is to fail a bundle if any single write fails with a non-retryable error.
  *
- * <pre>{@code
- * PCollection<Write> writes = ...;
- * PCollection<WriteSummary> sink = writes
- *     .apply(FirestoreIO.v1().write().batchWrite().build());
- * }</pre>
+ * <pre>
+ * PCollection<{@link com.google.firestore.v1.Write}> writes = ...;
+ * PCollection<{@link WriteSuccessSummary}> sink = writes
+ *     .apply(FirestoreIO.v1().write().{@link Write#batchWrite() batchWrite()}.build());
+ * </pre>
  *
  * Alternatively, if you'd rather output write failures to a Dead Letter Queue add {@link
  * BatchWriteWithSummary.Builder#withDeadLetterQueue() withDeadLetterQueue} when building your
  * writer.
  *
- * <pre>{@code
- * PCollection<Write> writes = ...;
- * PCollection<WriteFailure> writeFailures = writes
- *     .apply(FirestoreIO.v1().write().batchWrite().withDeadLetterQueue().build());
- * }</pre>
+ * <pre>
+ * PCollection<{@link com.google.firestore.v1.Write}> writes = ...;
+ * PCollection<{@link WriteFailure}> writeFailures = writes
+ *     .apply(FirestoreIO.v1().write().{@link Write#batchWrite() batchWrite()}.withDeadLetterQueue().build());
+ * </pre>
  *
  * <h3>Permissions</h3>
  *
@@ -115,6 +230,23 @@ public final class FirestoreV1 {
 
   /**
    * The class returned by this method provides the ability to create {@link PTransform PTransforms}
+   * for read operations available in the Firestore V1 API provided by {@link
+   * com.google.cloud.firestore.v1.stub.FirestoreStub FirestoreStub}.
+   *
+   * <p>This method is part of the Firestore Connector DSL and should be accessed via {@link
+   * FirestoreIO#v1()}.
+   *
+   * <p>
+   *
+   * @return Type safe builder factory for read operations.
+   * @see FirestoreIO#v1()
+   */
+  public Read read() {
+    return Read.INSTANCE;
+  }
+
+  /**
+   * The class returned by this method provides the ability to create {@link PTransform PTransforms}
    * for write operations available in the Firestore V1 API provided by {@link
    * com.google.cloud.firestore.v1.stub.FirestoreStub FirestoreStub}.
    *
@@ -131,6 +263,229 @@ public final class FirestoreV1 {
   }
 
   /**
+   * Type safe builder factory for read operations.
+   *
+   * <p>This class is part of the Firestore Connector DSL and should be accessed via {@link #read()
+   * FirestoreIO.v1().read()}.
+   *
+   * <p>This class provides access to a set of type safe builders for read operations available in
+   * the Firestore V1 API accessed through {@link com.google.cloud.firestore.v1.stub.FirestoreStub
+   * FirestoreStub}. Each builder allows configuration before creating an immutable instance which
+   * can be used in your pipeline.
+   *
+   * <p>
+   *
+   * @see FirestoreIO#v1()
+   * @see #read()
+   */
+  @Experimental(Kind.SOURCE_SINK)
+  @Immutable
+  public static final class Read {
+    private static final Read INSTANCE = new Read();
+
+    private Read() {}
+
+    /**
+     * Factory method to create a new type safe builder for {@link ListDocumentsRequest} operations.
+     *
+     * <p>This method is part of the Firestore Connector DSL and should be accessed via {@link
+     * FirestoreIO#v1()}.
+     *
+     * <p>All request quality-of-service for the built {@link ListDocuments} PTransform is scoped to
+     * the worker and configured based on the {@link RpcQosOptions} specified via this builder.
+     *
+     * <p>All logging for the built instance of {@link ListDocuments} will be sent to appender
+     * {@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListDocuments}.
+     *
+     * <p>The following metrics will be available for the built instance of {@link ListDocuments}
+     *
+     * <ol>
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListDocuments.throttlingMs} A
+     *       counter tracking the number of milliseconds RPCs are throttled by Qos
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListDocuments.rpcFailures} A
+     *       counter tracking the number of failed RPCs
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListDocuments.rpcSuccesses} A
+     *       counter tracking the number of successful RPCs
+     * </ol>
+     *
+     * @return A new type safe builder providing configuration for processing of {@link
+     *     ListDocumentsRequest}s
+     * @see FirestoreIO#v1()
+     * @see ListDocuments
+     * @see ListDocumentsRequest
+     * @see ListDocumentsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments">google.firestore.v1.Firestore.ListDocuments</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest">google.firestore.v1.ListDocumentsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse">google.firestore.v1.ListDocumentsResponse</a>
+     */
+    public ListDocuments.Builder listDocuments() {
+      return new ListDocuments.Builder();
+    }
+
+    /**
+     * Factory method to create a new type safe builder for {@link ListCollectionIdsRequest}
+     * operations.
+     *
+     * <p>This method is part of the Firestore Connector DSL and should be accessed via {@link
+     * FirestoreIO#v1()}.
+     *
+     * <p>All request quality-of-service for the built {@link ListCollectionIds} PTransform is
+     * scoped to the worker and configured based on the {@link RpcQosOptions} specified via this
+     * builder.
+     *
+     * <p>All logging for the built instance of {@link ListCollectionIds} will be sent to appender
+     * {@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListCollectionIds}.
+     *
+     * <p>The following metrics will be available for the built instance of {@link
+     * ListCollectionIds}
+     *
+     * <ol>
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListCollectionIds.throttlingMs}
+     *       A counter tracking the number of milliseconds RPCs are throttled by Qos
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListCollectionIds.rpcFailures}
+     *       A counter tracking the number of failed RPCs
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListCollectionIds.rpcSuccesses}
+     *       A counter tracking the number of successful RPCs
+     * </ol>
+     *
+     * @return A new type safe builder providing configuration for processing of {@link
+     *     ListCollectionIdsRequest}s
+     * @see FirestoreIO#v1()
+     * @see ListCollectionIds
+     * @see ListCollectionIdsRequest
+     * @see ListCollectionIdsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds">google.firestore.v1.Firestore.ListCollectionIds</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest">google.firestore.v1.ListCollectionIdsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse">google.firestore.v1.ListCollectionIdsResponse</a>
+     */
+    public ListCollectionIds.Builder listCollectionIds() {
+      return new ListCollectionIds.Builder();
+    }
+
+    /**
+     * Factory method to create a new type safe builder for {@link BatchGetDocumentsRequest}
+     * operations.
+     *
+     * <p>This method is part of the Firestore Connector DSL and should be accessed via {@link
+     * FirestoreIO#v1()}.
+     *
+     * <p>All request quality-of-service for the built {@link BatchGetDocuments} PTransform is
+     * scoped to the worker and configured based on the {@link RpcQosOptions} specified via this
+     * builder.
+     *
+     * <p>All logging for the built instance of {@link BatchGetDocuments} will be sent to appender
+     * {@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.BatchGetDocuments}.
+     *
+     * <p>The following metrics will be available for the built instance of {@link
+     * BatchGetDocuments}
+     *
+     * <ol>
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.BatchGetDocuments.throttlingMs}
+     *       A counter tracking the number of milliseconds RPCs are throttled by Qos
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.BatchGetDocuments.rpcFailures}
+     *       A counter tracking the number of failed RPCs
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.BatchGetDocuments.rpcSuccesses}
+     *       A counter tracking the number of successful RPCs
+     *   <li>{@code
+     *       org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.RunQuery.rpcStreamValueReceived} A
+     *       counter tracking the number of values received by a streaming RPC
+     * </ol>
+     *
+     * @return A new type safe builder providing configuration for processing of {@link
+     *     BatchGetDocumentsRequest}s
+     * @see FirestoreIO#v1()
+     * @see BatchGetDocuments
+     * @see BatchGetDocumentsRequest
+     * @see BatchGetDocumentsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments">google.firestore.v1.Firestore.BatchGetDocuments</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest">google.firestore.v1.BatchGetDocumentsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse">google.firestore.v1.BatchGetDocumentsResponse</a>
+     */
+    public BatchGetDocuments.Builder batchGetDocuments() {
+      return new BatchGetDocuments.Builder();
+    }
+
+    /**
+     * Factory method to create a new type safe builder for {@link RunQueryRequest} operations.
+     *
+     * <p>This method is part of the Firestore Connector DSL and should be accessed via {@link
+     * FirestoreIO#v1()}.
+     *
+     * <p>All request quality-of-service for the built {@link RunQuery} PTransform is scoped to the
+     * worker and configured based on the {@link RpcQosOptions} specified via this builder.
+     *
+     * <p>All logging for the built instance of {@link RunQuery} will be sent to appender {@code
+     * org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.RunQuery}.
+     *
+     * <p>The following metrics will be available for the built instance of {@link RunQuery}
+     *
+     * <ol>
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.RunQuery.throttlingMs} A
+     *       counter tracking the number of milliseconds RPCs are throttled by Qos
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.RunQuery.rpcFailures} A counter
+     *       tracking the number of failed RPCs
+     *   <li>{@code org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.RunQuery.rpcSuccesses} A
+     *       counter tracking the number of successful RPCs
+     *   <li>{@code
+     *       org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.RunQuery.rpcStreamValueReceived} A
+     *       counter tracking the number of values received by a streaming RPC
+     * </ol>
+     *
+     * @return A new type safe builder providing configuration for processing of {@link
+     *     RunQueryRequest}s
+     * @see FirestoreIO#v1()
+     * @see RunQuery
+     * @see RunQueryRequest
+     * @see RunQueryResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery">google.firestore.v1.Firestore.RunQuery</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest">google.firestore.v1.RunQueryRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse">google.firestore.v1.RunQueryResponse</a>
+     */
+    public RunQuery.Builder runQuery() {
+      return new RunQuery.Builder();
+    }
+
+    /**
+     * Factory method to create a new type safe builder for {@link PartitionQueryRequest}
+     * operations.
+     *
+     * <p>This method is part of the Firestore Connector DSL and should be accessed via {@link
+     * FirestoreIO#v1()}.
+     *
+     * <p>All request quality-of-service for the built {@link PartitionQuery} PTransform is scoped
+     * to the worker and configured based on the {@link RpcQosOptions} specified via this builder.
+     *
+     * @return A new type safe builder providing configuration for processing of {@link
+     *     PartitionQueryRequest}s
+     * @see FirestoreIO#v1()
+     * @see PartitionQuery
+     * @see PartitionQueryRequest
+     * @see RunQueryResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery">google.firestore.v1.Firestore.PartitionQuery</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">google.firestore.v1.PartitionQueryRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse">google.firestore.v1.PartitionQueryResponse</a>
+     */
+    public PartitionQuery.Builder partitionQuery() {
+      return new PartitionQuery.Builder();
+    }
+  }
+
+  /**
    * Type safe builder factory for write operations.
    *
    * <p>This class is part of the Firestore Connector DSL and should be accessed via {@link #write()
@@ -191,6 +546,731 @@ public final class FirestoreV1 {
 
   /**
    * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+   * ListCollectionIdsRequest}{@code >, }{@link PTransform}{@code <}{@link
+   * ListCollectionIdsResponse}{@code >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+   * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+   * and configured via {@link ListCollectionIds.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#listCollectionIds()
+   * @see FirestoreV1.ListCollectionIds.Builder
+   * @see ListCollectionIdsRequest
+   * @see ListCollectionIdsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds">google.firestore.v1.Firestore.ListCollectionIds</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest">google.firestore.v1.ListCollectionIdsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse">google.firestore.v1.ListCollectionIdsResponse</a>
+   */
+  public static final class ListCollectionIds
+      extends Transform<
+          PCollection<ListCollectionIdsRequest>,
+          PCollection<String>,
+          ListCollectionIds,
+          ListCollectionIds.Builder> {
+
+    private ListCollectionIds(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<ListCollectionIdsRequest> input) {
+      return input
+          .apply(
+              "listCollectionIds",
+              ParDo.of(
+                  new ListCollectionIdsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(ParDo.of(new FlattenListCollectionIdsResponse()))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link ListCollectionIds} allowing configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+     * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#listCollectionIds()
+     * @see FirestoreV1.ListCollectionIds
+     * @see ListCollectionIdsRequest
+     * @see ListCollectionIdsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds">google.firestore.v1.Firestore.ListCollectionIds</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest">google.firestore.v1.ListCollectionIdsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse">google.firestore.v1.ListCollectionIdsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<ListCollectionIdsRequest>,
+            PCollection<String>,
+            ListCollectionIds,
+            ListCollectionIds.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public ListCollectionIds build() {
+        return genericBuild();
+      }
+
+      @Override
+      ListCollectionIds buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new ListCollectionIds(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+   * ListDocumentsRequest}{@code >, }{@link PTransform}{@code <}{@link ListDocumentsResponse}{@code
+   * >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+   * FirestoreV1.Read#listDocuments() listDocuments()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+   * and configured via {@link ListDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#listDocuments()
+   * @see FirestoreV1.ListDocuments.Builder
+   * @see ListDocumentsRequest
+   * @see ListDocumentsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments">google.firestore.v1.Firestore.ListDocuments</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest">google.firestore.v1.ListDocumentsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse">google.firestore.v1.ListDocumentsResponse</a>
+   */
+  public static final class ListDocuments
+      extends Transform<
+          PCollection<ListDocumentsRequest>,
+          PCollection<Document>,
+          ListDocuments,
+          ListDocuments.Builder> {
+
+    private ListDocuments(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<Document> expand(PCollection<ListDocumentsRequest> input) {
+      // Issue each ListDocuments RPC, unpack the documents from each response, then reshuffle
+      // so downstream work is redistributed across workers.
+      return input
+          .apply(
+              "listDocuments",
+              ParDo.of(
+                  new ListDocumentsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(ParDo.of(new ListDocumentsResponseToDocument()))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link ListDocuments} allowing configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+     * FirestoreV1.Read#listDocuments() listDocuments()}.
+     *
+     * <p>All request quality-of-service for the built {@link ListDocuments} PTransform is scoped
+     * to the worker and configured based on the {@link RpcQosOptions} specified via this builder.
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#listDocuments()
+     * @see FirestoreV1.ListDocuments
+     * @see ListDocumentsRequest
+     * @see ListDocumentsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments">google.firestore.v1.Firestore.ListDocuments</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest">google.firestore.v1.ListDocumentsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse">google.firestore.v1.ListDocumentsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<ListDocumentsRequest>,
+            PCollection<Document>,
+            ListDocuments,
+            ListDocuments.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public ListDocuments build() {
+        return genericBuild();
+      }
+
+      @Override
+      ListDocuments buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new ListDocuments(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+   * RunQueryRequest}{@code >, }{@link PTransform}{@code <}{@link RunQueryResponse}{@code >>} which
+   * will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+   * FirestoreV1.Read#runQuery() runQuery()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+   * and configured via {@link RunQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#runQuery()
+   * @see FirestoreV1.RunQuery.Builder
+   * @see RunQueryRequest
+   * @see RunQueryResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery">google.firestore.v1.Firestore.RunQuery</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest">google.firestore.v1.RunQueryRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse">google.firestore.v1.RunQueryResponse</a>
+   */
+  // TODO(BEAM-12605): Add dynamic work rebalancing to support a Splittable DoFn
+  // TODO(BEAM-12606): Add support for progress reporting
+  public static final class RunQuery
+      extends Transform<
+          PCollection<RunQueryRequest>, PCollection<RunQueryResponse>, RunQuery, RunQuery.Builder> {
+
+    private RunQuery(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<RunQueryResponse> expand(PCollection<RunQueryRequest> input) {
+      // Execute each query via RunQueryFn, then reshuffle the streamed responses so downstream
+      // work is redistributed across workers.
+      return input
+          .apply(
+              "runQuery",
+              ParDo.of(new RunQueryFn(clock, firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link RunQuery} allowing configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+     * FirestoreV1.Read#runQuery() runQuery()}.
+     *
+     * <p>All request quality-of-service for the built {@link RunQuery} PTransform is scoped to the
+     * worker and configured based on the {@link RpcQosOptions} specified via this builder.
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#runQuery()
+     * @see FirestoreV1.RunQuery
+     * @see RunQueryRequest
+     * @see RunQueryResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery">google.firestore.v1.Firestore.RunQuery</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest">google.firestore.v1.RunQueryRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse">google.firestore.v1.RunQueryResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<RunQueryRequest>,
+            PCollection<RunQueryResponse>,
+            RunQuery,
+            RunQuery.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public RunQuery build() {
+        return genericBuild();
+      }
+
+      @Override
+      RunQuery buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new RunQuery(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+   * BatchGetDocumentsRequest}{@code >, }{@link PTransform}{@code <}{@link
+   * BatchGetDocumentsResponse}{@code >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+   * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+   * and configured via {@link BatchGetDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#batchGetDocuments()
+   * @see FirestoreV1.BatchGetDocuments.Builder
+   * @see BatchGetDocumentsRequest
+   * @see BatchGetDocumentsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments">google.firestore.v1.Firestore.BatchGetDocuments</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest">google.firestore.v1.BatchGetDocumentsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse">google.firestore.v1.BatchGetDocumentsResponse</a>
+   */
+  public static final class BatchGetDocuments
+      extends Transform<
+          PCollection<BatchGetDocumentsRequest>,
+          PCollection<BatchGetDocumentsResponse>,
+          BatchGetDocuments,
+          BatchGetDocuments.Builder> {
+
+    private BatchGetDocuments(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<BatchGetDocumentsResponse> expand(
+        PCollection<BatchGetDocumentsRequest> input) {
+      // Execute each batch get via BatchGetDocumentsFn, then reshuffle the streamed responses so
+      // downstream work is redistributed across workers.
+      return input
+          .apply(
+              "batchGetDocuments",
+              ParDo.of(
+                  new BatchGetDocumentsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link BatchGetDocuments} allowing configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+     * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+     *
+     * <p>All request quality-of-service for the built {@link BatchGetDocuments} PTransform is
+     * scoped to the worker and configured based on the {@link RpcQosOptions} specified via this
+     * builder.
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#batchGetDocuments()
+     * @see FirestoreV1.BatchGetDocuments
+     * @see BatchGetDocumentsRequest
+     * @see BatchGetDocumentsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments">google.firestore.v1.Firestore.BatchGetDocuments</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest">google.firestore.v1.BatchGetDocumentsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse">google.firestore.v1.BatchGetDocumentsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<BatchGetDocumentsRequest>,
+            PCollection<BatchGetDocumentsResponse>,
+            BatchGetDocuments,
+            BatchGetDocuments.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      // private, not public: only toBuilder() constructs a pre-populated Builder, matching the
+      // sibling ListCollectionIds/ListDocuments/RunQuery builders.
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public BatchGetDocuments build() {
+        return genericBuild();
+      }
+
+      @Override
+      BatchGetDocuments buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new BatchGetDocuments(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+   * PartitionQueryRequest}{@code >, }{@link PTransform}{@code <}{@link RunQueryRequest}{@code >>}
+   * which will read from Firestore.
+   *
+   * <p>Perform the necessary operations and handling of {@link PartitionQueryResponse}s to yield a
+   * number of {@link RunQueryRequest} which are friendly to being executed in parallel.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+   * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+   * and configured via {@link PartitionQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#partitionQuery()
+   * @see FirestoreV1.PartitionQuery.Builder
+   * @see FirestoreV1.Read#runQuery()
+   * @see FirestoreV1.RunQuery
+   * @see FirestoreV1.RunQuery.Builder
+   * @see PartitionQueryRequest
+   * @see RunQueryResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery">google.firestore.v1.Firestore.PartitionQuery</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">google.firestore.v1.PartitionQueryRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse">google.firestore.v1.PartitionQueryResponse</a>
+   */
+  public static final class PartitionQuery
+      extends Transform<
+          PCollection<PartitionQueryRequest>,
+          PCollection<RunQueryRequest>,
+          PartitionQuery,
+          PartitionQuery.Builder> {
+
+    private final boolean nameOnlyQuery;
+
+    private PartitionQuery(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions,
+        boolean nameOnlyQuery) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      this.nameOnlyQuery = nameOnlyQuery;
+    }
+
+    @Override
+    public PCollection<RunQueryRequest> expand(PCollection<PartitionQueryRequest> input) {
+      PCollection<RunQueryRequest> queries =
+          input
+              .apply(
+                  "PartitionQuery",
+                  ParDo.of(
+                      new PartitionQueryFn(
+                          clock, firestoreStatefulComponentFactory, rpcQosOptions)))
+              .apply("expand queries", ParDo.of(new PartitionQueryResponseToRunQueryRequest()));
+      if (nameOnlyQuery) {
+        queries =
+            queries.apply(
+                "set name only query",
+                MapElements.via(
+                    new SimpleFunction<RunQueryRequest, RunQueryRequest>() {
+                      @Override
+                      public RunQueryRequest apply(RunQueryRequest input) {
+                        RunQueryRequest.Builder builder = input.toBuilder();
+                        builder
+                            .getStructuredQueryBuilder()
+                            .setSelect(
+                                Projection.newBuilder()
+                                    .addFields(
+                                        FieldReference.newBuilder()
+                                            .setFieldPath("__name__")
+                                            .build())
+                                    .build());
+                        return builder.build();
+                      }
+                    }));
+      }
+      return queries.apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, rpcQosOptions, nameOnlyQuery);
+    }
+
+    /**
+     * A type safe builder for {@link PartitionQuery} allowing configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+     * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#partitionQuery()
+     * @see FirestoreV1.PartitionQuery
+     * @see PartitionQueryRequest
+     * @see RunQueryResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery">google.firestore.v1.Firestore.PartitionQuery</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">google.firestore.v1.PartitionQueryRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse">google.firestore.v1.PartitionQueryResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<PartitionQueryRequest>,
+            PCollection<RunQueryRequest>,
+            PartitionQuery,
+            FirestoreV1.PartitionQuery.Builder> {
+
+      private boolean nameOnlyQuery = false;
+
+      private Builder() {
+        super();
+      }
+
+      public Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions,
+          boolean nameOnlyQuery) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+        this.nameOnlyQuery = nameOnlyQuery;
+      }
+
+      @Override
+      public PartitionQuery build() {
+        return genericBuild();
+      }
+
+      /**
+       * Update produced queries to only retrieve their {@code __name__} thereby not retrieving any
+       * fields and reducing resource requirements.
+       *
+       * @return this builder
+       */
+      public Builder withNameOnlyQuery() {
+        this.nameOnlyQuery = true;
+        return this;
+      }
+
+      @Override
+      PartitionQuery buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new PartitionQuery(
+            clock, firestoreStatefulComponentFactory, rpcQosOptions, nameOnlyQuery);
+      }
+    }
+
+    /**
+     * DoFn which contains the logic necessary to turn a {@link PartitionQueryRequest} and {@link
+     * PartitionQueryResponse} pair into {@code N} {@link RunQueryRequest}.
+     */
+    static final class PartitionQueryResponseToRunQueryRequest
+        extends DoFn<PartitionQueryPair, RunQueryRequest> {
+
+      /**
+       * When fetching cursors that span multiple pages it is expected (per <a
+       * href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">
+       * PartitionQueryRequest.page_token</a>) for the client to sort the cursors before processing
+       * them to define the sub-queries. So here we're defining a Comparator which will sort Cursors
+       * by the first reference value present, then comparing the reference values
+       * lexicographically.
+       */
+      static final Comparator<Cursor> CURSOR_REFERENCE_VALUE_COMPARATOR;
+
+      static {
+        Function<Cursor, Optional<Value>> firstReferenceValue =
+            (Cursor c) ->
+                c.getValuesList().stream()
+                    .filter(
+                        v -> {
+                          String referenceValue = v.getReferenceValue();
+                          return referenceValue != null && !referenceValue.isEmpty();
+                        })
+                    .findFirst();
+        Function<String, String[]> stringToPath = (String s) -> s.split("/");
+        // compare references by their path segments rather than as a whole string to ensure
+        // per path segment comparison is taken into account.
+        Comparator<String[]> pathWiseCompare =
+            (String[] path1, String[] path2) -> {
+              int minLength = Math.min(path1.length, path2.length);
+              for (int i = 0; i < minLength; i++) {
+                String pathSegment1 = path1[i];
+                String pathSegment2 = path2[i];
+                int compare = pathSegment1.compareTo(pathSegment2);
+                if (compare != 0) {
+                  return compare;
+                }
+              }
+              if (path1.length == path2.length) {
+                return 0;
+              } else if (minLength == path1.length) {
+                return -1;
+              } else {
+                return 1;
+              }
+            };
+
+        // Sort those cursors which have no firstReferenceValue at the bottom of the list
+        CURSOR_REFERENCE_VALUE_COMPARATOR =
+            Comparator.comparing(
+                firstReferenceValue,
+                (o1, o2) -> {
+                  if (o1.isPresent() && o2.isPresent()) {
+                    return pathWiseCompare.compare(
+                        stringToPath.apply(o1.get().getReferenceValue()),
+                        stringToPath.apply(o2.get().getReferenceValue()));
+                  } else if (o1.isPresent()) {
+                    return -1;
+                  } else {
+                    return 1;
+                  }
+                });
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        PartitionQueryPair pair = c.element();
+        PartitionQueryRequest partitionQueryRequest = pair.getRequest();
+        String dbRoot = partitionQueryRequest.getParent();
+        StructuredQuery structuredQuery = partitionQueryRequest.getStructuredQuery();
+        PartitionQueryResponse partitionQueryResponse = pair.getResponse();
+        // create a new list before we sort things
+        List<Cursor> cursors = new ArrayList<>(partitionQueryResponse.getPartitionsList());
+        cursors.sort(CURSOR_REFERENCE_VALUE_COMPARATOR);
+        final int size = cursors.size();
+        final int lastIdx = size - 1;
+        for (int i = 0; i < size; i++) {
+          Cursor curr = cursors.get(i);
+
+          if (i == 0) {
+            // first cursor, emit a range of everything up to the current cursor
+            emit(c, dbRoot, structuredQuery.toBuilder().setEndAt(curr));
+          }
+
+          if (0 < i && i <= lastIdx) {
+            Cursor prev = cursors.get(i - 1);
+            // emit a range for values between prev and curr
+            emit(c, dbRoot, structuredQuery.toBuilder().setStartAt(prev).setEndAt(curr));
+          }
+
+          if (i == lastIdx) {
+            // last cursor, emit a range of everything from the current cursor onward
+            emit(c, dbRoot, structuredQuery.toBuilder().setStartAt(curr));
+          }
+        }
+      }
+
+      private void emit(ProcessContext c, String dbRoot, StructuredQuery.Builder builder) {
+        RunQueryRequest runQueryRequest =
+            RunQueryRequest.newBuilder()
+                .setParent(dbRoot)
+                .setStructuredQuery(builder.build())
+                .build();
+        c.output(runQueryRequest);
+      }
+    }
+  }
+
+  /** DoFn that emits each collection id contained in a {@link ListCollectionIdsResponse}. */
+  private static final class FlattenListCollectionIdsResponse
+      extends DoFn<ListCollectionIdsResponse, String> {
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      for (String collectionId : context.element().getCollectionIdsList()) {
+        context.output(collectionId);
+      }
+    }
+  }
+
+  /** DoFn that emits each {@link Document} contained in a {@link ListDocumentsResponse}. */
+  private static final class ListDocumentsResponseToDocument
+      extends DoFn<ListDocumentsResponse, Document> {
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      for (Document document : context.element().getDocumentsList()) {
+        context.output(document);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
    * com.google.firestore.v1.Write}{@code >, }{@link PDone}{@code >} which will write to Firestore.
    *
    * <p>If an error is encountered while trying to write to Cloud Firestore a {@link
@@ -626,6 +1706,7 @@ public final class FirestoreV1 {
         OutT extends POutput,
         TrfmT extends Transform<InT, OutT, TrfmT, BldrT>,
         BldrT extends Transform.Builder<InT, OutT, TrfmT, BldrT>> {
+
       JodaClock clock;
       FirestoreStatefulComponentFactory firestoreStatefulComponentFactory;
       RpcQosOptions rpcQosOptions;
@@ -654,7 +1735,7 @@ public final class FirestoreV1 {
        * @return Down cast this
        */
       @SuppressWarnings({"unchecked", "RedundantSuppression"})
-      private BldrT self() {
+      BldrT self() {
         return (BldrT) this;
       }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
new file mode 100644
index 0000000..bfbc636
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.ImplicitlyWindowedFirestoreDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream will attempt to resume
+   * rather than starting over. The restarting of the stream will continue within the scope of the
+   * completion of the request (meaning any possibility of resumption is contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ ASC" will be used
+      // Before we can set the cursor to the last document name read, we need to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                .setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    .setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. See <a target="_blank"
+   * rel="noopener noreferrer"
+   * href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> {
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      PartitionQueryResponse.Builder aggregate = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          PartitionQueryRequest request = setPageToken(element, aggregate);
+          attempt.recordRequestStart(clock.instant());
+          PartitionQueryPagedResponse pagedResponse =
+              firestoreStub.partitionQueryPagedCallable().call(request);
+          for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+            attempt.recordRequestSuccessful(clock.instant());
+            PartitionQueryResponse response = page.getResponse();
+            if (aggregate == null) {
+              aggregate = response.toBuilder();
+            } else {
+              aggregate.addAllPartitions(response.getPartitionsList());
+              if (page.hasNextPage()) {
+                aggregate.setNextPageToken(response.getNextPageToken());
+              } else {
+                aggregate.clearNextPageToken();
+              }
+            }
+            if (page.hasNextPage()) {
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+      if (aggregate != null) {
+        context.output(new PartitionQueryPair(element, aggregate.build()));
+      }
+    }
+
+    private PartitionQueryRequest setPageToken(
+        PartitionQueryRequest request, PartitionQueryResponse.@Nullable Builder aggregate) {
+      if (aggregate != null && aggregate.getNextPageToken() != null) {
+        return request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+      }
+      return request;
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListDocumentsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListDocumentsRequest,
+          ListDocumentsPagedResponse,
+          ListDocumentsPage,
+          ListDocumentsResponse> {
+
+    ListDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.ListDocuments;
+    }
+
+    @Override
+    protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listDocumentsPagedCallable();
+    }
+
+    @Override
+    protected ListDocumentsRequest setPageToken(
+        ListDocumentsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListCollectionIdsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListCollectionIdsRequest,
+          ListCollectionIdsPagedResponse,
+          ListCollectionIdsPage,
+          ListCollectionIdsResponse> {
+
+    ListCollectionIdsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.ListCollectionIds;
+    }
+
+    @Override
+    protected UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listCollectionIdsPagedCallable();
+    }
+
+    @Override
+    protected ListCollectionIdsRequest setPageToken(
+        ListCollectionIdsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class BatchGetDocumentsFn
+      extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest, BatchGetDocumentsResponse> {
+
+    BatchGetDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchGetDocuments;
+    }
+
+    @Override
+    protected ServerStreamingCallable<BatchGetDocumentsRequest, BatchGetDocumentsResponse>
+        getCallable(FirestoreStub firestoreStub) {
+      return firestoreStub.batchGetDocumentsCallable();
+    }
+
+    @Override
+    protected BatchGetDocumentsRequest setStartFrom(
+        BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse mostRecentResponse) {
+      int startIndex = -1;
+      ProtocolStringList documentsList = originalRequest.getDocumentsList();
+      String missing = mostRecentResponse.getMissing();
+      String foundName =
+          mostRecentResponse.hasFound() ? mostRecentResponse.getFound().getName() : null;
+      // we only scan up to the second-to-last document of the original request. If the final
+      // element had been reached, the full request would be complete and we wouldn't be in this
+      // scenario
+      int maxIndex = documentsList.size() - 2;
+      for (int i = 0; i <= maxIndex; i++) {
+        String docName = documentsList.get(i);
+        if (docName.equals(missing) || docName.equals(foundName)) {
+          startIndex = i;
+          break;
+        }
+      }
+      if (0 <= startIndex) {
+        BatchGetDocumentsRequest.Builder builder = originalRequest.toBuilder().clearDocuments();
+        documentsList.stream()
+            .skip(startIndex + 1) // start from the next entry from the one we found
+            .forEach(builder::addDocuments);
+        return builder.build();
+      }
+      throw new IllegalStateException(
+          String.format(
+              "Unable to determine BatchGet resumption point. Most recently received doc __name__ '%s'",
+              foundName != null ? foundName : missing));
+    }
+  }
+
+  /**
+   * {@link DoFn} Providing support for a Read type RPC operation which uses a Stream rather than
+   * pagination. Each response from the stream will be output to the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   *
+   * @param <InT> Request type
+   * @param <OutT> Response type
+   */
+  private abstract static class StreamingFirestoreV1ReadFn<
+          InT extends Message, OutT extends Message>
+      extends BaseFirestoreV1ReadFn<InT, OutT> {
+
+    protected StreamingFirestoreV1ReadFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    protected abstract ServerStreamingCallable<InT, OutT> getCallable(FirestoreStub firestoreStub);
+
+    protected abstract InT setStartFrom(InT element, OutT out);
+
+    @Override
+    public final void processElement(ProcessContext c) throws Exception {
+      @SuppressWarnings(
+          "nullness") // for some reason requireNonNull thinks its parameter must be non-null...
+      final InT element = requireNonNull(c.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      OutT lastReceivedValue = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        Instant start = clock.instant();
+        try {
+          InT request =
+              lastReceivedValue == null ? element : setStartFrom(element, lastReceivedValue);
+          attempt.recordRequestStart(start);
+          ServerStream<OutT> serverStream = getCallable(firestoreStub).call(request);
+          attempt.recordRequestSuccessful(clock.instant());
+          for (OutT out : serverStream) {
+            lastReceivedValue = out;
+            attempt.recordStreamValue(clock.instant());
+            c.output(out);
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+    }
+  }
+
+  /**
+   * {@link DoFn} Providing support for a Read type RPC operation which uses pagination rather than
+   * a Stream.
+   *
+   * @param <RequestT> Request type
+   * @param <ResponseT> Response type
+   */
+  @SuppressWarnings({
+    // errorchecker doesn't like the second ? on PagedResponse, seemingly because of different
+    // recursion depth limits; 3 on the found vs 4 on the required.
+    // The second ? is the type of collection the paged response uses to hold all responses if
+    // trying to expand all pages to a single collection. We are emitting a single page at a time
+    // while tracking read progress so we can resume if an error has occurred and we still have
+    // attempt budget available.
+    "type.argument.type.incompatible"
+  })
+  private abstract static class PaginatedFirestoreV1ReadFn<
+          RequestT extends Message,
+          PagedResponseT extends AbstractPagedListResponse<RequestT, ResponseT, ?, PageT, ?>,
+          PageT extends AbstractPage<RequestT, ResponseT, ?, PageT>,
+          ResponseT extends Message>
+      extends BaseFirestoreV1ReadFn<RequestT, ResponseT> {
+
+    protected PaginatedFirestoreV1ReadFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    protected abstract UnaryCallable<RequestT, PagedResponseT> getCallable(
+        FirestoreStub firestoreStub);
+
+    protected abstract RequestT setPageToken(RequestT request, String nextPageToken);
+
+    @Override
+    public final void processElement(ProcessContext c) throws Exception {
+      @SuppressWarnings(
+          "nullness") // for some reason requireNonNull thinks its parameter must be non-null...
+      final RequestT element = requireNonNull(c.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      String nextPageToken = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          RequestT request = nextPageToken == null ? element : setPageToken(element, nextPageToken);
+          attempt.recordRequestStart(clock.instant());
+          PagedResponseT pagedResponse = getCallable(firestoreStub).call(request);
+          for (PageT page : pagedResponse.iteratePages()) {
+            ResponseT response = page.getResponse();
+            attempt.recordRequestSuccessful(clock.instant());
+            c.output(response);
+            if (page.hasNextPage()) {
+              nextPageToken = page.getNextPageToken();
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+    }
+  }
+
+  /**
+   * Base class for all {@link org.apache.beam.sdk.transforms.DoFn DoFn}s which provide access to
+   * RPCs from the Cloud Firestore V1 API.
+   *
+   * <p>This class takes care of common lifecycle elements and transient state management for
+   * subclasses allowing subclasses to provide the minimal implementation for {@link
+   * ImplicitlyWindowedFirestoreDoFn#processElement(DoFn.ProcessContext)}
+   *
+   * @param <InT> The type of element coming into this {@link DoFn}
+   * @param <OutT> The type of element output from this {@link DoFn}
+   */
+  abstract static class BaseFirestoreV1ReadFn<InT, OutT>
+      extends ImplicitlyWindowedFirestoreDoFn<InT, OutT> implements HasRpcAttemptContext {
+
+    protected final JodaClock clock;
+    protected final FirestoreStatefulComponentFactory firestoreStatefulComponentFactory;
+    protected final RpcQosOptions rpcQosOptions;
+
+    // transient running state information, not important to any possible checkpointing
+    protected transient FirestoreStub firestoreStub;
+    protected transient RpcQos rpcQos;
+    protected transient String projectId;
+
+    @SuppressWarnings(
+        "initialization.fields.uninitialized") // allow transient fields to be managed by component
+    // lifecycle
+    protected BaseFirestoreV1ReadFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      this.clock = requireNonNull(clock, "clock must be non null");
+      this.firestoreStatefulComponentFactory =
+          requireNonNull(firestoreStatefulComponentFactory, "firestoreFactory must be non null");
+      this.rpcQosOptions = requireNonNull(rpcQosOptions, "rpcQosOptions must be non null");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void setup() {
+      rpcQos = firestoreStatefulComponentFactory.getRpcQos(rpcQosOptions);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public final void startBundle(StartBundleContext c) {
+      String project = c.getPipelineOptions().as(GcpOptions.class).getProject();
+      projectId =
+          requireNonNull(project, "project must be defined on GcpOptions of PipelineOptions");
+      firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("nullness") // allow clearing transient fields
+    @Override
+    public void finishBundle() throws Exception {
+      projectId = null;
+      firestoreStub.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public final void populateDisplayData(DisplayData.Builder builder) {
+      builder.include("rpcQosOptions", rpcQosOptions);
+    }
+  }
+
+  /**
+   * Tuple class for a PartitionQuery Request and Response Pair.
+   *
+   * <p>When processing the response of a PartitionQuery it is only useful in the context of the
+   * original request as the cursors from the response are tied to the index resolved from the
+   * request. This class ties these two together so that they can be passed along the pipeline
+   * together.
+   */
+  static final class PartitionQueryPair implements Serializable {
+    private final PartitionQueryRequest request;
+    private final PartitionQueryResponse response;
+
+    @VisibleForTesting
+    PartitionQueryPair(PartitionQueryRequest request, PartitionQueryResponse response) {
+      this.request = request;
+      this.response = response;
+    }
+
+    public PartitionQueryRequest getRequest() {
+      return request;
+    }
+
+    public PartitionQueryResponse getResponse() {
+      return response;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof PartitionQueryPair)) {
+        return false;
+      }
+      PartitionQueryPair that = (PartitionQueryPair) o;
+      return request.equals(that.request) && response.equals(that.response);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(request, response);
+    }
+
+    @Override
+    public String toString() {
+      return "PartitionQueryPair{" + "request=" + request + ", response=" + response + '}';
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1Fn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1RpcAttemptContexts.java
similarity index 92%
rename from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1Fn.java
rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1RpcAttemptContexts.java
index 81c3f21..0a75eeb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1Fn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1RpcAttemptContexts.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.io.gcp.firestore;
 
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
 
-final class FirestoreV1Fn {
+final class FirestoreV1RpcAttemptContexts {
 
   /**
    * The base namespace used for {@link Context RpcAttempt.Context} values in {@link
@@ -42,8 +42,12 @@ final class FirestoreV1Fn {
    * cycle.
    */
   enum V1FnRpcAttemptContext implements Context {
+    BatchGetDocuments(),
     BatchWrite(),
-    ;
+    ListCollectionIds(),
+    ListDocuments(),
+    PartitionQuery(),
+    RunQuery();
 
     private final String namespace;
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
index e96d490..bbf4060 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
@@ -36,12 +36,12 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.WindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.ExplicitlyWindowedFirestoreDoFn;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.FailedWritesException;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.WriteFailure;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.WriteSuccessSummary;
-import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
-import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.V1FnRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
@@ -137,10 +137,11 @@ final class FirestoreV1WriteFn {
    * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
    * the lifecycle of this Fn.
    */
-  abstract static class BaseBatchWriteFn<OutT> extends WindowAwareDoFn<Write, OutT>
+  abstract static class BaseBatchWriteFn<OutT> extends ExplicitlyWindowedFirestoreDoFn<Write, OutT>
       implements HasRpcAttemptContext {
     private static final Logger LOG =
-        LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+        LoggerFactory.getLogger(
+            FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite.getNamespace());
     private final JodaClock clock;
     private final FirestoreStatefulComponentFactory firestoreStatefulComponentFactory;
     private final RpcQosOptions rpcQosOptions;
@@ -378,7 +379,7 @@ final class FirestoreV1WriteFn {
           attempt.recordRequestFailed(end);
           attempt.recordWriteCounts(end, 0, writesCount);
           flushBuffer.forEach(writes::offer);
-          attempt.checkCanRetry(exception);
+          attempt.checkCanRetry(end, exception);
           continue;
         }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java
index cafb551..dca12db 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java
@@ -48,7 +48,8 @@ import org.joda.time.Instant;
  *   <li>Record start of trying to send request
  *   <li>Record success or failure state of send request attempt
  *   <li>If success output all returned responses
- *   <li>If failure check retry ability ({@link RpcAttempt#checkCanRetry(RuntimeException)})
+ *   <li>If failure check retry ability ({@link RpcAttempt#checkCanRetry(Instant,
+ *       RuntimeException)})
  *       <ol style="margin-top: 0">
  *         <li>Ensure the request has budget to retry ({@link RpcQosOptions#getMaxAttempts()})
  *         <li>Ensure the error is not a non-retryable error
@@ -125,7 +126,7 @@ interface RpcQos {
     boolean awaitSafeToProceed(Instant start) throws InterruptedException;
 
     /**
-     * Determine if an rpc can be retried given {@code exception}.
+     * Determine if an rpc can be retried given {@code instant} and {@code exception}.
      *
      * <p>If a backoff is necessary before retrying this method can block for backoff before
      * returning.
@@ -133,10 +134,11 @@ interface RpcQos {
      * <p>If no retry is available this {@link RpcAttempt} will move to a terminal failed state and
      * will error if further interaction is attempted.
      *
+     * @param instant The instant with which to evaluate retryability
      * @param exception Exception to evaluate for retry ability
      * @throws InterruptedException if this thread is interrupted while waiting
      */
-    void checkCanRetry(RuntimeException exception) throws InterruptedException;
+    void checkCanRetry(Instant instant, RuntimeException exception) throws InterruptedException;
 
     /**
      * Mark this {@link RpcAttempt} as having completed successfully, moving to a terminal success
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
index 9d59a94..f91f18c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
@@ -21,29 +21,33 @@ import com.google.api.gax.grpc.GrpcStatusCode;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.StatusCode;
 import com.google.rpc.Code;
-import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.StatusCodeAwareBackoff.BackoffDuration;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.StatusCodeAwareBackoff.BackoffResult;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.StatusCodeAwareBackoff.BackoffResults;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.BackOff;
-import org.apache.beam.sdk.util.BackOffUtils;
-import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.MovingFunction;
 import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.Interval;
@@ -79,7 +83,6 @@ final class RpcQosImpl implements RpcQos {
   private final AdaptiveThrottler at;
   private final WriteBatcher wb;
   private final WriteRampUp writeRampUp;
-  private final FluentBackoff fb;
 
   private final WeakHashMap<Context, O11y> counters;
   private final Random random;
@@ -115,11 +118,6 @@ final class RpcQosImpl implements RpcQos {
             filteringDistributionFactory);
     writeRampUp =
         new WriteRampUp(500.0 / options.getHintMaxNumWorkers(), filteringDistributionFactory);
-    // maxRetries is an inclusive value, we want exclusive since we are tracking all attempts
-    fb =
-        FluentBackoff.DEFAULT
-            .withMaxRetries(options.getMaxAttempts() - 1)
-            .withInitialBackoff(options.getInitialBackoff());
     counters = new WeakHashMap<>();
     computeCounters = (Context c) -> O11y.create(c, counterFactory, filteringDistributionFactory);
   }
@@ -127,13 +125,36 @@ final class RpcQosImpl implements RpcQos {
   @Override
   public RpcWriteAttemptImpl newWriteAttempt(Context context) {
     return new RpcWriteAttemptImpl(
-        context, counters.computeIfAbsent(context, computeCounters), fb.backoff(), sleeper);
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        new StatusCodeAwareBackoff(
+            random,
+            options.getMaxAttempts(),
+            options.getThrottleDuration(),
+            Collections.emptySet()),
+        sleeper);
   }
 
   @Override
   public RpcReadAttemptImpl newReadAttempt(Context context) {
+    Set<Integer> graceStatusCodeNumbers = Collections.emptySet();
+    // When reading results from a RunQuery or BatchGet the stream returning the results has a
+    //   maximum lifetime of 60 seconds at which point it will be broken with an UNAVAILABLE
+    //   status code. Since this is expected for semi-large query result set sizes we specify
+    //   it as a grace value for backoff evaluation.
+    if (V1FnRpcAttemptContext.RunQuery.equals(context)
+        || V1FnRpcAttemptContext.BatchGetDocuments.equals(context)) {
+      graceStatusCodeNumbers = ImmutableSet.of(Code.UNAVAILABLE_VALUE);
+    }
     return new RpcReadAttemptImpl(
-        context, counters.computeIfAbsent(context, computeCounters), fb.backoff(), sleeper);
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        new StatusCodeAwareBackoff(
+            random,
+            options.getMaxAttempts(),
+            options.getThrottleDuration(),
+            graceStatusCodeNumbers),
+        sleeper);
   }
 
   @Override
@@ -187,7 +208,7 @@ final class RpcQosImpl implements RpcQos {
   private abstract class BaseRpcAttempt implements RpcAttempt {
     private final Logger logger;
     final O11y o11y;
-    final BackOff backoff;
+    final StatusCodeAwareBackoff backoff;
     final Sleeper sleeper;
 
     AttemptState state;
@@ -196,7 +217,7 @@ final class RpcQosImpl implements RpcQos {
     @SuppressWarnings(
         "initialization.fields.uninitialized") // allow transient fields to be managed by component
     // lifecycle
-    BaseRpcAttempt(Context context, O11y o11y, BackOff backoff, Sleeper sleeper) {
+    BaseRpcAttempt(Context context, O11y o11y, StatusCodeAwareBackoff backoff, Sleeper sleeper) {
       this.logger = LoggerFactory.getLogger(String.format("%s.RpcQos", context.getNamespace()));
       this.o11y = o11y;
       this.backoff = backoff;
@@ -219,7 +240,8 @@ final class RpcQosImpl implements RpcQos {
     }
 
     @Override
-    public void checkCanRetry(RuntimeException exception) throws InterruptedException {
+    public void checkCanRetry(Instant instant, RuntimeException exception)
+        throws InterruptedException {
       state.checkActive();
 
       Optional<ApiException> findApiException = findApiException(exception);
@@ -229,10 +251,9 @@ final class RpcQosImpl implements RpcQos {
         // order here is semi-important
         // First we always want to test if the error code is one of the codes we have deemed
         // non-retryable before delegating to the exceptions default set.
-        if (maxAttemptsExhausted()
-            || getStatusCodeNumber(apiException)
-                .map(NON_RETRYABLE_ERROR_NUMBERS::contains)
-                .orElse(false)
+        Optional<Integer> statusCodeNumber = getStatusCodeNumber(apiException);
+        if (maxAttemptsExhausted(instant, statusCodeNumber.orElse(Code.UNKNOWN_VALUE))
+            || statusCodeNumber.map(NON_RETRYABLE_ERROR_NUMBERS::contains).orElse(false)
             || !apiException.isRetryable()) {
           state = AttemptState.COMPLETE_ERROR;
           throw apiException;
@@ -259,7 +280,7 @@ final class RpcQosImpl implements RpcQos {
       state.checkStarted();
       o11y.rpcSuccesses.inc();
       o11y.rpcDurationMs.update(durationMs(end));
-      at.recordSuccessfulRequest(start);
+      at.recordRequestSuccessful(start);
     }
 
     @Override
@@ -267,21 +288,21 @@ final class RpcQosImpl implements RpcQos {
       state.checkStarted();
       o11y.rpcFailures.inc();
       o11y.rpcDurationMs.update(durationMs(end));
-      at.recordFailedRequest(start);
+      at.recordRequestFailed(start);
     }
 
-    private boolean maxAttemptsExhausted() throws InterruptedException {
-      try {
-        boolean exhausted = !BackOffUtils.next(sleeper, backoff);
-        if (exhausted) {
-          logger.error("Max attempts exhausted after {} attempts.", options.getMaxAttempts());
-        }
-        return exhausted;
-      } catch (IOException e) {
-        // We are using FluentBackoff which does not ever throw an IOException from its methods
-        // Catch and wrap any potential IOException as a RuntimeException since it won't ever
-        // happen unless the implementation of FluentBackoff changes.
-        throw new RuntimeException(e);
+    private boolean maxAttemptsExhausted(Instant now, int statusCodeNumber)
+        throws InterruptedException {
+      BackoffResult backoffResult = backoff.nextBackoff(now, statusCodeNumber);
+      if (BackoffResults.EXHAUSTED.equals(backoffResult)) {
+        logger.error("Max attempts exhausted after {} attempts.", options.getMaxAttempts());
+        return true;
+      } else if (backoffResult instanceof BackoffDuration) {
+        BackoffDuration result = (BackoffDuration) backoffResult;
+        sleeper.sleep(result.getDuration().getMillis());
+        return false;
+      } else {
+        return false;
       }
     }
 
@@ -323,13 +344,14 @@ final class RpcQosImpl implements RpcQos {
   }
 
   private final class RpcReadAttemptImpl extends BaseRpcAttempt implements RpcReadAttempt {
-    private RpcReadAttemptImpl(Context context, O11y o11y, BackOff backoff, Sleeper sleeper) {
+    private RpcReadAttemptImpl(
+        Context context, O11y o11y, StatusCodeAwareBackoff backoff, Sleeper sleeper) {
       super(context, o11y, backoff, sleeper);
     }
 
     @Override
     public void recordRequestStart(Instant start) {
-      at.recordStartRequest(start);
+      at.recordRequestStart(start);
       this.start = start;
       state = AttemptState.STARTED;
     }
@@ -343,7 +365,8 @@ final class RpcQosImpl implements RpcQos {
 
   final class RpcWriteAttemptImpl extends BaseRpcAttempt implements RpcWriteAttempt {
 
-    private RpcWriteAttemptImpl(Context context, O11y o11y, BackOff backoff, Sleeper sleeper) {
+    private RpcWriteAttemptImpl(
+        Context context, O11y o11y, StatusCodeAwareBackoff backoff, Sleeper sleeper) {
       super(context, o11y, backoff, sleeper);
     }
 
@@ -379,7 +402,7 @@ final class RpcQosImpl implements RpcQos {
 
     @Override
     public void recordRequestStart(Instant start, int numWrites) {
-      at.recordStartRequest(start, numWrites);
+      at.recordRequestStart(start, numWrites);
       writeRampUp.recordWriteCount(start, numWrites);
       this.start = start;
       state = AttemptState.STARTED;
@@ -391,10 +414,10 @@ final class RpcQosImpl implements RpcQos {
       state.checkStarted();
       wb.recordRequestLatency(start, end, totalWrites, o11y.latencyPerDocumentMs);
       if (successfulWrites > 0) {
-        at.recordSuccessfulRequest(start, successfulWrites);
+        at.recordRequestSuccessful(start, successfulWrites);
       }
       if (failedWrites > 0) {
-        at.recordFailedRequest(start, failedWrites);
+        at.recordRequestFailed(start, failedWrites);
       }
     }
   }
@@ -511,27 +534,27 @@ final class RpcQosImpl implements RpcQos {
       }
     }
 
-    private void recordStartRequest(Instant instantSinceEpoch) {
-      recordStartRequest(instantSinceEpoch, 1);
+    private void recordRequestStart(Instant instantSinceEpoch) {
+      recordRequestStart(instantSinceEpoch, 1);
     }
 
-    private void recordStartRequest(Instant instantSinceEpoch, int value) {
+    private void recordRequestStart(Instant instantSinceEpoch, int value) {
       allRequestsMovingFunction.add(instantSinceEpoch.getMillis(), value);
     }
 
-    private void recordSuccessfulRequest(Instant instantSinceEpoch) {
-      recordSuccessfulRequest(instantSinceEpoch, 1);
+    private void recordRequestSuccessful(Instant instantSinceEpoch) {
+      recordRequestSuccessful(instantSinceEpoch, 1);
     }
 
-    private void recordSuccessfulRequest(Instant instantSinceEpoch, int value) {
+    private void recordRequestSuccessful(Instant instantSinceEpoch, int value) {
       successfulRequestsMovingFunction.add(instantSinceEpoch.getMillis(), value);
     }
 
-    private void recordFailedRequest(Instant instantSinceEpoch) {
-      recordFailedRequest(instantSinceEpoch, 1);
+    private void recordRequestFailed(Instant instantSinceEpoch) {
+      recordRequestFailed(instantSinceEpoch, 1);
     }
 
-    private void recordFailedRequest(Instant instantSinceEpoch, int value) {
+    private void recordRequestFailed(Instant instantSinceEpoch, int value) {
       failedRequestsMovingFunction.add(instantSinceEpoch.getMillis(), value);
     }
 
@@ -700,6 +723,142 @@ final class RpcQosImpl implements RpcQos {
     }
   }
 
+  /**
+   * This class implements a backoff algorithm similar to that of {@link
+   * org.apache.beam.sdk.util.FluentBackoff} with some key differences:
+   *
+   * <ol>
+   *   <li>A set of status code numbers may be specified to have a graceful evaluation
+   *   <li>Gracefully evaluated status code numbers will increment a decaying counter to ensure if
+   *       the graceful status code numbers occur more than once in the previous 60 seconds the
+   *       regular backoff behavior will kick in.
+   *   <li>The random number generator used to induce jitter is provided via constructor parameter
+ *       rather than using {@link Math#random()}
+   * </ol>
+   *
+   * The primary motivation for creating this implementation is to support streamed responses from
+   * Firestore. In the case of RunQuery and BatchGet the results are returned via stream. The result
+   * stream has a maximum lifetime of 60 seconds before it will be broken and an UNAVAILABLE status
+ * code will be raised. Given that UNAVAILABLE is expected for streams, this class allows for
+   * defining a set of status code numbers which are given a grace count of 1 before backoff kicks
+   * in. When backoff does kick in, it is implemented using the same calculations as {@link
+   * org.apache.beam.sdk.util.FluentBackoff}.
+   */
+  static final class StatusCodeAwareBackoff {
+    private static final double RANDOMIZATION_FACTOR = 0.5;
+    private static final Duration MAX_BACKOFF = Duration.standardMinutes(1);
+    private static final Duration MAX_CUMULATIVE_BACKOFF = Duration.standardMinutes(1);
+
+    private final Random rand;
+    private final int maxAttempts;
+    private final Duration initialBackoff;
+    private final Set<Integer> graceStatusCodeNumbers;
+    private final MovingFunction graceStatusCodeTracker;
+
+    private Duration cumulativeBackoff;
+    private int attempt;
+
+    StatusCodeAwareBackoff(
+        Random rand,
+        int maxAttempts,
+        Duration throttleDuration,
+        Set<Integer> graceStatusCodeNumbers) {
+      this.rand = rand;
+      this.graceStatusCodeNumbers = graceStatusCodeNumbers;
+      this.maxAttempts = maxAttempts;
+      this.initialBackoff = throttleDuration;
+      this.graceStatusCodeTracker = createGraceStatusCodeTracker();
+      this.cumulativeBackoff = Duration.ZERO;
+      this.attempt = 1;
+    }
+
+    BackoffResult nextBackoff(Instant now, int statusCodeNumber) {
+      if (graceStatusCodeNumbers.contains(statusCodeNumber)) {
+        long nowMillis = now.getMillis();
+        long numGraceStatusCode = graceStatusCodeTracker.get(nowMillis);
+        graceStatusCodeTracker.add(nowMillis, 1);
+        if (numGraceStatusCode < 1) {
+          return BackoffResults.NONE;
+        } else {
+          return doBackoff();
+        }
+      } else {
+        return doBackoff();
+      }
+    }
+
+    private BackoffResult doBackoff() {
+      // Maximum number of retries reached.
+      if (attempt >= maxAttempts) {
+        return BackoffResults.EXHAUSTED;
+      }
+      // Maximum cumulative backoff reached.
+      if (cumulativeBackoff.compareTo(MAX_CUMULATIVE_BACKOFF) >= 0) {
+        return BackoffResults.EXHAUSTED;
+      }
+
+      double currentIntervalMillis =
+          Math.min(
+              initialBackoff.getMillis() * Math.pow(1.5, attempt - 1), MAX_BACKOFF.getMillis());
+      double randomOffset =
+          (rand.nextDouble() * 2 - 1) * RANDOMIZATION_FACTOR * currentIntervalMillis;
+      long nextBackoffMillis = Math.round(currentIntervalMillis + randomOffset);
+      // Cap to limit on cumulative backoff
+      Duration remainingCumulative = MAX_CUMULATIVE_BACKOFF.minus(cumulativeBackoff);
+      nextBackoffMillis = Math.min(nextBackoffMillis, remainingCumulative.getMillis());
+
+      // Update state and return backoff.
+      cumulativeBackoff = cumulativeBackoff.plus(nextBackoffMillis);
+      attempt += 1;
+      return new BackoffDuration(Duration.millis(nextBackoffMillis));
+    }
+
+    private static MovingFunction createGraceStatusCodeTracker() {
+      return createMovingFunction(Duration.standardMinutes(1), Duration.millis(500));
+    }
+
+    interface BackoffResult {}
+
+    enum BackoffResults implements BackoffResult {
+      EXHAUSTED,
+      NONE
+    }
+
+    static final class BackoffDuration implements BackoffResult {
+      private final Duration duration;
+
+      BackoffDuration(Duration duration) {
+        this.duration = duration;
+      }
+
+      Duration getDuration() {
+        return duration;
+      }
+
+      @Override
+      public boolean equals(@Nullable Object o) {
+        if (this == o) {
+          return true;
+        }
+        if (!(o instanceof BackoffDuration)) {
+          return false;
+        }
+        BackoffDuration that = (BackoffDuration) o;
+        return Objects.equals(duration, that.duration);
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hash(duration);
+      }
+
+      @Override
+      public String toString() {
+        return "BackoffDuration{" + "duration=" + duration + '}';
+      }
+    }
+  }
+
   private static class MovingAverage {
     private final MovingFunction sum;
     private final MovingFunction count;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosOptions.java
index 3582151..f228bea 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosOptions.java
@@ -149,7 +149,7 @@ public final class RpcQosOptions implements Serializable, HasDisplayData {
    *
    * <p><i>Default Value:</i> 5 sec
    *
-   * @see RpcQosOptions#getInitialBackoff()
+   * @see RpcQosOptions.Builder#withInitialBackoff(Duration)
    */
   public Duration getInitialBackoff() {
     return initialBackoff;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1FnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1FnTest.java
index 78ba534..39e0db3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1FnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1FnTest.java
@@ -26,8 +26,8 @@ import com.google.api.gax.rpc.ApiExceptionFactory;
 import com.google.cloud.firestore.v1.stub.FirestoreStub;
 import io.grpc.Status.Code;
 import java.net.SocketTimeoutException;
-import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
-import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.V1FnRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -62,11 +62,36 @@ abstract class BaseFirestoreV1FnTest<
     if (rpcAttemptContext instanceof V1FnRpcAttemptContext) {
       V1FnRpcAttemptContext v1FnRpcAttemptContext = (V1FnRpcAttemptContext) rpcAttemptContext;
       switch (v1FnRpcAttemptContext) {
+        case BatchGetDocuments:
+          assertEquals(
+              "org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.BatchGetDocuments",
+              v1FnRpcAttemptContext.getNamespace());
+          break;
         case BatchWrite:
           assertEquals(
               "org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.BatchWrite",
               v1FnRpcAttemptContext.getNamespace());
           break;
+        case ListCollectionIds:
+          assertEquals(
+              "org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListCollectionIds",
+              v1FnRpcAttemptContext.getNamespace());
+          break;
+        case ListDocuments:
+          assertEquals(
+              "org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.ListDocuments",
+              v1FnRpcAttemptContext.getNamespace());
+          break;
+        case PartitionQuery:
+          assertEquals(
+              "org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.PartitionQuery",
+              v1FnRpcAttemptContext.getNamespace());
+          break;
+        case RunQuery:
+          assertEquals(
+              "org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.RunQuery",
+              v1FnRpcAttemptContext.getNamespace());
+          break;
         default:
           fail("Unverified V1FnRpcAttemptContext value");
       }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java
new file mode 100644
index 0000000..0aab59d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.BaseFirestoreV1ReadFn;
+import org.junit.Test;
+import org.mockito.Mock;
+
+@SuppressWarnings(
+    "initialization.fields.uninitialized") // mockito fields are initialized via the Mockito Runner
+abstract class BaseFirestoreV1ReadFnTest<InT, OutT>
+    extends BaseFirestoreV1FnTest<InT, OutT, BaseFirestoreV1ReadFn<InT, OutT>> {
+
+  @Mock protected RpcQos.RpcReadAttempt attempt;
+
+  @Override
+  @Test
+  public final void attemptsExhaustedForRetryableError() throws Exception {
+    BaseFirestoreV1ReadFn<InT, OutT> fn = getFn(clock, ff, rpcQosOptions);
+    V1RpcFnTestCtx ctx = newCtx();
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt);
+    ctx.mockRpcToCallable(stub);
+
+    when(attempt.awaitSafeToProceed(any())).thenReturn(true, true, true);
+    ctx.whenCallableCall(any(), RETRYABLE_ERROR, RETRYABLE_ERROR, RETRYABLE_ERROR);
+    doNothing().when(attempt).recordRequestFailed(any());
+    doNothing()
+        .doNothing()
+        .doThrow(RETRYABLE_ERROR)
+        .when(attempt)
+        .checkCanRetry(any(), eq(RETRYABLE_ERROR));
+
+    when(processContext.element()).thenReturn(ctx.getRequest());
+
+    try {
+      runFunction(fn);
+      fail("Expected ApiException to be thrown after exhausted attempts");
+    } catch (ApiException e) {
+      assertSame(RETRYABLE_ERROR, e);
+    }
+
+    verify(attempt, times(3)).awaitSafeToProceed(any());
+    verify(attempt, times(3)).recordRequestFailed(any());
+    verify(attempt, times(0)).recordStreamValue(any());
+    verify(attempt, times(0)).recordRequestSuccessful(any());
+  }
+
+  @Override
+  @Test
+  public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
+    BaseFirestoreV1ReadFn<InT, OutT> fn = getFn(clock, ff, rpcQosOptions);
+    V1RpcFnTestCtx ctx = newCtx();
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt);
+
+    InterruptedException interruptedException = new InterruptedException();
+    when(attempt.awaitSafeToProceed(any())).thenReturn(false).thenThrow(interruptedException);
+
+    when(processContext.element()).thenReturn(ctx.getRequest());
+
+    try {
+      runFunction(fn);
+      fail("Expected InterruptedException to be thrown when it is not safe to proceed");
+    } catch (InterruptedException e) {
+      assertSame(interruptedException, e);
+    }
+
+    verify(stub, times(1)).close();
+    verifyNoMoreInteractions(stub);
+    ctx.verifyNoInteractionsWithCallable();
+    verify(attempt, times(0)).recordRequestFailed(any());
+    verify(attempt, times(0)).recordStreamValue(any());
+    verify(attempt, times(0)).recordRequestSuccessful(any());
+  }
+
+  @Test
+  public abstract void resumeFromLastReadValue() throws Exception;
+
+  protected abstract V1RpcFnTestCtx newCtx();
+
+  @Override
+  protected final BaseFirestoreV1ReadFn<InT, OutT> getFn() {
+    return getFn(JodaClock.DEFAULT, FirestoreStatefulComponentFactory.INSTANCE, rpcQosOptions);
+  }
+
+  protected abstract BaseFirestoreV1ReadFn<InT, OutT> getFn(
+      JodaClock clock,
+      FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+      RpcQosOptions rpcQosOptions);
+
+  @Override
+  protected void processElementsAndFinishBundle(
+      BaseFirestoreV1ReadFn<InT, OutT> fn, int processElementCount) throws Exception {
+    try {
+      for (int i = 0; i < processElementCount; i++) {
+        fn.processElement(processContext);
+      }
+    } finally {
+      fn.finishBundle();
+    }
+  }
+
+  /**
+   * This class exists due to the fact that there is not a common parent interface in gax for each
+   * of the types of Callables that are output upon code generation.
+   */
+  protected abstract class V1RpcFnTestCtx {
+
+    protected V1RpcFnTestCtx() {}
+
+    public abstract InT getRequest();
+
+    public abstract void mockRpcToCallable(FirestoreStub stub);
+
+    public abstract void whenCallableCall(InT in, Throwable... throwables);
+
+    public abstract void verifyNoInteractionsWithCallable();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
index a68686e..727fdc0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
@@ -59,7 +59,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
-import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.HasRpcAttemptContext;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.WriteElement;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt;
@@ -131,7 +131,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
 
     when(ff.getFirestoreStub(any())).thenReturn(stub);
     when(ff.getRpcQos(any())).thenReturn(rpcQos);
-    when(rpcQos.newWriteAttempt(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite))
+    when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
         .thenReturn(attempt);
     when(stub.batchWriteCallable()).thenReturn(callable);
 
@@ -145,7 +145,11 @@ public abstract class BaseFirestoreV1WriteFnTest<
 
     when(callable.call(any())).thenThrow(RETRYABLE_ERROR, RETRYABLE_ERROR, RETRYABLE_ERROR);
     doNothing().when(attempt).recordWriteCounts(any(), anyInt(), anyInt());
-    doNothing().doNothing().doThrow(RETRYABLE_ERROR).when(attempt).checkCanRetry(RETRYABLE_ERROR);
+    doNothing()
+        .doNothing()
+        .doThrow(RETRYABLE_ERROR)
+        .when(attempt)
+        .checkCanRetry(any(), eq(RETRYABLE_ERROR));
 
     when(processContext.element()).thenReturn(write);
 
@@ -173,7 +177,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
   public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
     when(ff.getFirestoreStub(any())).thenReturn(stub);
     when(ff.getRpcQos(any())).thenReturn(rpcQos);
-    when(rpcQos.newWriteAttempt(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite))
+    when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
         .thenReturn(attempt);
 
     InterruptedException interruptedException = new InterruptedException();
@@ -233,7 +237,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
     verify(attempt, times(1)).recordRequestStart(rpcStart, 1);
     verify(attempt, times(1)).recordWriteCounts(rpcEnd, 1, 0);
     verify(attempt, never()).recordWriteCounts(any(), anyInt(), gt(0));
-    verify(attempt, never()).checkCanRetry(any());
+    verify(attempt, never()).checkCanRetry(any(), any());
   }
 
   @Test
@@ -269,9 +273,9 @@ public abstract class BaseFirestoreV1WriteFnTest<
     when(flushBuffer.iterator()).thenReturn(newArrayList(element1).iterator());
     when(flushBuffer.getBufferedElementsCount()).thenReturn(1);
     when(callable.call(any())).thenThrow(err1, err2, err3);
-    doNothing().when(attempt).checkCanRetry(err1);
-    doNothing().when(attempt).checkCanRetry(err2);
-    doThrow(err3).when(attempt).checkCanRetry(err3);
+    doNothing().when(attempt).checkCanRetry(any(), eq(err1));
+    doNothing().when(attempt).checkCanRetry(any(), eq(err2));
+    doThrow(err3).when(attempt).checkCanRetry(any(), eq(err3));
 
     try {
       FnT fn = getFn(clock, ff, rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
@@ -344,7 +348,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
     verify(finishBundleAttempt, times(1)).recordRequestStart(rpc1Start, 1);
     verify(finishBundleAttempt, times(1)).recordWriteCounts(rpc1End, 1, 0);
     verify(finishBundleAttempt, times(1)).completeSuccess();
-    verify(finishBundleAttempt, never()).checkCanRetry(any());
+    verify(finishBundleAttempt, never()).checkCanRetry(any(), any());
   }
 
   @Test
@@ -682,7 +686,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
 
     assertEquals(1, fn.writes.size());
     verify(attempt, never()).recordWriteCounts(any(), anyInt(), anyInt());
-    verify(attempt, never()).checkCanRetry(any());
+    verify(attempt, never()).checkCanRetry(any(), any());
     verify(attempt, never()).completeSuccess();
 
     Instant attempt2RpcStart = Instant.ofEpochMilli(2);
@@ -698,7 +702,7 @@ public abstract class BaseFirestoreV1WriteFnTest<
     verify(attempt2, times(1)).recordRequestStart(attempt2RpcStart, 1);
     verify(attempt2, times(1)).recordWriteCounts(attempt2RpcEnd, 1, 0);
     verify(attempt2, never()).recordWriteCounts(any(), anyInt(), gt(0));
-    verify(attempt2, never()).checkCanRetry(any());
+    verify(attempt2, never()).checkCanRetry(any(), any());
     verify(attempt2, times(1)).completeSuccess();
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java
new file mode 100644
index 0000000..b45b99a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.Value;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.BatchGetDocumentsFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+@SuppressWarnings(
+    "initialization.fields.uninitialized") // mockito fields are initialized via the Mockito Runner
+public final class FirestoreV1FnBatchGetDocumentsTest
+    extends BaseFirestoreV1ReadFnTest<BatchGetDocumentsRequest, BatchGetDocumentsResponse> {
+
+  @Mock
+  private ServerStreamingCallable<BatchGetDocumentsRequest, BatchGetDocumentsResponse> callable;
+
+  @Mock private ServerStream<BatchGetDocumentsResponse> responseStream1;
+  @Mock private ServerStream<BatchGetDocumentsResponse> responseStream2;
+  @Mock private ServerStream<BatchGetDocumentsResponse> responseStream3;
+
+  @Test
+  public void endToEnd() throws Exception {
+    final BatchGetDocumentsRequest request =
+        BatchGetDocumentsRequest.newBuilder()
+            .setDatabase(String.format("projects/%s/databases/(default)", projectId))
+            .addDocuments("doc_1-1")
+            .addDocuments("doc_1-2")
+            .addDocuments("doc_1-3")
+            .build();
+
+    final BatchGetDocumentsResponse response1 = newFound(1);
+    final BatchGetDocumentsResponse response2 = newFound(2);
+    final BatchGetDocumentsResponse response3 = newFound(3);
+
+    List<BatchGetDocumentsResponse> responses = ImmutableList.of(response1, response2, response3);
+    when(responseStream1.iterator()).thenReturn(responses.iterator());
+
+    when(callable.call(request)).thenReturn(responseStream1);
+
+    when(stub.batchGetDocumentsCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any()))
+        .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(rpcQosOptions));
+
+    ArgumentCaptor<BatchGetDocumentsResponse> responsesCaptor =
+        ArgumentCaptor.forClass(BatchGetDocumentsResponse.class);
+    doNothing().when(processContext).output(responsesCaptor.capture());
+
+    when(processContext.element()).thenReturn(request);
+
+    runFunction(getFn(clock, ff, rpcQosOptions));
+
+    List<BatchGetDocumentsResponse> allValues = responsesCaptor.getAllValues();
+    assertEquals(responses, allValues);
+  }
+
+  @Override
+  public void resumeFromLastReadValue() throws Exception {
+
+    final BatchGetDocumentsResponse response1 = newMissing(1);
+    final BatchGetDocumentsResponse response2 = newFound(2);
+    final BatchGetDocumentsResponse response3 = newMissing(3);
+    final BatchGetDocumentsResponse response4 = newFound(4);
+
+    final BatchGetDocumentsRequest request1 =
+        BatchGetDocumentsRequest.newBuilder()
+            .setDatabase(String.format("projects/%s/databases/(default)", projectId))
+            .addDocuments(response1.getMissing())
+            .addDocuments(response2.getFound().getName())
+            .addDocuments(response3.getMissing())
+            .addDocuments(response4.getFound().getName())
+            .build();
+
+    BatchGetDocumentsRequest request2 =
+        BatchGetDocumentsRequest.newBuilder()
+            .setDatabase(String.format("projects/%s/databases/(default)", projectId))
+            .addDocuments(response3.getMissing())
+            .addDocuments(response4.getFound().getName())
+            .build();
+    BatchGetDocumentsRequest request3 =
+        BatchGetDocumentsRequest.newBuilder()
+            .setDatabase(String.format("projects/%s/databases/(default)", projectId))
+            .addDocuments(response4.getFound().getName())
+            .build();
+
+    when(responseStream1.iterator())
+        .thenReturn(
+            new AbstractIterator<BatchGetDocumentsResponse>() {
+              private int counter = 10;
+
+              @Override
+              protected BatchGetDocumentsResponse computeNext() {
+                int count = counter++;
+                if (count == 10) {
+                  return response1;
+                } else if (count == 11) {
+                  return response2;
+                } else {
+                  throw RETRYABLE_ERROR;
+                }
+              }
+            });
+    when(responseStream2.iterator())
+        .thenReturn(
+            new AbstractIterator<BatchGetDocumentsResponse>() {
+              private int counter = 20;
+
+              @Override
+              protected BatchGetDocumentsResponse computeNext() {
+                int count = counter++;
+                if (count == 20) {
+                  return response3;
+                } else {
+                  throw RETRYABLE_ERROR;
+                }
+              }
+            });
+    when(responseStream3.iterator()).thenReturn(ImmutableList.of(response4).iterator());
+
+    doNothing().when(attempt).checkCanRetry(any(), eq(RETRYABLE_ERROR));
+    when(callable.call(request1)).thenReturn(responseStream1);
+    when(callable.call(request2)).thenReturn(responseStream2);
+    when(callable.call(request3)).thenReturn(responseStream3);
+
+    when(stub.batchGetDocumentsCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
+    when(attempt.awaitSafeToProceed(any())).thenReturn(true);
+
+    ArgumentCaptor<BatchGetDocumentsResponse> responsesCaptor =
+        ArgumentCaptor.forClass(BatchGetDocumentsResponse.class);
+
+    doNothing().when(processContext).output(responsesCaptor.capture());
+
+    when(processContext.element()).thenReturn(request1);
+
+    BatchGetDocumentsFn fn = new BatchGetDocumentsFn(clock, ff, rpcQosOptions);
+
+    runFunction(fn);
+
+    List<BatchGetDocumentsResponse> expectedResponses =
+        ImmutableList.of(response1, response2, response3, response4);
+    List<BatchGetDocumentsResponse> actualResponses = responsesCaptor.getAllValues();
+    assertEquals(expectedResponses, actualResponses);
+
+    verify(callable, times(1)).call(request1);
+    verify(callable, times(1)).call(request2);
+    verify(attempt, times(4)).recordStreamValue(any());
+  }
+
+  @Override
+  protected V1RpcFnTestCtx newCtx() {
+    return new V1RpcFnTestCtx() {
+      @Override
+      public BatchGetDocumentsRequest getRequest() {
+        return BatchGetDocumentsRequest.newBuilder()
+            .setDatabase(String.format("projects/%s/databases/(default)", projectId))
+            .addDocuments("doc_1-1")
+            .build();
+      }
+
+      @Override
+      public void mockRpcToCallable(FirestoreStub stub) {
+        when(stub.batchGetDocumentsCallable()).thenReturn(callable);
+      }
+
+      @Override
+      public void whenCallableCall(BatchGetDocumentsRequest in, Throwable... throwables) {
+        when(callable.call(in)).thenThrow(throwables);
+      }
+
+      @Override
+      public void verifyNoInteractionsWithCallable() {
+        verifyNoMoreInteractions(callable);
+      }
+    };
+  }
+
+  @Override
+  protected BatchGetDocumentsFn getFn(
+      JodaClock clock,
+      FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+      RpcQosOptions rpcQosOptions) {
+    return new BatchGetDocumentsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+  }
+
+  private static BatchGetDocumentsResponse newFound(int docNumber) {
+    String docName = docName(docNumber);
+    return BatchGetDocumentsResponse.newBuilder()
+        .setFound(
+            Document.newBuilder()
+                .setName(docName)
+                .putAllFields(
+                    ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                .build())
+        .build();
+  }
+
+  private static BatchGetDocumentsResponse newMissing(int docNumber) {
+    String docName = docName(docNumber);
+    return BatchGetDocumentsResponse.newBuilder().setMissing(docName).build();
+  }
+
+  private static String docName(int docNumber) {
+    return String.format("doc-%d", docNumber);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListCollectionIdsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListCollectionIdsTest.java
new file mode 100644
index 0000000..8a02757
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListCollectionIdsTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.ListCollectionIdsFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+@SuppressWarnings(
+    "initialization.fields.uninitialized") // mockito fields are initialized via the Mockito Runner
+public final class FirestoreV1FnListCollectionIdsTest
+    extends BaseFirestoreV1ReadFnTest<ListCollectionIdsRequest, ListCollectionIdsResponse> {
+
+  @Mock private UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> callable;
+  @Mock private ListCollectionIdsPagedResponse pagedResponse1;
+  @Mock private ListCollectionIdsPage page1;
+  @Mock private ListCollectionIdsPagedResponse pagedResponse2;
+  @Mock private ListCollectionIdsPage page2;
+
+  @Test
+  public void endToEnd() throws Exception {
+    // First page of the response
+    ListCollectionIdsRequest request1 =
+        ListCollectionIdsRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+    ListCollectionIdsResponse response1 =
+        ListCollectionIdsResponse.newBuilder()
+            .addCollectionIds("col_1-1")
+            .addCollectionIds("col_1-2")
+            .addCollectionIds("col_1-3")
+            .setNextPageToken("page2")
+            .build();
+    when(page1.getNextPageToken()).thenReturn(response1.getNextPageToken());
+    when(page1.getResponse()).thenReturn(response1);
+    when(page1.hasNextPage()).thenReturn(true);
+
+    // Second page of the response
+    ListCollectionIdsResponse response2 =
+        ListCollectionIdsResponse.newBuilder().addCollectionIds("col_2-1").build();
+    when(page2.getResponse()).thenReturn(response2);
+    when(page2.hasNextPage()).thenReturn(false);
+    when(pagedResponse1.iteratePages()).thenReturn(ImmutableList.of(page1, page2));
+    when(callable.call(request1)).thenReturn(pagedResponse1);
+
+    when(stub.listCollectionIdsPagedCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    RpcQosOptions options = RpcQosOptions.defaultOptions();
+    when(ff.getRpcQos(any()))
+        .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
+
+    ArgumentCaptor<ListCollectionIdsResponse> responses =
+        ArgumentCaptor.forClass(ListCollectionIdsResponse.class);
+
+    doNothing().when(processContext).output(responses.capture());
+
+    when(processContext.element()).thenReturn(request1);
+
+    ListCollectionIdsFn fn = new ListCollectionIdsFn(clock, ff, options);
+
+    runFunction(fn);
+
+    List<ListCollectionIdsResponse> expected = newArrayList(response1, response2);
+    List<ListCollectionIdsResponse> allValues = responses.getAllValues();
+    assertEquals(expected, allValues);
+  }
+
+  @Override
+  public void resumeFromLastReadValue() throws Exception {
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
+    when(attempt.awaitSafeToProceed(any())).thenReturn(true);
+
+    // First page of the response
+    ListCollectionIdsRequest request1 =
+        ListCollectionIdsRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+    ListCollectionIdsResponse response1 =
+        ListCollectionIdsResponse.newBuilder()
+            .addCollectionIds("col_1-1")
+            .addCollectionIds("col_1-2")
+            .addCollectionIds("col_1-3")
+            .setNextPageToken("page2")
+            .build();
+    when(page1.getNextPageToken()).thenReturn(response1.getNextPageToken());
+    when(page1.getResponse()).thenReturn(response1);
+    when(page1.hasNextPage()).thenReturn(true);
+    when(callable.call(request1)).thenReturn(pagedResponse1);
+    doNothing().when(attempt).checkCanRetry(any(), eq(RETRYABLE_ERROR));
+    when(pagedResponse1.iteratePages())
+        .thenAnswer(
+            invocation ->
+                new Iterable<ListCollectionIdsPage>() {
+                  @Override
+                  public Iterator<ListCollectionIdsPage> iterator() {
+                    return new AbstractIterator<ListCollectionIdsPage>() {
+                      private boolean first = true;
+
+                      @Override
+                      protected ListCollectionIdsPage computeNext() {
+                        if (first) {
+                          first = false;
+                          return page1;
+                        } else {
+                          throw RETRYABLE_ERROR;
+                        }
+                      }
+                    };
+                  }
+                });
+
+    // Second page of the response
+    ListCollectionIdsRequest request2 =
+        ListCollectionIdsRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .setPageToken("page2")
+            .build();
+    ListCollectionIdsResponse response2 =
+        ListCollectionIdsResponse.newBuilder().addCollectionIds("col_2-1").build();
+    when(page2.getResponse()).thenReturn(response2);
+    when(page2.hasNextPage()).thenReturn(false);
+    when(callable.call(request2)).thenReturn(pagedResponse2);
+    when(pagedResponse2.iteratePages()).thenReturn(ImmutableList.of(page2));
+
+    when(stub.listCollectionIdsPagedCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+
+    ArgumentCaptor<ListCollectionIdsResponse> responses =
+        ArgumentCaptor.forClass(ListCollectionIdsResponse.class);
+
+    doNothing().when(processContext).output(responses.capture());
+
+    when(processContext.element()).thenReturn(request1);
+
+    ListCollectionIdsFn fn = new ListCollectionIdsFn(clock, ff, rpcQosOptions);
+
+    runFunction(fn);
+
+    List<ListCollectionIdsResponse> expected = newArrayList(response1, response2);
+    List<ListCollectionIdsResponse> allValues = responses.getAllValues();
+    assertEquals(expected, allValues);
+  }
+
+  @Override
+  protected V1RpcFnTestCtx newCtx() {
+    return new V1RpcFnTestCtx() {
+      @Override
+      public ListCollectionIdsRequest getRequest() {
+        return ListCollectionIdsRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+      }
+
+      @Override
+      public void mockRpcToCallable(FirestoreStub stub) {
+        when(stub.listCollectionIdsPagedCallable()).thenReturn(callable);
+      }
+
+      @Override
+      public void whenCallableCall(ListCollectionIdsRequest in, Throwable... throwables) {
+        when(callable.call(in)).thenThrow(throwables);
+      }
+
+      @Override
+      public void verifyNoInteractionsWithCallable() {
+        verifyNoMoreInteractions(callable);
+      }
+    };
+  }
+
+  @Override
+  protected ListCollectionIdsFn getFn(
+      JodaClock clock,
+      FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+      RpcQosOptions rpcQosOptions) {
+    return new ListCollectionIdsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListDocumentsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListDocumentsTest.java
new file mode 100644
index 0000000..c81aae7
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListDocumentsTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.Value;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.ListDocumentsFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+@SuppressWarnings(
+    "initialization.fields.uninitialized") // mockito fields are initialized via the Mockito Runner
+public final class FirestoreV1FnListDocumentsTest
+    extends BaseFirestoreV1ReadFnTest<ListDocumentsRequest, ListDocumentsResponse> {
+
+  @Mock private UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> callable;
+  @Mock private ListDocumentsPagedResponse pagedResponse1;
+  @Mock private ListDocumentsPage page1;
+  @Mock private ListDocumentsPagedResponse pagedResponse2;
+  @Mock private ListDocumentsPage page2;
+
+  @Test
+  public void endToEnd() throws Exception {
+    // First page of the response
+    ListDocumentsRequest request1 =
+        ListDocumentsRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+    ListDocumentsResponse response1 =
+        ListDocumentsResponse.newBuilder()
+            .addDocuments(
+                Document.newBuilder()
+                    .setName("doc_1-1")
+                    .putAllFields(
+                        ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                    .build())
+            .addDocuments(
+                Document.newBuilder()
+                    .setName("doc_1-2")
+                    .putAllFields(
+                        ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                    .build())
+            .addDocuments(
+                Document.newBuilder()
+                    .setName("doc_1-3")
+                    .putAllFields(
+                        ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                    .build())
+            .setNextPageToken("page2")
+            .build();
+    when(page1.getNextPageToken()).thenReturn(response1.getNextPageToken());
+    when(page1.getResponse()).thenReturn(response1);
+    when(page1.hasNextPage()).thenReturn(true);
+
+    // Second page of the response
+    ListDocumentsResponse response2 =
+        ListDocumentsResponse.newBuilder()
+            .addDocuments(
+                Document.newBuilder()
+                    .setName("doc_2-1")
+                    .putAllFields(
+                        ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                    .build())
+            .build();
+    when(page2.getResponse()).thenReturn(response2);
+    when(page2.hasNextPage()).thenReturn(false);
+    when(pagedResponse1.iteratePages()).thenReturn(ImmutableList.of(page1, page2));
+    when(callable.call(request1)).thenReturn(pagedResponse1);
+
+    when(stub.listDocumentsPagedCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    RpcQosOptions options = RpcQosOptions.defaultOptions();
+    when(ff.getRpcQos(any()))
+        .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
+
+    ArgumentCaptor<ListDocumentsResponse> responses =
+        ArgumentCaptor.forClass(ListDocumentsResponse.class);
+
+    doNothing().when(processContext).output(responses.capture());
+
+    when(processContext.element()).thenReturn(request1);
+
+    ListDocumentsFn fn = new ListDocumentsFn(clock, ff, options);
+
+    runFunction(fn);
+
+    List<ListDocumentsResponse> expected = newArrayList(response1, response2);
+    List<ListDocumentsResponse> allValues = responses.getAllValues();
+    assertEquals(expected, allValues);
+  }
+
+  @Override
+  public void resumeFromLastReadValue() throws Exception {
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
+    when(attempt.awaitSafeToProceed(any())).thenReturn(true);
+
+    // First page of the response
+    ListDocumentsRequest request1 =
+        ListDocumentsRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+    ListDocumentsResponse response1 =
+        ListDocumentsResponse.newBuilder()
+            .addDocuments(
+                Document.newBuilder()
+                    .setName("doc_1-1")
+                    .putAllFields(
+                        ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                    .build())
+            .addDocuments(
+                Document.newBuilder()
+                    .setName("doc_1-2")
+                    .putAllFields(
+                        ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                    .build())
+            .addDocuments(
+                Document.newBuilder()
+                    .setName("doc_1-3")
+                    .putAllFields(
+                        ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                    .build())
+            .setNextPageToken("page2")
+            .build();
+    when(page1.getNextPageToken()).thenReturn(response1.getNextPageToken());
+    when(page1.getResponse()).thenReturn(response1);
+    when(page1.hasNextPage()).thenReturn(true);
+    when(callable.call(request1)).thenReturn(pagedResponse1);
+    doNothing().when(attempt).checkCanRetry(any(), eq(RETRYABLE_ERROR));
+    when(pagedResponse1.iteratePages())
+        .thenAnswer(
+            invocation ->
+                new Iterable<ListDocumentsPage>() {
+                  @Override
+                  public Iterator<ListDocumentsPage> iterator() {
+                    return new AbstractIterator<ListDocumentsPage>() {
+                      private boolean first = true;
+
+                      @Override
+                      protected ListDocumentsPage computeNext() {
+                        if (first) {
+                          first = false;
+                          return page1;
+                        } else {
+                          throw RETRYABLE_ERROR;
+                        }
+                      }
+                    };
+                  }
+                });
+
+    // Second page of the response
+    ListDocumentsRequest request2 =
+        ListDocumentsRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .setPageToken("page2")
+            .build();
+    ListDocumentsResponse response2 =
+        ListDocumentsResponse.newBuilder()
+            .addDocuments(
+                Document.newBuilder()
+                    .setName("doc_2-1")
+                    .putAllFields(
+                        ImmutableMap.of("foo", Value.newBuilder().setStringValue("bar").build()))
+                    .build())
+            .build();
+    when(page2.getResponse()).thenReturn(response2);
+    when(page2.hasNextPage()).thenReturn(false);
+    when(callable.call(request2)).thenReturn(pagedResponse2);
+    when(pagedResponse2.iteratePages()).thenReturn(ImmutableList.of(page2));
+
+    when(stub.listDocumentsPagedCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+
+    ArgumentCaptor<ListDocumentsResponse> responses =
+        ArgumentCaptor.forClass(ListDocumentsResponse.class);
+
+    doNothing().when(processContext).output(responses.capture());
+
+    when(processContext.element()).thenReturn(request1);
+
+    ListDocumentsFn fn = new ListDocumentsFn(clock, ff, rpcQosOptions);
+
+    runFunction(fn);
+
+    List<ListDocumentsResponse> expected = newArrayList(response1, response2);
+    List<ListDocumentsResponse> allValues = responses.getAllValues();
+    assertEquals(expected, allValues);
+  }
+
+  @Override
+  protected V1RpcFnTestCtx newCtx() {
+    return new V1RpcFnTestCtx() {
+      @Override
+      public ListDocumentsRequest getRequest() {
+        return ListDocumentsRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+      }
+
+      @Override
+      public void mockRpcToCallable(FirestoreStub stub) {
+        when(stub.listDocumentsPagedCallable()).thenReturn(callable);
+      }
+
+      @Override
+      public void whenCallableCall(ListDocumentsRequest in, Throwable... throwables) {
+        when(callable.call(in)).thenThrow(throwables);
+      }
+
+      @Override
+      public void verifyNoInteractionsWithCallable() {
+        verifyNoMoreInteractions(callable);
+      }
+    };
+  }
+
+  @Override
+  protected ListDocumentsFn getFn(
+      JodaClock clock,
+      FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+      RpcQosOptions rpcQosOptions) {
+    return new ListDocumentsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java
new file mode 100644
index 0000000..0c9bbf1
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.Value;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.PartitionQueryFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.PartitionQueryPair;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+/**
+ * Unit tests for {@link PartitionQueryFn}: verifies the happy path (a single
+ * paged response is emitted as one {@link PartitionQueryPair}) and that a
+ * mid-pagination retryable failure resumes from the recorded page token and
+ * merges the partitions of all pages into a single output pair.
+ */
+@SuppressWarnings(
+    "initialization.fields.uninitialized") // mockito fields are initialized via the Mockito Runner
+public final class FirestoreV1FnPartitionQueryTest
+    extends BaseFirestoreV1ReadFnTest<PartitionQueryRequest, PartitionQueryPair> {
+
+  @Mock private UnaryCallable<PartitionQueryRequest, PartitionQueryPagedResponse> callable;
+  @Mock private PartitionQueryPagedResponse pagedResponse1;
+  @Mock private PartitionQueryPage page1;
+  @Mock private PartitionQueryPagedResponse pagedResponse2;
+  @Mock private PartitionQueryPage page2;
+
+  // Happy path: one request, one page, all partitions delivered in a single
+  // PartitionQueryPair output.
+  @Test
+  public void endToEnd() throws Exception {
+    // First page of the response
+    PartitionQueryRequest request1 =
+        PartitionQueryRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+    PartitionQueryResponse response1 =
+        PartitionQueryResponse.newBuilder()
+            .addPartitions(
+                Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-100")))
+            .addPartitions(
+                Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-200")))
+            .addPartitions(
+                Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-300")))
+            .addPartitions(
+                Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-400")))
+            .build();
+    when(callable.call(request1)).thenReturn(pagedResponse1);
+    when(page1.getResponse()).thenReturn(response1);
+    when(pagedResponse1.iteratePages()).thenReturn(ImmutableList.of(page1));
+
+    when(stub.partitionQueryPagedCallable()).thenReturn(callable);
+
+    // Use real (default) QoS so no retry/backoff behavior interferes with the
+    // happy path.
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    RpcQosOptions options = RpcQosOptions.defaultOptions();
+    when(ff.getRpcQos(any()))
+        .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
+
+    ArgumentCaptor<PartitionQueryPair> responses =
+        ArgumentCaptor.forClass(PartitionQueryPair.class);
+
+    doNothing().when(processContext).output(responses.capture());
+
+    when(processContext.element()).thenReturn(request1);
+
+    PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options);
+
+    runFunction(fn);
+
+    List<PartitionQueryPair> expected = newArrayList(new PartitionQueryPair(request1, response1));
+    List<PartitionQueryPair> allValues = responses.getAllValues();
+    assertEquals(expected, allValues);
+  }
+
+  // Retry path: page1 carries nextPageToken "page2"; iterating past page1
+  // throws RETRYABLE_ERROR, so the fn must retry with a request carrying the
+  // recorded page token and finally emit ONE pair whose response merges the
+  // partitions from both pages (with the page token cleared).
+  @Override
+  public void resumeFromLastReadValue() throws Exception {
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
+    when(attempt.awaitSafeToProceed(any())).thenReturn(true);
+
+    // First page of the response
+    PartitionQueryRequest request1 =
+        PartitionQueryRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+    PartitionQueryResponse response1 =
+        PartitionQueryResponse.newBuilder()
+            .addPartitions(
+                Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-100")))
+            .addPartitions(
+                Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-200")))
+            .addPartitions(
+                Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-300")))
+            .setNextPageToken("page2")
+            .build();
+    when(page1.getResponse()).thenReturn(response1);
+    when(callable.call(request1)).thenReturn(pagedResponse1);
+    doNothing().when(attempt).checkCanRetry(any(), eq(RETRYABLE_ERROR));
+    // Custom Iterable: yields page1 once, then simulates a stream failure by
+    // throwing the retryable error from the iterator itself.
+    when(pagedResponse1.iteratePages())
+        .thenAnswer(
+            invocation ->
+                new Iterable<PartitionQueryPage>() {
+                  @Override
+                  public Iterator<PartitionQueryPage> iterator() {
+                    return new AbstractIterator<PartitionQueryPage>() {
+                      private boolean first = true;
+
+                      @Override
+                      protected PartitionQueryPage computeNext() {
+                        if (first) {
+                          first = false;
+                          return page1;
+                        } else {
+                          throw RETRYABLE_ERROR;
+                        }
+                      }
+                    };
+                  }
+                });
+
+    // Second page of the response
+    PartitionQueryRequest request2 =
+        PartitionQueryRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .setPageToken("page2")
+            .build();
+    PartitionQueryResponse response2 =
+        PartitionQueryResponse.newBuilder()
+            .addPartitions(
+                Cursor.newBuilder().addValues(Value.newBuilder().setReferenceValue("doc-400")))
+            .build();
+
+    // The single emitted pair should contain page1's partitions plus page2's,
+    // with the intermediate nextPageToken removed.
+    PartitionQueryResponse expectedResponse =
+        response1
+            .toBuilder()
+            .clearNextPageToken()
+            .addAllPartitions(response2.getPartitionsList())
+            .build();
+
+    when(page2.getResponse()).thenReturn(response2);
+    when(page2.hasNextPage()).thenReturn(false);
+    when(callable.call(request2)).thenReturn(pagedResponse2);
+    when(pagedResponse2.iteratePages()).thenReturn(ImmutableList.of(page2));
+
+    when(stub.partitionQueryPagedCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+
+    ArgumentCaptor<PartitionQueryPair> responses =
+        ArgumentCaptor.forClass(PartitionQueryPair.class);
+
+    doNothing().when(processContext).output(responses.capture());
+
+    when(processContext.element()).thenReturn(request1);
+
+    PartitionQueryFn fn = new PartitionQueryFn(clock, ff, rpcQosOptions);
+
+    runFunction(fn);
+
+    List<PartitionQueryPair> expected =
+        newArrayList(new PartitionQueryPair(request1, expectedResponse));
+    List<PartitionQueryPair> allValues = responses.getAllValues();
+    assertEquals(expected, allValues);
+  }
+
+  // Shared RPC test context for the base class's generic error/retry cases,
+  // bound to the PartitionQuery RPC.
+  @Override
+  protected V1RpcFnTestCtx newCtx() {
+    return new V1RpcFnTestCtx() {
+      @Override
+      public PartitionQueryRequest getRequest() {
+        return PartitionQueryRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+      }
+
+      @Override
+      public void mockRpcToCallable(FirestoreStub stub) {
+        when(stub.partitionQueryPagedCallable()).thenReturn(callable);
+      }
+
+      @Override
+      public void whenCallableCall(PartitionQueryRequest in, Throwable... throwables) {
+        when(callable.call(in)).thenThrow(throwables);
+      }
+
+      @Override
+      public void verifyNoInteractionsWithCallable() {
+        verifyNoMoreInteractions(callable);
+      }
+    };
+  }
+
+  // Factory used by the base test class to construct the DoFn under test.
+  @Override
+  protected PartitionQueryFn getFn(
+      JodaClock clock,
+      FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+      RpcQosOptions rpcQosOptions) {
+    return new PartitionQueryFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnRunQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnRunQueryTest.java
new file mode 100644
index 0000000..23b18d0
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnRunQueryTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.CollectionSelector;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.RunQueryFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+/**
+ * Unit tests for {@link RunQueryFn}: covers the streaming happy path and two
+ * resume-after-failure scenarios — one where the query declares an explicit
+ * ORDER BY (resume cursor built from the ordered field value) and one with no
+ * ORDER BY (resume cursor built from the document name, with an implicit
+ * {@code __name__} ordering appended to the retried query).
+ */
+@SuppressWarnings(
+    "initialization.fields.uninitialized") // mockito fields are initialized via the Mockito Runner
+public final class FirestoreV1FnRunQueryTest
+    extends BaseFirestoreV1ReadFnTest<RunQueryRequest, RunQueryResponse> {
+
+  @Mock private ServerStreamingCallable<RunQueryRequest, RunQueryResponse> callable;
+  @Mock private ServerStream<RunQueryResponse> responseStream1;
+  @Mock private ServerStream<RunQueryResponse> responseStream2;
+
+  // Happy path: the full server stream is consumed and every response is
+  // emitted, in order.
+  @Test
+  public void endToEnd() throws Exception {
+    TestData testData = TestData.fieldEqualsBar().setProjectId(projectId).build();
+
+    List<RunQueryResponse> responses =
+        ImmutableList.of(testData.response1, testData.response2, testData.response3);
+    when(responseStream1.iterator()).thenReturn(responses.iterator());
+
+    when(callable.call(testData.request)).thenReturn(responseStream1);
+
+    when(stub.runQueryCallable()).thenReturn(callable);
+
+    // Real default QoS: no retry behavior should trigger on the happy path.
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    RpcQosOptions options = RpcQosOptions.defaultOptions();
+    when(ff.getRpcQos(any()))
+        .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
+
+    ArgumentCaptor<RunQueryResponse> responsesCaptor =
+        ArgumentCaptor.forClass(RunQueryResponse.class);
+
+    doNothing().when(processContext).output(responsesCaptor.capture());
+
+    when(processContext.element()).thenReturn(testData.request);
+
+    RunQueryFn fn = new RunQueryFn(clock, ff, options);
+
+    runFunction(fn);
+
+    List<RunQueryResponse> allValues = responsesCaptor.getAllValues();
+    assertEquals(responses, allValues);
+  }
+
+  // Resume path with an explicit ORDER BY: after the stream fails past the
+  // second document, the retried request must carry a startAt cursor holding
+  // the last-read value of the ordered field ("bar") with before=false, so the
+  // retry continues strictly after the last successfully read document.
+  @Override
+  public void resumeFromLastReadValue() throws Exception {
+    TestData testData =
+        TestData.fieldEqualsBar()
+            .setProjectId(projectId)
+            .setOrderFunction(
+                f ->
+                    Collections.singletonList(
+                        Order.newBuilder().setDirection(Direction.ASCENDING).setField(f).build()))
+            .build();
+
+    RunQueryRequest request2 =
+        RunQueryRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .setStructuredQuery(
+                testData
+                    .request
+                    .getStructuredQuery()
+                    .toBuilder()
+                    .setStartAt(
+                        Cursor.newBuilder()
+                            .setBefore(false)
+                            .addValues(Value.newBuilder().setStringValue("bar"))))
+            .build();
+
+    List<RunQueryResponse> responses =
+        ImmutableList.of(testData.response1, testData.response2, testData.response3);
+    // Stream yields two responses then fails with a retryable error to force a
+    // resume.
+    when(responseStream1.iterator())
+        .thenReturn(
+            new AbstractIterator<RunQueryResponse>() {
+              private int invocationCount = 1;
+
+              @Override
+              protected RunQueryResponse computeNext() {
+                int count = invocationCount++;
+                if (count == 1) {
+                  return responses.get(0);
+                } else if (count == 2) {
+                  return responses.get(1);
+                } else {
+                  throw RETRYABLE_ERROR;
+                }
+              }
+            });
+
+    when(callable.call(testData.request)).thenReturn(responseStream1);
+    doNothing().when(attempt).checkCanRetry(any(), eq(RETRYABLE_ERROR));
+    when(responseStream2.iterator()).thenReturn(ImmutableList.of(responses.get(2)).iterator());
+    when(callable.call(request2)).thenReturn(responseStream2);
+
+    when(stub.runQueryCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
+    when(attempt.awaitSafeToProceed(any())).thenReturn(true);
+
+    ArgumentCaptor<RunQueryResponse> responsesCaptor =
+        ArgumentCaptor.forClass(RunQueryResponse.class);
+
+    doNothing().when(processContext).output(responsesCaptor.capture());
+
+    when(processContext.element()).thenReturn(testData.request);
+
+    RunQueryFn fn = new RunQueryFn(clock, ff, rpcQosOptions);
+
+    runFunction(fn);
+
+    List<RunQueryResponse> allValues = responsesCaptor.getAllValues();
+    assertEquals(responses, allValues);
+
+    // Exactly one call per request (original + resumed), one recorded stream
+    // value per emitted response.
+    verify(callable, times(1)).call(testData.request);
+    verify(callable, times(1)).call(request2);
+    verify(attempt, times(3)).recordStreamValue(any());
+  }
+
+  // Resume path WITHOUT an ORDER BY: the retried request must instead cursor
+  // on the last-read document's name (reference value) and append an implicit
+  // ascending __name__ ordering so the cursor is well-defined.
+  @Test
+  public void resumeFromLastReadValue_withNoOrderBy() throws Exception {
+    TestData testData = TestData.fieldEqualsBar().setProjectId(projectId).build();
+
+    RunQueryRequest request2 =
+        RunQueryRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .setStructuredQuery(
+                testData
+                    .request
+                    .getStructuredQuery()
+                    .toBuilder()
+                    .setStartAt(
+                        Cursor.newBuilder()
+                            .setBefore(false)
+                            .addValues(
+                                Value.newBuilder()
+                                    .setReferenceValue(testData.response2.getDocument().getName())))
+                    .addOrderBy(
+                        Order.newBuilder()
+                            .setField(FieldReference.newBuilder().setFieldPath("__name__"))
+                            .setDirection(Direction.ASCENDING)))
+            .build();
+
+    List<RunQueryResponse> responses =
+        ImmutableList.of(testData.response1, testData.response2, testData.response3);
+    // Stream yields two responses then fails with a retryable error.
+    when(responseStream1.iterator())
+        .thenReturn(
+            new AbstractIterator<RunQueryResponse>() {
+              private int invocationCount = 1;
+
+              @Override
+              protected RunQueryResponse computeNext() {
+                int count = invocationCount++;
+                if (count == 1) {
+                  return responses.get(0);
+                } else if (count == 2) {
+                  return responses.get(1);
+                } else {
+                  throw RETRYABLE_ERROR;
+                }
+              }
+            });
+
+    when(callable.call(testData.request)).thenReturn(responseStream1);
+    doNothing().when(attempt).checkCanRetry(any(), eq(RETRYABLE_ERROR));
+    when(responseStream2.iterator()).thenReturn(ImmutableList.of(testData.response3).iterator());
+    when(callable.call(request2)).thenReturn(responseStream2);
+
+    when(stub.runQueryCallable()).thenReturn(callable);
+
+    when(ff.getFirestoreStub(any())).thenReturn(stub);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
+    when(attempt.awaitSafeToProceed(any())).thenReturn(true);
+
+    ArgumentCaptor<RunQueryResponse> responsesCaptor =
+        ArgumentCaptor.forClass(RunQueryResponse.class);
+
+    doNothing().when(processContext).output(responsesCaptor.capture());
+
+    when(processContext.element()).thenReturn(testData.request);
+
+    RunQueryFn fn = new RunQueryFn(clock, ff, rpcQosOptions);
+
+    runFunction(fn);
+
+    List<RunQueryResponse> allValues = responsesCaptor.getAllValues();
+    assertEquals(responses, allValues);
+
+    verify(callable, times(1)).call(testData.request);
+    verify(callable, times(1)).call(request2);
+    verify(attempt, times(3)).recordStreamValue(any());
+  }
+
+  // Shared RPC test context for the base class's generic error/retry cases,
+  // bound to the RunQuery streaming RPC.
+  @Override
+  protected V1RpcFnTestCtx newCtx() {
+    return new V1RpcFnTestCtx() {
+      @Override
+      public RunQueryRequest getRequest() {
+        return RunQueryRequest.newBuilder()
+            .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+            .build();
+      }
+
+      @Override
+      public void mockRpcToCallable(FirestoreStub stub) {
+        when(stub.runQueryCallable()).thenReturn(callable);
+      }
+
+      @Override
+      public void whenCallableCall(RunQueryRequest in, Throwable... throwables) {
+        when(callable.call(in)).thenThrow(throwables);
+      }
+
+      @Override
+      public void verifyNoInteractionsWithCallable() {
+        verifyNoMoreInteractions(callable);
+      }
+    };
+  }
+
+  // Factory used by the base test class to construct the DoFn under test.
+  @Override
+  protected RunQueryFn getFn(
+      JodaClock clock,
+      FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+      RpcQosOptions rpcQosOptions) {
+    return new RunQueryFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+  }
+
+  // Test fixture: a query over collection "collection" filtering foo == "bar",
+  // with optional ORDER BY clauses, plus three canned responses (doc-1..doc-3).
+  private static final class TestData {
+
+    private final RunQueryRequest request;
+    private final RunQueryResponse response1;
+    private final RunQueryResponse response2;
+    private final RunQueryResponse response3;
+
+    public TestData(String projectId, Function<FieldReference, List<Order>> orderFunction) {
+      String fieldPath = "foo";
+      FieldReference foo = FieldReference.newBuilder().setFieldPath(fieldPath).build();
+      StructuredQuery.Builder builder =
+          StructuredQuery.newBuilder()
+              .addFrom(
+                  CollectionSelector.newBuilder()
+                      .setAllDescendants(false)
+                      .setCollectionId("collection"))
+              .setWhere(
+                  Filter.newBuilder()
+                      .setFieldFilter(
+                          FieldFilter.newBuilder()
+                              .setField(foo)
+                              .setOp(Operator.EQUAL)
+                              .setValue(Value.newBuilder().setStringValue("bar"))
+                              .build()));
+
+      // orderFunction lets callers opt into ORDER BY clauses on "foo".
+      orderFunction.apply(foo).forEach(builder::addOrderBy);
+      request =
+          RunQueryRequest.newBuilder()
+              .setParent(String.format("projects/%s/databases/(default)/document", projectId))
+              .setStructuredQuery(builder)
+              .build();
+
+      response1 = newResponse(fieldPath, 1);
+      response2 = newResponse(fieldPath, 2);
+      response3 = newResponse(fieldPath, 3);
+    }
+
+    // Builds a response whose document is named "doc-<n>" and whose `field`
+    // value is always "bar" (matching the query filter).
+    private static RunQueryResponse newResponse(String field, int docNumber) {
+      String docId = String.format("doc-%d", docNumber);
+      return RunQueryResponse.newBuilder()
+          .setDocument(
+              Document.newBuilder()
+                  .setName(docId)
+                  .putAllFields(
+                      ImmutableMap.of(field, Value.newBuilder().setStringValue("bar").build()))
+                  .build())
+          .build();
+    }
+
+    private static Builder fieldEqualsBar() {
+      return new Builder();
+    }
+
+    @SuppressWarnings("initialization.fields.uninitialized") // fields set via builder methods
+    private static final class Builder {
+
+      private String projectId;
+      private Function<FieldReference, List<Order>> orderFunction;
+
+      public Builder() {
+        // Default: no ORDER BY clauses.
+        orderFunction = f -> Collections.emptyList();
+      }
+
+      public Builder setProjectId(String projectId) {
+        this.projectId = projectId;
+        return this;
+      }
+
+      public Builder setOrderFunction(Function<FieldReference, List<Order>> orderFunction) {
+        this.orderFunction = orderFunction;
+        return this;
+      }
+
+      private TestData build() {
+        return new TestData(
+            requireNonNull(projectId, "projectId must be non null"),
+            requireNonNull(orderFunction, "orderFunction must be non null"));
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java
new file mode 100644
index 0000000..25ed63c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.CollectionSelector;
+import com.google.firestore.v1.Value;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.PartitionQuery.PartitionQueryResponseToRunQueryRequest;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.PartitionQueryPair;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link PartitionQueryResponseToRunQueryRequest}: validates
+ * the document-path-aware cursor comparator and the conversion of an unsorted
+ * set of partition cursors into the ordered list of cursor-bounded
+ * RunQueryRequests (including the open-ended first and last ranges).
+ */
+@RunWith(MockitoJUnitRunner.class)
+@SuppressWarnings(
+    "initialization.fields.uninitialized") // mockito fields are initialized via the Mockito Runner
+public final class PartitionQueryResponseToRunQueryRequestTest {
+
+  @Mock protected DoFn<PartitionQueryPair, RunQueryRequest>.ProcessContext processContext;
+
+  // The comparator must order cursors by path SEGMENTS, not by raw string
+  // compare (e.g. "c10" sorts after "c1/..." subpaths but before "c2"), and an
+  // empty cursor sorts last. Shuffle-and-sort 1000 times to exercise many
+  // permutations.
+  @Test
+  public void ensureSortingCorrectlyHandlesPathSegments() {
+    List<Cursor> expected =
+        newArrayList(
+            referenceValueCursor("projects/p1/databases/d1/documents/c1/doc1"),
+            referenceValueCursor("projects/p1/databases/d1/documents/c1/doc2"),
+            referenceValueCursor("projects/p1/databases/d1/documents/c1/doc2/c2/doc1"),
+            referenceValueCursor("projects/p1/databases/d1/documents/c1/doc2/c2/doc2"),
+            referenceValueCursor("projects/p1/databases/d1/documents/c10/doc1"),
+            referenceValueCursor("projects/p1/databases/d1/documents/c2/doc1"),
+            referenceValueCursor("projects/p2/databases/d2/documents/c1/doc1"),
+            referenceValueCursor("projects/p2/databases/d2/documents/c1-/doc1"),
+            referenceValueCursor("projects/p2/databases/d3/documents/c1-/doc1"),
+            referenceValueCursor("projects/p2/databases/d3/documents/c1-/doc1"),
+            Cursor.newBuilder().build());
+
+    for (int i = 0; i < 1000; i++) {
+      List<Cursor> list = new ArrayList<>(expected);
+      Collections.shuffle(list);
+
+      list.sort(PartitionQueryResponseToRunQueryRequest.CURSOR_REFERENCE_VALUE_COMPARATOR);
+
+      assertEquals(expected, list);
+    }
+  }
+
+  // N cursors (provided out of order) must yield N+1 queries: (-inf, c1],
+  // (c1, c2], (c2, c3], (c3, +inf) — i.e. sorted cursors paired into
+  // startAt/endAt bounds with open-ended first and last ranges.
+  @Test
+  public void ensureCursorPairingWorks() {
+    StructuredQuery query =
+        StructuredQuery.newBuilder()
+            .addFrom(
+                CollectionSelector.newBuilder()
+                    .setAllDescendants(true)
+                    .setCollectionId("c1")
+                    .build())
+            .build();
+
+    Cursor cursor1 = referenceValueCursor("projects/p1/databases/d1/documents/c1/doc1");
+    Cursor cursor2 = referenceValueCursor("projects/p1/databases/d1/documents/c1/doc2");
+    Cursor cursor3 = referenceValueCursor("projects/p1/databases/d1/documents/c1/doc2/c2/doc2");
+
+    List<StructuredQuery> expectedQueries =
+        newArrayList(
+            newQueryWithCursors(query, null, cursor1),
+            newQueryWithCursors(query, cursor1, cursor2),
+            newQueryWithCursors(query, cursor2, cursor3),
+            newQueryWithCursors(query, cursor3, null));
+
+    // Partitions are deliberately added out of order; the fn must sort them.
+    PartitionQueryPair partitionQueryPair =
+        new PartitionQueryPair(
+            PartitionQueryRequest.newBuilder().setStructuredQuery(query).build(),
+            PartitionQueryResponse.newBuilder()
+                .addPartitions(cursor3)
+                .addPartitions(cursor1)
+                .addPartitions(cursor2)
+                .build());
+
+    ArgumentCaptor<RunQueryRequest> captor = ArgumentCaptor.forClass(RunQueryRequest.class);
+    when(processContext.element()).thenReturn(partitionQueryPair);
+    doNothing().when(processContext).output(captor.capture());
+
+    PartitionQueryResponseToRunQueryRequest fn = new PartitionQueryResponseToRunQueryRequest();
+    fn.processElement(processContext);
+
+    List<StructuredQuery> actualQueries =
+        captor.getAllValues().stream()
+            .map(RunQueryRequest::getStructuredQuery)
+            .collect(Collectors.toList());
+
+    assertEquals(expectedQueries, actualQueries);
+  }
+
+  // Builds a single-value cursor wrapping a document reference path.
+  private static Cursor referenceValueCursor(String referenceValue) {
+    return Cursor.newBuilder()
+        .addValues(Value.newBuilder().setReferenceValue(referenceValue).build())
+        .build();
+  }
+
+  // Copies `query`, setting startAt/endAt only when non-null so the first and
+  // last ranges stay open-ended.
+  private static StructuredQuery newQueryWithCursors(
+      StructuredQuery query, Cursor startAt, Cursor endAt) {
+    StructuredQuery.Builder builder = query.toBuilder();
+    if (startAt != null) {
+      builder.setStartAt(startAt);
+    }
+    if (endAt != null) {
+      builder.setEndAt(endAt);
+    }
+    return builder.build();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosTest.java
index 0ba8d6f..e1e1306 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosTest.java
@@ -40,6 +40,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
 import io.grpc.Status;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
@@ -55,11 +56,16 @@ import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.FlushBufferImpl;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.RpcWriteAttemptImpl;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.StatusCodeAwareBackoff;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.StatusCodeAwareBackoff.BackoffDuration;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.StatusCodeAwareBackoff.BackoffResult;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.StatusCodeAwareBackoff.BackoffResults;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.WriteRampUp;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -303,23 +309,21 @@ public final class RpcQosTest {
     // try 1
     readAttempt.recordRequestStart(monotonicClock.instant());
     readAttempt.recordRequestFailed(monotonicClock.instant());
-    readAttempt.checkCanRetry(RETRYABLE_ERROR);
+    readAttempt.checkCanRetry(monotonicClock.instant(), RETRYABLE_ERROR);
     // try 2
     readAttempt.recordRequestStart(monotonicClock.instant());
     readAttempt.recordRequestFailed(monotonicClock.instant());
-    readAttempt.checkCanRetry(RETRYABLE_ERROR);
+    readAttempt.checkCanRetry(monotonicClock.instant(), RETRYABLE_ERROR);
     // try 3
     readAttempt.recordRequestStart(monotonicClock.instant());
     readAttempt.recordRequestFailed(monotonicClock.instant());
     try {
-      readAttempt.checkCanRetry(RETRYABLE_ERROR);
+      readAttempt.checkCanRetry(monotonicClock.instant(), RETRYABLE_ERROR);
       fail("expected retry to be exhausted after third attempt");
     } catch (ApiException e) {
       assertSame(e, RETRYABLE_ERROR);
     }
 
-    verify(sleeper, times(2))
-        .sleep(anyLong()); // happens in checkCanRetry when the backoff is checked
     verify(counterThrottlingMs, times(0)).inc(anyLong());
     verify(counterRpcFailures, times(3)).inc();
     verify(counterRpcSuccesses, times(0)).inc();
@@ -338,14 +342,12 @@ public final class RpcQosTest {
     // try 1
     readAttempt.recordRequestFailed(monotonicClock.instant());
     try {
-      readAttempt.checkCanRetry(NON_RETRYABLE_ERROR);
+      readAttempt.checkCanRetry(monotonicClock.instant(), NON_RETRYABLE_ERROR);
       fail("expected non-retryable error to throw error on first occurrence");
     } catch (ApiException e) {
       assertSame(e, NON_RETRYABLE_ERROR);
     }
 
-    verify(sleeper, times(1))
-        .sleep(anyLong()); // happens in checkCanRetry when the backoff is checked
     verify(counterThrottlingMs, times(0)).inc(anyLong());
     verify(counterRpcFailures, times(1)).inc();
     verify(counterRpcSuccesses, times(0)).inc();
@@ -364,14 +366,12 @@ public final class RpcQosTest {
     // try 1
     readAttempt.recordRequestFailed(monotonicClock.instant());
     try {
-      readAttempt.checkCanRetry(RETRYABLE_ERROR_WITH_NON_RETRYABLE_CODE);
+      readAttempt.checkCanRetry(monotonicClock.instant(), RETRYABLE_ERROR_WITH_NON_RETRYABLE_CODE);
       fail("expected non-retryable error to throw error on first occurrence");
     } catch (ApiException e) {
       assertSame(e, RETRYABLE_ERROR_WITH_NON_RETRYABLE_CODE);
     }
 
-    verify(sleeper, times(1))
-        .sleep(anyLong()); // happens in checkCanRetry when the backoff is checked
     verify(counterThrottlingMs, times(0)).inc(anyLong());
     verify(counterRpcFailures, times(1)).inc();
     verify(counterRpcSuccesses, times(0)).inc();
@@ -390,7 +390,7 @@ public final class RpcQosTest {
     readAttempt.recordRequestStart(monotonicClock.instant());
     readAttempt.recordRequestFailed(monotonicClock.instant());
     try {
-      readAttempt.checkCanRetry(RETRYABLE_ERROR);
+      readAttempt.checkCanRetry(monotonicClock.instant(), RETRYABLE_ERROR);
       fail("expected error to be re-thrown due to max attempts exhaustion");
     } catch (ApiException e) {
       // expected
@@ -584,6 +584,53 @@ public final class RpcQosTest {
     doTest_isCodeRetryable(Code.UNKNOWN, true);
   }
 
+  @Test
+  public void statusCodeAwareBackoff_graceCodeBackoffWithin60sec() {
+
+    StatusCodeAwareBackoff backoff =
+        new StatusCodeAwareBackoff(
+            random, 5, Duration.standardSeconds(5), ImmutableSet.of(Code.UNAVAILABLE_VALUE));
+
+    BackoffResult backoffResult1 =
+        backoff.nextBackoff(Instant.ofEpochMilli(1), Code.UNAVAILABLE_VALUE);
+    assertEquals(BackoffResults.NONE, backoffResult1);
+
+    BackoffResult backoffResult2 =
+        backoff.nextBackoff(Instant.ofEpochMilli(2), Code.UNAVAILABLE_VALUE);
+    assertEquals(new BackoffDuration(Duration.millis(6_091)), backoffResult2);
+
+    BackoffResult backoffResult3 =
+        backoff.nextBackoff(Instant.ofEpochMilli(60_100), Code.UNAVAILABLE_VALUE);
+    assertEquals(BackoffResults.NONE, backoffResult3);
+  }
+
+  @Test
+  public void statusCodeAwareBackoff_exhausted_attemptCount() {
+
+    StatusCodeAwareBackoff backoff =
+        new StatusCodeAwareBackoff(random, 1, Duration.standardSeconds(5), Collections.emptySet());
+
+    BackoffResult backoffResult1 =
+        backoff.nextBackoff(Instant.ofEpochMilli(1), Code.UNAVAILABLE_VALUE);
+    assertEquals(BackoffResults.EXHAUSTED, backoffResult1);
+  }
+
+  @Test
+  public void statusCodeAwareBackoff_exhausted_cumulativeBackoff() {
+
+    StatusCodeAwareBackoff backoff =
+        new StatusCodeAwareBackoff(random, 3, Duration.standardSeconds(60), Collections.emptySet());
+
+    BackoffDuration backoff60Sec = new BackoffDuration(Duration.standardMinutes(1));
+    BackoffResult backoffResult1 =
+        backoff.nextBackoff(Instant.ofEpochMilli(1), Code.DEADLINE_EXCEEDED_VALUE);
+    assertEquals(backoff60Sec, backoffResult1);
+
+    BackoffResult backoffResult2 =
+        backoff.nextBackoff(Instant.ofEpochMilli(2), Code.DEADLINE_EXCEEDED_VALUE);
+    assertEquals(BackoffResults.EXHAUSTED, backoffResult2);
+  }
+
   private IntStream from0To90By(int increment) {
     return IntStream.iterate(0, i -> i + increment).limit((90 / increment) + 1);
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
index 37ad107..94e1bfa 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
@@ -18,13 +18,25 @@
 package org.apache.beam.sdk.io.gcp.firestore.it;
 
 import static org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.assumeEnvVarSet;
+import static org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.chunkUpDocIds;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeThat;
 
+import com.google.api.core.ApiFutures;
+import com.google.cloud.firestore.WriteBatch;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
 import com.google.firestore.v1.Write;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -33,10 +45,19 @@ import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
 import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
 import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.CleanupMode;
+import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.DataLayout;
+import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.DocumentGenerator;
+import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.TestDataLayoutHint;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
 import org.junit.AssumptionViolatedException;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -112,11 +133,152 @@ abstract class BaseFirestoreIT {
   }
 
   @Test
+  @TestDataLayoutHint(DataLayout.Deep)
+  public final void listCollections() throws Exception {
+    // Verification and cleanup of nested collections is much slower because each document
+    // requires an RPC to find its collections; so instead of using the usual size, use 20
+    // collections to keep the test quick.
+    List<String> collectionIds =
+        IntStream.rangeClosed(1, 20).mapToObj(i -> helper.colId()).collect(Collectors.toList());
+
+    ApiFutures.transform(
+            ApiFutures.allAsList(
+                chunkUpDocIds(collectionIds)
+                    .map(
+                        chunk -> {
+                          WriteBatch batch = helper.getFs().batch();
+                          chunk.stream()
+                              .map(col -> helper.getBaseDocument().collection(col).document())
+                              .forEach(ref -> batch.set(ref, ImmutableMap.of("foo", "bar")));
+                          return batch.commit();
+                        })
+                    .collect(Collectors.toList())),
+            FirestoreTestingHelper.flattenListList(),
+            MoreExecutors.directExecutor())
+        .get(10, TimeUnit.SECONDS);
+
+    PCollection<String> actualCollectionIds =
+        testPipeline
+            .apply(Create.of(""))
+            .apply(getListCollectionIdsPTransform(testName.getMethodName()))
+            .apply(
+                FirestoreIO.v1()
+                    .read()
+                    .listCollectionIds()
+                    .withRpcQosOptions(rpcQosOptions)
+                    .build());
+
+    PAssert.that(actualCollectionIds).containsInAnyOrder(collectionIds);
+    testPipeline.run(options);
+  }
+
+  @Test
+  public final void listDocuments() throws Exception {
+    DocumentGenerator documentGenerator = helper.documentGenerator(NUM_ITEMS_TO_GENERATE, "a");
+    documentGenerator.generateDocuments().get(10, TimeUnit.SECONDS);
+
+    PCollection<String> listDocumentPaths =
+        testPipeline
+            .apply(Create.of("a"))
+            .apply(getListDocumentsPTransform(testName.getMethodName()))
+            .apply(FirestoreIO.v1().read().listDocuments().withRpcQosOptions(rpcQosOptions).build())
+            .apply(ParDo.of(new DocumentToName()));
+
+    PAssert.that(listDocumentPaths).containsInAnyOrder(documentGenerator.expectedDocumentPaths());
+    testPipeline.run(options);
+  }
+
+  @Test
+  public final void runQuery() throws Exception {
+    String collectionId = "a";
+    DocumentGenerator documentGenerator =
+        helper.documentGenerator(NUM_ITEMS_TO_GENERATE, collectionId, /* addBazDoc = */ true);
+    documentGenerator.generateDocuments().get(10, TimeUnit.SECONDS);
+
+    PCollection<String> listDocumentPaths =
+        testPipeline
+            .apply(Create.of(collectionId))
+            .apply(getRunQueryPTransform(testName.getMethodName()))
+            .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
+            .apply(ParDo.of(new RunQueryResponseToDocument()))
+            .apply(ParDo.of(new DocumentToName()));
+
+    PAssert.that(listDocumentPaths).containsInAnyOrder(documentGenerator.expectedDocumentPaths());
+    testPipeline.run(options);
+  }
+
+  @Test
+  public final void partitionQuery() throws Exception {
+    String collectionGroupId = UUID.randomUUID().toString();
+    // Currently Firestore will only generate a partition every 128 documents, so generate
+    // enough documents to get 2 cursors returned, resulting in 3 partitions.
+    int partitionCount = 3;
+    int documentCount = (partitionCount * 128) - 1;
+
+    // create some documents for listing and asserting in the test
+    DocumentGenerator documentGenerator =
+        helper.documentGenerator(documentCount, collectionGroupId);
+    documentGenerator.generateDocuments().get(10, TimeUnit.SECONDS);
+
+    PCollection<String> listDocumentPaths =
+        testPipeline
+            .apply(Create.of(collectionGroupId))
+            .apply(getPartitionQueryPTransform(testName.getMethodName(), partitionCount))
+            .apply(FirestoreIO.v1().read().partitionQuery().withNameOnlyQuery().build())
+            .apply(FirestoreIO.v1().read().runQuery().build())
+            .apply(ParDo.of(new RunQueryResponseToDocument()))
+            .apply(ParDo.of(new DocumentToName()));
+
+    PAssert.that(listDocumentPaths).containsInAnyOrder(documentGenerator.expectedDocumentPaths());
+    testPipeline.run(options);
+  }
+
+  @Test
+  public final void batchGet() throws Exception {
+    String collectionId = "a";
+    DocumentGenerator documentGenerator =
+        helper.documentGenerator(NUM_ITEMS_TO_GENERATE, collectionId);
+    documentGenerator.generateDocuments().get(10, TimeUnit.SECONDS);
+
+    PCollection<String> listDocumentPaths =
+        testPipeline
+            .apply(Create.of(Collections.singletonList(documentGenerator.getDocumentIds())))
+            .apply(getBatchGetDocumentsPTransform(testName.getMethodName(), collectionId))
+            .apply(
+                FirestoreIO.v1()
+                    .read()
+                    .batchGetDocuments()
+                    .withRpcQosOptions(rpcQosOptions)
+                    .build())
+            .apply(Filter.by(BatchGetDocumentsResponse::hasFound))
+            .apply(ParDo.of(new BatchGetDocumentsResponseToDocument()))
+            .apply(ParDo.of(new DocumentToName()));
+
+    PAssert.that(listDocumentPaths).containsInAnyOrder(documentGenerator.expectedDocumentPaths());
+    testPipeline.run(options);
+  }
+
+  @Test
   public final void write() {
     String collectionId = "a";
     runWriteTest(getWritePTransform(testName.getMethodName(), collectionId), collectionId);
   }
 
+  protected abstract PTransform<PCollection<String>, PCollection<ListCollectionIdsRequest>>
+      getListCollectionIdsPTransform(String testMethodName);
+
+  protected abstract PTransform<PCollection<String>, PCollection<ListDocumentsRequest>>
+      getListDocumentsPTransform(String testMethodName);
+
+  protected abstract PTransform<PCollection<List<String>>, PCollection<BatchGetDocumentsRequest>>
+      getBatchGetDocumentsPTransform(String testMethodName, String collectionId);
+
+  protected abstract PTransform<PCollection<String>, PCollection<RunQueryRequest>>
+      getRunQueryPTransform(String testMethodName);
+
+  protected abstract PTransform<PCollection<String>, PCollection<PartitionQueryRequest>>
+      getPartitionQueryPTransform(String testMethodName, int partitionCount);
+
   protected abstract PTransform<PCollection<List<String>>, PCollection<Write>> getWritePTransform(
       String testMethodName, String collectionId);
 
@@ -142,4 +304,26 @@ abstract class BaseFirestoreIT {
 
     assertEquals(documentIds, actualDocumentIds);
   }
+
+  private static final class RunQueryResponseToDocument extends DoFn<RunQueryResponse, Document> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getDocument());
+    }
+  }
+
+  private static final class BatchGetDocumentsResponseToDocument
+      extends DoFn<BatchGetDocumentsResponse, Document> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getFound());
+    }
+  }
+
+  private static final class DocumentToName extends DoFn<Document, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getName());
+    }
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreV1IT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreV1IT.java
index 53b02b7..157624e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreV1IT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreV1IT.java
@@ -26,8 +26,21 @@ import static org.junit.Assert.assertTrue;
 import com.google.api.core.ApiFuture;
 import com.google.cloud.firestore.QueryDocumentSnapshot;
 import com.google.cloud.firestore.QuerySnapshot;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
 import com.google.firestore.v1.Document;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.PartitionQueryRequest;
 import com.google.firestore.v1.Precondition;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.CollectionSelector;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
 import com.google.firestore.v1.Value;
 import com.google.firestore.v1.Write;
 import com.google.protobuf.Timestamp;
@@ -135,11 +148,188 @@ public final class FirestoreV1IT extends BaseFirestoreIT {
   }
 
   @Override
+  protected PTransform<PCollection<String>, PCollection<ListCollectionIdsRequest>>
+      getListCollectionIdsPTransform(String testMethodName) {
+    return new ListCollectionIds(helper.getDatabase(), helper.getBaseDocumentPath());
+  }
+
+  @Override
+  protected PTransform<PCollection<String>, PCollection<ListDocumentsRequest>>
+      getListDocumentsPTransform(String testMethodName) {
+    return new ListDocuments(helper.getDatabase(), helper.getBaseDocumentPath());
+  }
+
+  @Override
+  protected PTransform<PCollection<List<String>>, PCollection<BatchGetDocumentsRequest>>
+      getBatchGetDocumentsPTransform(String testMethodName, String collectionId) {
+    return new BatchGetDocuments(helper.getDatabase(), helper.getBaseDocumentPath(), collectionId);
+  }
+
+  @Override
+  protected PTransform<PCollection<String>, PCollection<RunQueryRequest>> getRunQueryPTransform(
+      String testMethodName) {
+    return new RunQuery(helper.getDatabase(), helper.getBaseDocumentPath());
+  }
+
+  @Override
+  protected PTransform<PCollection<String>, PCollection<PartitionQueryRequest>>
+      getPartitionQueryPTransform(String testMethodName, int partitionCount) {
+    return new PartitionQuery(helper.getDatabase(), helper.getDocumentRoot(), partitionCount);
+  }
+
+  @Override
   protected PTransform<PCollection<List<String>>, PCollection<Write>> getWritePTransform(
       String testMethodName, String collectionId) {
     return new WritePTransform(helper.getDatabase(), helper.getBaseDocumentPath(), collectionId);
   }
 
+  private static final class ListCollectionIds
+      extends BasePTransform<String, ListCollectionIdsRequest> {
+
+    public ListCollectionIds(String database, String baseDocumentPath) {
+      super(database, baseDocumentPath, "");
+    }
+
+    @Override
+    public PCollection<ListCollectionIdsRequest> expand(PCollection<String> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<String, ListCollectionIdsRequest>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  c.output(
+                      ListCollectionIdsRequest.newBuilder().setParent(baseDocumentPath).build());
+                }
+              }));
+    }
+  }
+
+  private static final class ListDocuments extends BasePTransform<String, ListDocumentsRequest> {
+
+    public ListDocuments(String database, String baseDocumentPath) {
+      super(database, baseDocumentPath, "");
+    }
+
+    @Override
+    public PCollection<ListDocumentsRequest> expand(PCollection<String> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<String, ListDocumentsRequest>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  c.output(
+                      ListDocumentsRequest.newBuilder()
+                          .setParent(baseDocumentPath)
+                          .setCollectionId(c.element())
+                          .build());
+                }
+              }));
+    }
+  }
+
+  private static final class BatchGetDocuments
+      extends BasePTransform<List<String>, BatchGetDocumentsRequest> {
+
+    public BatchGetDocuments(String database, String baseDocumentPath, String collectionId) {
+      super(database, baseDocumentPath, collectionId);
+    }
+
+    @Override
+    public PCollection<BatchGetDocumentsRequest> expand(PCollection<List<String>> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<List<String>, BatchGetDocumentsRequest>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  BatchGetDocumentsRequest.Builder builder =
+                      BatchGetDocumentsRequest.newBuilder().setDatabase(database);
+                  builder.addDocuments(docPath("404"));
+                  c.element().stream().map(docId -> docPath(docId)).forEach(builder::addDocuments);
+                  c.output(builder.build());
+                }
+              }));
+    }
+  }
+
+  private static final class RunQuery extends BasePTransform<String, RunQueryRequest> {
+
+    public RunQuery(String database, String baseDocumentPath) {
+      super(database, baseDocumentPath, "");
+    }
+
+    @Override
+    public PCollection<RunQueryRequest> expand(PCollection<String> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<String, RunQueryRequest>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  c.output(
+                      RunQueryRequest.newBuilder()
+                          .setParent(baseDocumentPath)
+                          .setStructuredQuery(
+                              StructuredQuery.newBuilder()
+                                  .addFrom(
+                                      CollectionSelector.newBuilder().setCollectionId(c.element()))
+                                  .setWhere(
+                                      Filter.newBuilder()
+                                          .setFieldFilter(
+                                              FieldFilter.newBuilder()
+                                                  .setField(
+                                                      FieldReference.newBuilder()
+                                                          .setFieldPath("foo"))
+                                                  .setOp(Operator.EQUAL)
+                                                  .setValue(
+                                                      Value.newBuilder().setStringValue("bar")))))
+                          .build());
+                }
+              }));
+    }
+  }
+
+  private static final class PartitionQuery extends BasePTransform<String, PartitionQueryRequest> {
+
+    private final int partitionCount;
+
+    public PartitionQuery(String database, String baseDocumentPath, int partitionCount) {
+      super(database, baseDocumentPath, "");
+      this.partitionCount = partitionCount;
+    }
+
+    @Override
+    public PCollection<PartitionQueryRequest> expand(PCollection<String> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<String, PartitionQueryRequest>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  c.output(
+                      PartitionQueryRequest.newBuilder()
+                          .setParent(baseDocumentPath)
+                          // Set the page size smaller than the number of
+                          // partitions we're requesting, to ensure that
+                          // more than one page must be fetched and our
+                          // multi-page handling code is
+                          // exercised by this test.
+                          .setPageSize(1)
+                          .setPartitionCount(partitionCount)
+                          .setStructuredQuery(
+                              StructuredQuery.newBuilder()
+                                  .addFrom(
+                                      CollectionSelector.newBuilder()
+                                          .setCollectionId(c.element())
+                                          .setAllDescendants(true))
+                                  .addOrderBy(
+                                      Order.newBuilder()
+                                          .setField(
+                                              FieldReference.newBuilder().setFieldPath("__name__"))
+                                          .setDirection(Direction.ASCENDING)))
+                          .build());
+                }
+              }));
+    }
+  }
+
   private static final class WritePTransform extends BasePTransform<List<String>, Write> {
 
     public WritePTransform(String database, String baseDocumentPath, String collectionId) {

Mime
View raw message