calcite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [2/2] calcite git commit: [CALCITE-30] Implement Statement.cancel
Date Thu, 07 Jul 2016 20:49:50 GMT
[CALCITE-30] Implement Statement.cancel

ResultSet.next() now throws an exception indicating that the statement
has been canceled.

Propagate cancel into streaming CSV table (Zhen Wang).

Use AtomicBoolean for cancel flag; deprecate
RelOptPlanner.setCancelFlag.


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/d9eb4383
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/d9eb4383
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/d9eb4383

Branch: refs/heads/master
Commit: d9eb4383290e6ec7a5bca8d23fa3e03167c699fe
Parents: 65c1cec
Author: Julian Hyde <jhyde@apache.org>
Authored: Wed Jun 1 13:07:04 2016 +0800
Committer: Julian Hyde <jhyde@apache.org>
Committed: Wed Jul 6 19:44:17 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/calcite/DataContext.java    |   9 +-
 .../calcite/jdbc/CalciteConnectionImpl.java     |  30 +++-
 .../apache/calcite/jdbc/CalciteResultSet.java   |  24 +++
 .../calcite/plan/AbstractRelOptPlanner.java     |  15 +-
 .../org/apache/calcite/plan/RelOptPlanner.java  |   9 +-
 .../apache/calcite/runtime/CalciteResource.java |   3 +
 .../org/apache/calcite/util/CancelFlag.java     |  23 ++-
 .../calcite/runtime/CalciteResource.properties  |   1 +
 .../org/apache/calcite/test/StreamTest.java     |  42 +++++
 .../calcite/adapter/csv/CsvEnumerator.java      |  41 ++++-
 .../calcite/adapter/csv/CsvFilterableTable.java |   4 +-
 .../calcite/adapter/csv/CsvScannableTable.java  |   4 +-
 .../calcite/adapter/csv/CsvSchemaFactory.java   |  12 +-
 .../adapter/csv/CsvStreamEnumerator.java        |  96 ----------
 .../calcite/adapter/csv/CsvStreamReader.java    |  76 ++++----
 .../adapter/csv/CsvStreamScannableTable.java    |   8 +-
 .../calcite/adapter/csv/CsvTableScan.java       |   3 +-
 .../adapter/csv/CsvTranslatableTable.java       |   8 +-
 .../java/org/apache/calcite/test/CsvTest.java   | 173 ++++++++++++++++++-
 19 files changed, 398 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/DataContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/DataContext.java b/core/src/main/java/org/apache/calcite/DataContext.java
index 3d5203f..78642aa 100644
--- a/core/src/main/java/org/apache/calcite/DataContext.java
+++ b/core/src/main/java/org/apache/calcite/DataContext.java
@@ -27,6 +27,7 @@ import com.google.common.base.CaseFormat;
 
 import java.lang.reflect.Modifier;
 import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Runtime context allowing access to the tables in a database.
@@ -76,7 +77,13 @@ public interface DataContext {
     /** The Spark engine. Available if Spark is on the class path. */
     SPARK_CONTEXT("sparkContext", Object.class),
 
-    /** Sql advisor that suggests completion hints. */
+    /** A mutable flag that indicates whether the user has requested that the
+     * current statement be canceled. Cancellation may not be immediate, but
+     * implementations of relational operators should check the flag fairly
+     * frequently and cease execution (e.g. by returning end of data). */
+    CANCEL_FLAG("cancelFlag", AtomicBoolean.class),
+
+    /** Advisor that suggests completion hints for SQL statements. */
     SQL_ADVISOR("sqlAdvisor", SqlAdvisor.class),
 
     /** Time zone in which the current statement is executing. Required;

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
index 61a8d28..195ced7 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteConnectionImpl.java
@@ -76,6 +76,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Implementation of JDBC connection
@@ -197,7 +198,6 @@ abstract class CalciteConnectionImpl
       final CalcitePreparedStatement calcitePreparedStatement =
           (CalcitePreparedStatement) factory.newPreparedStatement(this, null,
               signature, resultSetType, resultSetConcurrency, resultSetHoldability);
-      server.addStatement(this, calcitePreparedStatement.handle);
       server.getStatement(calcitePreparedStatement.handle).setSignature(signature);
       return calcitePreparedStatement;
     } catch (Exception e) {
@@ -278,10 +278,25 @@ abstract class CalciteConnectionImpl
       map.put("?" + o.i, o.e.toLocal());
     }
     map.putAll(signature.internalParameters);
+    final AtomicBoolean cancelFlag;
+    try {
+      cancelFlag = getCancelFlag(handle);
+    } catch (NoSuchStatementException e) {
+      throw Throwables.propagate(e);
+    }
+    map.put(DataContext.Variable.CANCEL_FLAG.camelName, cancelFlag);
     final DataContext dataContext = createDataContext(map);
     return signature.enumerable(dataContext);
   }
 
+  /** Returns the flag that is used to request or check cancel for a particular
+   * statement. */
+  AtomicBoolean getCancelFlag(Meta.StatementHandle handle)
+      throws NoSuchStatementException {
+    final CalciteServerStatement serverStatement = server.getStatement(handle);
+    return ((CalciteServerStatementImpl) serverStatement).cancelFlag;
+  }
+
   public DataContext createDataContext(Map<String, Object> parameterValues) {
     if (config().spark()) {
       return new SlimDataContext();
@@ -301,7 +316,7 @@ abstract class CalciteConnectionImpl
 
   /** Implementation of Queryable. */
   static class CalciteQueryable<T> extends BaseQueryable<T> {
-    public CalciteQueryable(CalciteConnection connection, Type elementType,
+    CalciteQueryable(CalciteConnection connection, Type elementType,
         Expression expression) {
       super(connection, elementType, expression);
     }
@@ -322,7 +337,11 @@ abstract class CalciteConnectionImpl
     public void addStatement(CalciteConnection connection,
         Meta.StatementHandle h) {
       final CalciteConnectionImpl c = (CalciteConnectionImpl) connection;
-      statementMap.put(h.id, new CalciteServerStatementImpl(c));
+      final CalciteServerStatement previous =
+          statementMap.put(h.id, new CalciteServerStatementImpl(c));
+      if (previous != null) {
+        throw new AssertionError();
+      }
     }
 
     public CalciteServerStatement getStatement(Meta.StatementHandle h)
@@ -432,7 +451,7 @@ abstract class CalciteConnectionImpl
   static class ContextImpl implements CalcitePrepare.Context {
     private final CalciteConnectionImpl connection;
 
-    public ContextImpl(CalciteConnectionImpl connection) {
+    ContextImpl(CalciteConnectionImpl connection) {
       this.connection = Preconditions.checkNotNull(connection);
     }
 
@@ -491,8 +510,9 @@ abstract class CalciteConnectionImpl
     private final CalciteConnectionImpl connection;
     private Iterator<Object> iterator;
     private Meta.Signature signature;
+    private final AtomicBoolean cancelFlag = new AtomicBoolean();
 
-    public CalciteServerStatementImpl(CalciteConnectionImpl connection) {
+    CalciteServerStatementImpl(CalciteConnectionImpl connection) {
       this.connection = Preconditions.checkNotNull(connection);
     }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
index c45b245..e62417a 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
@@ -22,6 +22,7 @@ import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.Handler;
 import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.NoSuchStatementException;
 import org.apache.calcite.avatica.util.Cursor;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.Linq4j;
@@ -29,6 +30,7 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.runtime.ArrayEnumeratorCursor;
 import org.apache.calcite.runtime.ObjectEnumeratorCursor;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 
 import java.sql.ResultSet;
@@ -36,17 +38,27 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.calcite.util.Static.RESOURCE;
 
 /**
  * Implementation of {@link ResultSet}
  * for the Calcite engine.
  */
 public class CalciteResultSet extends AvaticaResultSet {
+  private final AtomicBoolean cancelFlag;
+
   CalciteResultSet(AvaticaStatement statement,
       CalcitePrepare.CalciteSignature calciteSignature,
       ResultSetMetaData resultSetMetaData, TimeZone timeZone,
       Meta.Frame firstFrame) {
     super(statement, null, calciteSignature, resultSetMetaData, timeZone, firstFrame);
+    try {
+      cancelFlag = getCalciteConnection().getCancelFlag(statement.handle);
+    } catch (NoSuchStatementException e) {
+      throw Throwables.propagate(e);
+    }
   }
 
   @Override protected CalciteResultSet execute() throws SQLException {
@@ -66,6 +78,18 @@ public class CalciteResultSet extends AvaticaResultSet {
     return this;
   }
 
+  @Override protected void cancel() {
+    cancelFlag.compareAndSet(false, true);
+  }
+
+  @Override public boolean next() throws SQLException {
+    final boolean next = super.next();
+    if (cancelFlag.get()) {
+      throw new SQLException(RESOURCE.statementCanceled().str());
+    }
+    return next;
+  }
+
   @Override public ResultSet create(ColumnMetaData.AvaticaType elementType,
       Iterable<Object> iterable) {
     final List<ColumnMetaData> columnMetaDataList;

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java b/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java
index e8f7578..875ecd7 100644
--- a/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/AbstractRelOptPlanner.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import static org.apache.calcite.util.Static.RESOURCE;
@@ -60,7 +61,7 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
 
   private Pattern ruleDescExclusionFilter;
 
-  private CancelFlag cancelFlag;
+  private final AtomicBoolean cancelFlag;
 
   private final Set<Class<? extends RelNode>> classes = new HashSet<>();
 
@@ -76,7 +77,7 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
   /**
    * Creates an AbstractRelOptPlanner.
    */
-  protected AbstractRelOptPlanner(RelOptCostFactory costFactory, //
+  protected AbstractRelOptPlanner(RelOptCostFactory costFactory,
       Context context) {
     assert costFactory != null;
     this.costFactory = costFactory;
@@ -85,9 +86,9 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
     }
     this.context = context;
 
-    // In case no one calls setCancelFlag, set up a
-    // dummy here.
-    cancelFlag = new CancelFlag();
+    final CancelFlag cancelFlag = context.unwrap(CancelFlag.class);
+    this.cancelFlag = cancelFlag != null ? cancelFlag.atomicBoolean
+        : new AtomicBoolean();
 
     // Add abstract RelNode classes. No RelNodes will ever be registered with
     // these types, but some operands may use them.
@@ -108,7 +109,7 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
   }
 
   public void setCancelFlag(CancelFlag cancelFlag) {
-    this.cancelFlag = cancelFlag;
+    // ignored
   }
 
   /**
@@ -116,7 +117,7 @@ public abstract class AbstractRelOptPlanner implements RelOptPlanner {
    * an exception.
    */
   public void checkCancel() {
-    if (cancelFlag.isCancelRequested()) {
+    if (cancelFlag.get()) {
       throw RESOURCE.preparationAborted().ex();
     }
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java b/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java
index 58f4449..f4ba090 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptPlanner.java
@@ -122,12 +122,15 @@ public interface RelOptPlanner {
   void setRuleDescExclusionFilter(Pattern exclusionFilter);
 
   /**
-   * Installs the cancellation-checking flag for this planner. The planner
-   * should periodically check this flag and terminate the planning process if
-   * it sees a cancellation request.
+   * Does nothing.
+   *
+   * @deprecated Previously, this method installed the cancellation-checking
+   * flag for this planner; it now does nothing. Instead, add a
+   * {@link CancelFlag} to the {@link Context} passed to the constructor.
    *
    * @param cancelFlag flag which the planner should periodically check
    */
+  @Deprecated // to be removed before 2.0
   void setCancelFlag(CancelFlag cancelFlag);
 
   /**

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 80ce001..b936859 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -615,6 +615,9 @@ public interface CalciteResource {
 
   @BaseMessage("SELECT must have a FROM clause")
   ExInst<SqlValidatorException> selectMissingFrom();
+
+  @BaseMessage("Statement canceled")
+  Inst statementCanceled();
 }
 
 // End CalciteResource.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/java/org/apache/calcite/util/CancelFlag.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/util/CancelFlag.java b/core/src/main/java/org/apache/calcite/util/CancelFlag.java
index b1f57b7..61e678b 100644
--- a/core/src/main/java/org/apache/calcite/util/CancelFlag.java
+++ b/core/src/main/java/org/apache/calcite/util/CancelFlag.java
@@ -16,13 +16,28 @@
  */
 package org.apache.calcite.util;
 
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.RelOptPlanner;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * CancelFlag is used to post and check cancellation requests.
+ *
+ * <p>Pass it to {@link RelOptPlanner} by putting it into a {@link Context}.
  */
 public class CancelFlag {
   //~ Instance fields --------------------------------------------------------
 
-  private boolean cancelRequested;
+  /** The flag that holds the cancel state.
+   * Feel free to use the flag directly. */
+  public final AtomicBoolean atomicBoolean;
+
+  public CancelFlag(AtomicBoolean atomicBoolean) {
+    this.atomicBoolean = Preconditions.checkNotNull(atomicBoolean);
+  }
 
   //~ Methods ----------------------------------------------------------------
 
@@ -30,21 +45,21 @@ public class CancelFlag {
    * @return whether a cancellation has been requested
    */
   public boolean isCancelRequested() {
-    return cancelRequested;
+    return atomicBoolean.get();
   }
 
   /**
    * Requests a cancellation.
    */
   public void requestCancel() {
-    cancelRequested = true;
+    atomicBoolean.compareAndSet(false, true);
   }
 
   /**
    * Clears any pending cancellation request.
    */
   public void clearCancel() {
-    cancelRequested = false;
+    atomicBoolean.compareAndSet(true, false);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 3573f00..bab7a4a 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -201,4 +201,5 @@ TableNotFound=Table ''{0}'' not found
 FilterMustBeBoolean=FILTER expression must be of type BOOLEAN
 CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream
 SelectMissingFrom=SELECT must have a FROM clause
+StatementCanceled=Statement canceled
 # End CalciteResource.properties

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 6d79b1d..6b6e225 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -51,7 +51,10 @@ import java.util.Iterator;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for streaming queries.
@@ -234,6 +237,45 @@ public class StreamTest {
         .returnsCount(100);
   }
 
+  @Test(timeout = 10000) public void testStreamCancel() {
+    final String explain = "EnumerableInterpreter\n"
+        + "  BindableTableScan(table=[[INFINITE_STREAMS, ORDERS, (STREAM)]])";
+    CalciteAssert.model(STREAM_MODEL)
+        .withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME)
+        .query("select stream * from orders")
+        .explainContains(explain)
+        .returns(
+            new Function<ResultSet, Void>() {
+              public Void apply(final ResultSet resultSet) {
+                int n = 0;
+                try {
+                  while (resultSet.next()) {
+                    if (++n == 5) {
+                      new Thread(
+                          new Runnable() {
+                            @Override public void run() {
+                              try {
+                                Thread.sleep(3);
+                                resultSet.getStatement().cancel();
+                              } catch (InterruptedException | SQLException e) {
+                                // ignore
+                              }
+                            }
+                          }).start();
+                    }
+                  }
+                  fail("expected cancel, got end-of-data");
+                } catch (SQLException e) {
+                  assertThat(e.getMessage(), is("Statement canceled"));
+                }
+                // With a 3 millisecond delay, typically n is between 200 - 400
+                // before cancel takes effect.
+                assertTrue("n is " + n, n > 5);
+                return null;
+              }
+            });
+  }
+
   @Test public void testStreamToRelationJoin() {
     CalciteAssert.model(STREAM_JOINS_MODEL)
         .withDefaultSchema(STREAM_JOINS_SCHEMA_NAME)

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
index 5b942fa..716992c 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
@@ -26,6 +26,8 @@ import org.apache.commons.lang3.time.FastDateFormat;
 
 import au.com.bytecode.opencsv.CSVReader;
 
+import com.google.common.base.Throwables;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
@@ -37,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.GZIPInputStream;
 
 
@@ -47,6 +50,7 @@ import java.util.zip.GZIPInputStream;
 class CsvEnumerator<E> implements Enumerator<E> {
   private final CSVReader reader;
   private final String[] filterValues;
+  private final AtomicBoolean cancelFlag;
   private final RowConverter<E> rowConverter;
   private E current;
 
@@ -62,21 +66,29 @@ class CsvEnumerator<E> implements Enumerator<E> {
         FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
   }
 
-  public CsvEnumerator(File file, List<CsvFieldType> fieldTypes) {
-    this(file, fieldTypes, identityList(fieldTypes.size()));
+  public CsvEnumerator(File file, AtomicBoolean cancelFlag,
+      List<CsvFieldType> fieldTypes) {
+    this(file, cancelFlag, fieldTypes, identityList(fieldTypes.size()));
   }
 
-  public CsvEnumerator(File file, List<CsvFieldType> fieldTypes, int[] fields) {
+  public CsvEnumerator(File file, AtomicBoolean cancelFlag,
+      List<CsvFieldType> fieldTypes, int[] fields) {
     //noinspection unchecked
-    this(file, null, (RowConverter<E>) converter(fieldTypes, fields));
+    this(file, cancelFlag, false, null,
+        (RowConverter<E>) converter(fieldTypes, fields));
   }
 
-  public CsvEnumerator(File file, String[] filterValues,
-      RowConverter<E> rowConverter) {
+  public CsvEnumerator(File file, AtomicBoolean cancelFlag, boolean stream,
+      String[] filterValues, RowConverter<E> rowConverter) {
+    this.cancelFlag = cancelFlag;
     this.rowConverter = rowConverter;
     this.filterValues = filterValues;
     try {
-      this.reader = openCsv(file);
+      if (stream) {
+        this.reader = new CsvStreamReader(file);
+      } else {
+        this.reader = openCsv(file);
+      }
       this.reader.readNext(); // skip header row
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -94,7 +106,7 @@ class CsvEnumerator<E> implements Enumerator<E> {
   }
 
   static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file,
-                                   List<CsvFieldType> fieldTypes) {
+      List<CsvFieldType> fieldTypes) {
     return deduceRowType(typeFactory, file, fieldTypes, false);
   }
 
@@ -106,7 +118,7 @@ class CsvEnumerator<E> implements Enumerator<E> {
     final List<String> names = new ArrayList<>();
     CSVReader reader = null;
     if (stream) {
-      names.add("ROWTIME");
+      names.add(CsvSchemaFactory.ROWTIME_COLUMN_NAME);
       types.add(typeFactory.createSqlType(SqlTypeName.TIMESTAMP));
     }
     try {
@@ -180,8 +192,19 @@ class CsvEnumerator<E> implements Enumerator<E> {
     try {
     outer:
       for (;;) {
+        if (cancelFlag.get()) {
+          return false;
+        }
         final String[] strings = reader.readNext();
         if (strings == null) {
+          if (reader instanceof CsvStreamReader) {
+            try {
+              Thread.sleep(CsvStreamReader.DEFAULT_MONITOR_DELAY);
+            } catch (InterruptedException e) {
+              throw Throwables.propagate(e);
+            }
+            continue;
+          }
           current = null;
           reader.close();
           return false;

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java
index 4ca26f7..593de5a 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvFilterableTable.java
@@ -31,6 +31,7 @@ import org.apache.calcite.sql.SqlKind;
 import java.io.File;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Table based on a CSV file that can implement simple filtering.
@@ -58,9 +59,10 @@ public class CsvFilterableTable extends CsvTable
       }
     }
     final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
+    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
     return new AbstractEnumerable<Object[]>() {
       public Enumerator<Object[]> enumerator() {
-        return new CsvEnumerator<Object[]>(file, filterValues,
+        return new CsvEnumerator<>(file, cancelFlag, false, filterValues,
             new CsvEnumerator.ArrayRowConverter(fieldTypes, fields));
       }
     };

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java
index 005d15b..5078afe 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvScannableTable.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.schema.ScannableTable;
 
 import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Table based on a CSV file.
@@ -44,9 +45,10 @@ public class CsvScannableTable extends CsvTable
 
   public Enumerable<Object[]> scan(DataContext root) {
     final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
+    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
     return new AbstractEnumerable<Object[]>() {
       public Enumerator<Object[]> enumerator() {
-        return new CsvEnumerator<Object[]>(file,
+        return new CsvEnumerator<>(file, cancelFlag, false,
             null, new CsvEnumerator.ArrayRowConverter(fieldTypes, fields));
       }
     };

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java
index 9f3c2bd..44520c1 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory.java
@@ -28,12 +28,18 @@ import java.util.Map;
  * Factory that creates a {@link CsvSchema}.
  *
  * <p>Allows a custom schema to be included in a <code><i>model</i>.json</code>
- * file.</p>
+ * file.
  */
 @SuppressWarnings("UnusedDeclaration")
 public class CsvSchemaFactory implements SchemaFactory {
-  // public constructor, per factory contract
-  public CsvSchemaFactory() {
+  /** Name of the column that is implicitly created in a CSV stream table
+   * to hold the data arrival time. */
+  static final String ROWTIME_COLUMN_NAME = "ROWTIME";
+
+  /** Public singleton, per factory contract. */
+  public static final CsvSchemaFactory INSTANCE = new CsvSchemaFactory();
+
+  private CsvSchemaFactory() {
   }
 
   public Schema create(SchemaPlus parentSchema, String name,

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
deleted file mode 100644
index 4609f11..0000000
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import org.apache.calcite.linq4j.Enumerator;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Csv Streaming enumerator
- * @param <E> Row type
- */
-public class CsvStreamEnumerator<E> implements Enumerator<E> {
-  protected CsvStreamReader streamReader;
-  protected String[] filterValues;
-  protected CsvEnumerator.RowConverter<E> rowConverter;
-  protected E current;
-
-  public CsvStreamEnumerator(File file, String[] filterValues,
-    CsvEnumerator.RowConverter<E> rowConverter) {
-    this.rowConverter = rowConverter;
-    this.filterValues = filterValues;
-    try {
-      this.streamReader = new CsvStreamReader(file);
-      this.streamReader.readNext(); // skip header row
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public boolean moveNext() {
-    return true;
-  }
-
-  public E readNext() {
-    try {
-    outer:
-      for (;;) {
-        final String[] strings = streamReader.readNext();
-        if (strings == null) {
-          current = null;
-          streamReader.close();
-          return current;
-        } else {
-          if (filterValues != null) {
-            for (int i = 0; i < strings.length; i++) {
-              String filterValue = filterValues[i];
-              if (filterValue != null) {
-                if (!filterValue.equals(strings[i])) {
-                  continue outer;
-                }
-              }
-            }
-          }
-          current = rowConverter.convertRow(strings);
-          return current;
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override public E current() {
-    return readNext();
-  }
-
-  @Override public void close() {
-    try {
-      streamReader.close();
-    } catch (IOException e) {
-      throw new RuntimeException("Error closing Csv Stream reader", e);
-    }
-  }
-
-  @Override public void reset() {
-    throw new UnsupportedOperationException();
-  }
-}
-
-// End CsvStreamEnumerator.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
index 5f2cf19..e97278a 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
@@ -21,17 +21,21 @@ import org.apache.commons.io.input.TailerListener;
 import org.apache.commons.io.input.TailerListenerAdapter;
 
 import au.com.bytecode.opencsv.CSVParser;
+import au.com.bytecode.opencsv.CSVReader;
+
+import com.google.common.base.Throwables;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.StringReader;
 import java.util.ArrayDeque;
 import java.util.Queue;
 
 /**
- * CSVSreamReader that can read newly appended file content
+ * Extension to {@link CSVReader} that can read newly appended file content.
  */
-public class CsvStreamReader implements Closeable {
+class CsvStreamReader extends CSVReader implements Closeable {
   protected CSVParser parser;
   protected int skipLines;
   protected Tailer tailer;
@@ -47,48 +51,44 @@ public class CsvStreamReader implements Closeable {
    */
   public static final long DEFAULT_MONITOR_DELAY = 2000;
 
-  public CsvStreamReader(File csvFile) {
-    this(
-      csvFile,
+  CsvStreamReader(File csvFile) {
+    this(csvFile,
       CSVParser.DEFAULT_SEPARATOR,
       CSVParser.DEFAULT_QUOTE_CHARACTER,
       CSVParser.DEFAULT_ESCAPE_CHARACTER,
       DEFAULT_SKIP_LINES,
       CSVParser.DEFAULT_STRICT_QUOTES,
-      CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE
-    );
+      CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE);
   }
 
   /**
-   * Constructs CSVReader with supplied separator and quote char.
+   * Creates a CsvStreamReader with supplied separator and quote char.
    *
-   * @param csvFile the file to an underlying CSV source.
-   * @param separator the delimiter to use for separating entries
-   * @param quotechar the character to use for quoted elements
-   * @param escape the character to use for escaping a separator or quote
-   * @param line the line number to skip for start reading
-   * @param strictQuotes sets if characters outside the quotes are ignored
-   * @param ignoreLeadingWhiteSpace it true, parser should ignore
+   * @param csvFile The file to an underlying CSV source.
+   * @param separator The delimiter to use for separating entries
+   * @param quoteChar The character to use for quoted elements
+   * @param escape The character to use for escaping a separator or quote
+   * @param line The line number to skip for start reading
+   * @param strictQuotes Sets if characters outside the quotes are ignored
+   * @param ignoreLeadingWhiteSpace If true, parser should ignore
    *  white space before a quote in a field
    */
-  public CsvStreamReader(File csvFile, char separator, char quotechar, char escape, int line,
-                         boolean strictQuotes, boolean ignoreLeadingWhiteSpace) {
-    contentQueue = new ArrayDeque<String>();
-    TailerListener listener = new CSVContentListener(contentQueue);
-    tailer = Tailer.create(csvFile, listener, DEFAULT_MONITOR_DELAY, false, true, 4096);
-    this.parser = new CSVParser(
-      separator,
-      quotechar,
-      escape,
-      strictQuotes,
-      ignoreLeadingWhiteSpace
-    );
+  private CsvStreamReader(File csvFile, char separator, char quoteChar,
+      char escape, int line, boolean strictQuotes,
+      boolean ignoreLeadingWhiteSpace) {
+    super(new StringReader("")); // dummy call to base constructor
+    contentQueue = new ArrayDeque<>();
+    TailerListener listener = new CsvContentListener(contentQueue);
+    tailer = Tailer.create(csvFile, listener, DEFAULT_MONITOR_DELAY, false,
+        true, 4096);
+    this.parser = new CSVParser(separator, quoteChar, escape, strictQuotes,
+        ignoreLeadingWhiteSpace);
     this.skipLines = line;
     try {
-      //wait for tailer to capture data
+      // wait for tailer to capture data
       Thread.sleep(DEFAULT_MONITOR_DELAY);
     } catch (InterruptedException e) {
-      //ignore the interruption
+      throw Throwables.propagate(e);
     }
   }
 
@@ -100,17 +100,11 @@ public class CsvStreamReader implements Closeable {
    * @throws IOException if bad things happen during the read
    */
   public String[] readNext() throws IOException {
-
     String[] result = null;
     do {
       String nextLine = getNextLine();
-      while (nextLine == null) {
-        try {
-          Thread.sleep(DEFAULT_MONITOR_DELAY);
-          nextLine = getNextLine();
-        } catch (InterruptedException e) {
-          return null; // should throw if still pending?
-        }
+      if (nextLine == null) {
+        return null;
       }
       String[] r = parser.parseLineMulti(nextLine);
       if (r.length > 0) {
@@ -146,11 +140,11 @@ public class CsvStreamReader implements Closeable {
   public void close() throws IOException {
   }
 
-  /** csv file content watcher*/
-  class CSVContentListener extends TailerListenerAdapter {
-    Queue<String> contentQueue;
+  /** Watches for content being appended to a CSV file. */
+  private static class CsvContentListener extends TailerListenerAdapter {
+    final Queue<String> contentQueue;
 
-    CSVContentListener(Queue<String> contentQueue) {
+    CsvContentListener(Queue<String> contentQueue) {
       this.contentQueue = contentQueue;
     }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
index 752f8a7..72af281 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.schema.Table;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Table based on a CSV file.
@@ -49,7 +50,7 @@ public class CsvStreamScannableTable extends CsvScannableTable
       return protoRowType.apply(typeFactory);
     }
     if (fieldTypes == null) {
-      fieldTypes = new ArrayList<CsvFieldType>();
+      fieldTypes = new ArrayList<>();
       return CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, file, fieldTypes, true);
     } else {
       return CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, file, null, true);
@@ -62,10 +63,11 @@ public class CsvStreamScannableTable extends CsvScannableTable
 
   public Enumerable<Object[]> scan(DataContext root) {
     final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
+    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
     return new AbstractEnumerable<Object[]>() {
       public Enumerator<Object[]> enumerator() {
-        return new CsvStreamEnumerator<Object[]>(file,
-            null, new CsvEnumerator.ArrayRowConverter(fieldTypes, fields, true));
+        return new CsvEnumerator<>(file, cancelFlag, true, null,
+            new CsvEnumerator.ArrayRowConverter(fieldTypes, fields, true));
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java
index 8a53e84..5165aab 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan.java
@@ -97,7 +97,8 @@ public class CsvTableScan extends TableScan implements EnumerableRel {
         physType,
         Blocks.toBlock(
             Expressions.call(table.getExpression(CsvTranslatableTable.class),
-                "project", Expressions.constant(fields))));
+                "project", implementor.getRootExpression(),
+                Expressions.constant(fields))));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java
index 5ae9880..8970e28 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.csv;
 
+import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
@@ -32,6 +33,7 @@ import org.apache.calcite.schema.TranslatableTable;
 
 import java.io.File;
 import java.lang.reflect.Type;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Table based on a CSV file.
@@ -50,10 +52,12 @@ public class CsvTranslatableTable extends CsvTable
   /** Returns an enumerable over a given projection of the fields.
    *
    * <p>Called from generated code. */
-  public Enumerable<Object> project(final int[] fields) {
+  public Enumerable<Object> project(final DataContext root,
+      final int[] fields) {
+    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
     return new AbstractEnumerable<Object>() {
       public Enumerator<Object> enumerator() {
-        return new CsvEnumerator<Object>(file, fieldTypes, fields);
+        return new CsvEnumerator<>(file, cancelFlag, fieldTypes, fields);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/d9eb4383/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java
----------------------------------------------------------------------
diff --git a/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java b/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java
index 173994b..353758a 100644
--- a/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java
+++ b/example/csv/src/test/java/org/apache/calcite/test/CsvTest.java
@@ -17,10 +17,12 @@
 package org.apache.calcite.test;
 
 import org.apache.calcite.adapter.csv.CsvSchemaFactory;
+import org.apache.calcite.adapter.csv.CsvStreamTableFactory;
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.util.Util;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
@@ -29,7 +31,10 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileWriter;
 import java.io.PrintStream;
+import java.io.PrintWriter;
 import java.net.URL;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -40,11 +45,18 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Unit test of the Calcite adapter for CSV.
@@ -67,10 +79,6 @@ public class CsvTest {
     }
   }
 
-  public static String toLinux(String s) {
-    return s.replaceAll("\r\n", "\n");
-  }
-
   /**
    * Tests the vanity driver.
    */
@@ -302,7 +310,7 @@ public class CsvTest {
             .append(resultSet.getString(i));
         sep = "; ";
       }
-      result.add(toLinux(buf.toString()));
+      result.add(Util.toLinux(buf.toString()));
     }
   }
 
@@ -488,7 +496,7 @@ public class CsvTest {
           CalciteConnection.class);
 
       final Schema schema =
-          new CsvSchemaFactory()
+          CsvSchemaFactory.INSTANCE
               .create(calciteConnection.getRootSchema(), null,
                   ImmutableMap.<String, Object>of("directory",
                       resourcePath("sales"), "flavor", "scannable"));
@@ -503,6 +511,159 @@ public class CsvTest {
       expect.apply(resultSet1);
     }
   }
+
+  @Test(timeout = 10000) public void testCsvStream() throws Exception {
+    final File file = File.createTempFile("stream", "csv");
+    final String model = "{\n"
+        + "  version: '1.0',\n"
+        + "  defaultSchema: 'STREAM',\n"
+        + "  schemas: [\n"
+        + "    {\n"
+        + "      name: 'SS',\n"
+        + "      tables: [\n"
+        + "        {\n"
+        + "          name: 'DEPTS',\n"
+        + "          type: 'custom',\n"
+        + "          factory: '" + CsvStreamTableFactory.class.getName()
+        + "',\n"
+        + "          stream: {\n"
+        + "            stream: true\n"
+        + "          },\n"
+        + "          operand: {\n"
+        + "            file: '" + file.getAbsolutePath() + "',\n"
+        + "            flavor: \"scannable\"\n"
+        + "          }\n"
+        + "        }\n"
+        + "      ]\n"
+        + "    }\n"
+        + "  ]\n"
+        + "}\n";
+    final String[] strings = {
+      "DEPTNO:int,NAME:string",
+      "10,\"Sales\"",
+      "20,\"Marketing\"",
+      "30,\"Engineering\""
+    };
+
+    try (final Connection connection =
+             DriverManager.getConnection("jdbc:calcite:model=inline:" + model);
+         final PrintWriter pw = new PrintWriter(new FileWriter(file));
+         final Worker<Void> worker = new Worker<>()) {
+      final Thread thread = new Thread(worker);
+      thread.start();
+
+      // Add some rows so that the table can deduce its row type.
+      final Iterator<String> lines = Arrays.asList(strings).iterator();
+      worker.queue.put(writeLine(pw, lines.next())); // header
+      worker.queue.put(writeLine(pw, lines.next())); // first row
+      worker.queue.put(sleep(10));
+      worker.queue.put(writeLine(pw, lines.next())); // second row
+      final CalciteConnection calciteConnection =
+          connection.unwrap(CalciteConnection.class);
+      final String sql = "select stream * from \"SS\".\"DEPTS\"";
+      final PreparedStatement statement =
+          calciteConnection.prepareStatement(sql);
+      final ResultSet resultSet = statement.executeQuery();
+      int count = 0;
+      try {
+        while (resultSet.next()) {
+          ++count;
+          if (lines.hasNext()) {
+            worker.queue.put(sleep(10));
+            worker.queue.put(writeLine(pw, lines.next()));
+          } else {
+            worker.queue.put(cancel(statement));
+          }
+        }
+        fail("expected exception, got end of data");
+      } catch (SQLException e) {
+        assertThat(e.getMessage(), is("Statement canceled"));
+      }
+      assertThat(count, anyOf(is(strings.length - 2), is(strings.length - 1)));
+      assertThat(worker.e, nullValue());
+      assertThat(worker.v, nullValue());
+    } finally {
+      Util.discard(file.delete());
+    }
+  }
+
+  /** Creates a command that appends a line to the CSV file. */
+  private Callable<Void> writeLine(final PrintWriter pw, final String line) {
+    return new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        pw.println(line);
+        pw.flush();
+        return null;
+      }
+    };
+  }
+
+  /** Creates a command that sleeps. */
+  private Callable<Void> sleep(final long millis) {
+    return new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        Thread.sleep(millis);
+        return null;
+      }
+    };
+  }
+
+  /** Creates a command that cancels a statement. */
+  private Callable<Void> cancel(final Statement statement) {
+    return new Callable<Void>() {
+      @Override public Void call() throws Exception {
+        statement.cancel();
+        return null;
+      }
+    };
+  }
+
+  /** Receives commands on a queue and executes them on its own thread.
+   * Call {@link #close} to terminate.
+   *
+   * @param <E> Result value of commands
+   */
+  private static class Worker<E> implements Runnable, AutoCloseable {
+    /** Queue of commands. */
+    final BlockingQueue<Callable<E>> queue =
+        new ArrayBlockingQueue<>(5);
+
+    /** Value returned by the most recent command. */
+    private E v;
+
+    /** Exception thrown by a command or queue wait. */
+    private Exception e;
+
+    /** The poison pill command. */
+    final Callable<E> end =
+        new Callable<E>() {
+          public E call() {
+            return null;
+          }
+        };
+
+    public void run() {
+      try {
+        for (;;) {
+          final Callable<E> c = queue.take();
+          if (c == end) {
+            return;
+          }
+          this.v = c.call();
+        }
+      } catch (Exception e) {
+        this.e = e;
+      }
+    }
+
+    public void close() {
+      try {
+        queue.put(end);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+  }
 }
 
 // End CsvTest.java


Mime
View raw message