beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-2149] Improved kafka table implementation.
Date Mon, 15 May 2017 09:23:08 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 868bcbdad -> 0e08c87d7


[BEAM-2149] Improved kafka table implementation.

1. use robust CSV library to parse & print.
2. support different data types rather than just `String`.
3. a little cleanup for TextTable (to extract common methods).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8044e59e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8044e59e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8044e59e

Branch: refs/heads/DSL_SQL
Commit: 8044e59e1bfa175e9d686a4c332be85be22850f1
Parents: 868bcbd
Author: James Xu <xumingmingv@gmail.com>
Authored: Sat May 13 01:07:01 2017 +0800
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Mon May 15 10:58:04 2017 +0200

----------------------------------------------------------------------
 .../beam/dsls/sql/schema/BeamTableUtils.java    | 104 +++++++++++++++++
 .../sql/schema/kafka/BeamKafkaCSVTable.java     |  53 ++++-----
 .../schema/text/BeamTextCSVTableIOReader.java   |  66 +----------
 .../schema/text/BeamTextCSVTableIOWriter.java   |  22 +---
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 113 +++++++++++++++++++
 .../sql/schema/text/BeamTextCSVTableTest.java   |   2 -
 6 files changed, 245 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
new file mode 100644
index 0000000..bc622c2
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * Utility methods for working with {@code BeamTable}.
+ */
+public final class BeamTableUtils {
+  public static BeamSQLRow csvLine2BeamSQLRow(
+      CSVFormat csvFormat,
+      String line,
+      BeamSQLRecordType beamSqlRecordType) {
+    BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
+    try (StringReader reader = new StringReader(line)) {
+      CSVParser parser = csvFormat.parse(reader);
+      CSVRecord rawRecord = parser.getRecords().get(0);
+
+      if (rawRecord.size() != beamSqlRecordType.size()) {
+        throw new IllegalArgumentException(String.format(
+            "Line %s: expect %d fields, but actually %d", line,
+            beamSqlRecordType.size(), rawRecord.size()
+        ));
+      } else {
+        for (int idx = 0; idx < beamSqlRecordType.size(); idx++) {
+          String raw = rawRecord.get(idx);
+          addFieldWithAutoTypeCasting(row, idx, raw);
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("decodeRecord failed!", e);
+    }
+    return row;
+  }
+
+  public static String beamSQLRow2CsvLine(BeamSQLRow row, CSVFormat csvFormat) {
+    StringWriter writer = new StringWriter();
+    try (CSVPrinter printer = csvFormat.print(writer)) {
+      for (int i = 0; i < row.size(); i++) {
+        printer.print(row.getFieldValue(i).toString());
+      }
+      printer.println();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("encodeRecord failed!", e);
+    }
+    return writer.toString();
+  }
+
+  public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) {
+    SqlTypeName columnType = row.getDataType().getFieldsType().get(idx);
+    switch (columnType) {
+      case TINYINT:
+        row.addField(idx, Byte.valueOf(raw));
+        break;
+      case SMALLINT:
+        row.addField(idx, Short.valueOf(raw));
+        break;
+      case INTEGER:
+        row.addField(idx, Integer.valueOf(raw));
+        break;
+      case BIGINT:
+        row.addField(idx, Long.valueOf(raw));
+        break;
+      case FLOAT:
+        row.addField(idx, Float.valueOf(raw));
+        break;
+      case DOUBLE:
+        row.addField(idx, Double.valueOf(raw));
+        break;
+      case VARCHAR:
+        row.addField(idx, raw);
+        break;
+      default:
+        throw new BeamSqlUnsupportedException(
+            String.format("Column type %s is not supported yet!", columnType));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
index 0f40f33..127870c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -17,7 +17,11 @@
  */
 package org.apache.beam.dsls.sql.schema.kafka;
 
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSQLRow2CsvLine;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSQLRow;
+
 import java.util.List;
+
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -26,33 +30,35 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.rel.type.RelProtoDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.csv.CSVFormat;
 
 /**
  * A Kafka topic that saves records as CSV format.
  *
  */
 public class BeamKafkaCSVTable extends BeamKafkaTable {
-
-  public static final String DELIMITER = ",";
-  private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class);
-
+  private CSVFormat csvFormat;
   public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
       List<String> topics) {
+    this(protoRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+  }
+
+  public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
+      List<String> topics, CSVFormat format) {
     super(protoRowType, bootstrapServers, topics);
+    this.csvFormat = format;
   }
 
   @Override
   public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
       getPTransformForInput() {
-    return new CsvRecorderDecoder(beamSqlRecordType);
+    return new CsvRecorderDecoder(beamSqlRecordType, csvFormat);
   }
 
   @Override
   public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput() {
-    return new CsvRecorderEncoder(beamSqlRecordType);
+    return new CsvRecorderEncoder(beamSqlRecordType, csvFormat);
   }
 
   /**
@@ -62,9 +68,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
   public static class CsvRecorderDecoder
       extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
{
     private BeamSQLRecordType recordType;
-
-    public CsvRecorderDecoder(BeamSQLRecordType recordType) {
+    private CSVFormat format;
+    public CsvRecorderDecoder(BeamSQLRecordType recordType, CSVFormat format) {
       this.recordType = recordType;
+      this.format = format;
     }
 
     @Override
@@ -73,16 +80,7 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
         @ProcessElement
         public void processElement(ProcessContext c) {
           String rowInString = new String(c.element().getValue());
-          String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER);
-          if (parts.length != recordType.size()) {
-            LOG.error(String.format("invalid record: ", rowInString));
-          } else {
-            BeamSQLRow sourceRecord = new BeamSQLRow(recordType);
-            for (int idx = 0; idx < parts.length; ++idx) {
-              sourceRecord.addField(idx, parts[idx]);
-            }
-            c.output(sourceRecord);
-          }
+          c.output(csvLine2BeamSQLRow(format, rowInString, recordType));
         }
       }));
     }
@@ -95,9 +93,10 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
   public static class CsvRecorderEncoder
       extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
{
     private BeamSQLRecordType recordType;
-
-    public CsvRecorderEncoder(BeamSQLRecordType recordType) {
+    private CSVFormat format;
+    public CsvRecorderEncoder(BeamSQLRecordType recordType, CSVFormat format) {
       this.recordType = recordType;
+      this.format = format;
     }
 
     @Override
@@ -106,17 +105,9 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
         @ProcessElement
         public void processElement(ProcessContext c) {
           BeamSQLRow in = c.element();
-          StringBuffer sb = new StringBuffer();
-          for (int idx = 0; idx < in.size(); ++idx) {
-            sb.append(DELIMITER);
-            sb.append(in.getFieldValue(idx).toString());
-          }
-          c.output(KV.of(new byte[] {}, sb.substring(1).getBytes()));
+          c.output(KV.of(new byte[] {}, beamSQLRow2CsvLine(in, format).getBytes()));
         }
       }));
-
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
index cf7c095..3c031ce 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -18,11 +18,10 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import java.io.IOException;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSQLRow;
+
 import java.io.Serializable;
-import java.io.StringReader;
 
-import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.io.TextIO;
@@ -31,12 +30,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * IOReader for {@code BeamTextCSVTable}.
@@ -44,7 +38,6 @@ import org.slf4j.LoggerFactory;
 public class BeamTextCSVTableIOReader
     extends PTransform<PBegin, PCollection<BeamSQLRow>>
     implements Serializable {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOReader.class);
   private String filePattern;
   protected BeamSQLRecordType beamSqlRecordType;
   protected CSVFormat csvFormat;
@@ -63,61 +56,8 @@ public class BeamTextCSVTableIOReader
           @ProcessElement
           public void processElement(ProcessContext ctx) {
             String str = ctx.element();
-
-            try (StringReader reader = new StringReader(str)) {
-              CSVRecord rawRecord = null;
-              try {
-                CSVParser parser = csvFormat.parse(reader);
-                rawRecord = parser.getRecords().get(0);
-              } catch (IOException e) {
-                throw new IllegalArgumentException("Invalid text filePattern: " + filePattern,
e);
-              }
-
-              BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
-              if (rawRecord.size() != beamSqlRecordType.size()) {
-                throw new IllegalArgumentException(String.format(
-                    "Invalid filePattern: {}, expect %d fields, but actually %d", str,
-                    filePattern, beamSqlRecordType.size(), rawRecord.size()
-                ));
-              } else {
-                for (int idx = 0; idx < beamSqlRecordType.size(); idx++) {
-                  String raw = rawRecord.get(idx);
-                  addFieldWithAutoTypeCasting(row, idx, raw);
-                }
-                ctx.output(row);
-              }
-            }
+            ctx.output(csvLine2BeamSQLRow(csvFormat, str, beamSqlRecordType));
           }
         }));
   }
-
-  public void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) {
-    SqlTypeName columnType = row.getDataType().getFieldsType().get(idx);
-    switch (columnType) {
-      case TINYINT:
-        row.addField(idx, Byte.valueOf(raw));
-        break;
-      case SMALLINT:
-        row.addField(idx, Short.valueOf(raw));
-        break;
-      case INTEGER:
-        row.addField(idx, Integer.valueOf(raw));
-        break;
-      case BIGINT:
-        row.addField(idx, Long.valueOf(raw));
-        break;
-      case FLOAT:
-        row.addField(idx, Float.valueOf(raw));
-        break;
-      case DOUBLE:
-        row.addField(idx, Double.valueOf(raw));
-        break;
-      case VARCHAR:
-        row.addField(idx, raw);
-        break;
-      default:
-        throw new BeamSqlUnsupportedException(String.format(
-            "Column type %s is not supported yet!", columnType));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
index 6104cd8..eade842 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -18,9 +18,9 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import java.io.IOException;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSQLRow2CsvLine;
+
 import java.io.Serializable;
-import java.io.StringWriter;
 
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -31,17 +31,12 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVPrinter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * IOWriter for {@code BeamTextCSVTable}.
  */
 public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>,
PDone>
     implements Serializable {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOWriter.class);
-
   private String filePattern;
   protected BeamSQLRecordType beamSqlRecordType;
   protected CSVFormat csvFormat;
@@ -58,18 +53,7 @@ public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>
 
       @ProcessElement public void processElement(ProcessContext ctx) {
         BeamSQLRow row = ctx.element();
-        StringWriter writer = new StringWriter();
-
-        try (CSVPrinter printer = csvFormat.print(writer)) {
-          for (int i = 0; i < row.size(); i++) {
-            printer.print(row.getFieldValue(i).toString());
-          }
-          printer.println();
-        } catch (IOException e) {
-          throw new IllegalArgumentException("Invalid filePattern: " + filePattern, e);
-        }
-
-        ctx.output(writer.toString());
+        ctx.output(beamSQLRow2CsvLine(row, csvFormat));
       }
     })).apply(TextIO.Write.to(filePattern));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
new file mode 100644
index 0000000..d20af0c
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.kafka;
+
+import java.io.Serializable;
+
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for BeamKafkaCSVTable.
+ */
+public class BeamKafkaCSVTableTest {
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+  public static BeamSQLRow row1 = new BeamSQLRow(genRowType());
+  public static BeamSQLRow row2 = new BeamSQLRow(genRowType());
+
+  @BeforeClass
+  public static void setUp() {
+    row1.addField(0, 1L);
+    row1.addField(1, 1);
+    row1.addField(2, 1.0);
+
+    row2.addField(0, 2L);
+    row2.addField(1, 2);
+    row2.addField(2, 2.0);
+  }
+
+  @Test public void testCsvRecorderDecoder() throws Exception {
+    PCollection<BeamSQLRow> result = pipeline
+        .apply(
+            Create.of("1,\"1\",1.0", "2,2,2.0")
+        )
+        .apply(ParDo.of(new String2KvBytes()))
+        .apply(
+            new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
+        );
+
+    PAssert.that(result).containsInAnyOrder(row1, row2);
+
+    pipeline.run();
+  }
+
+  @Test public void testCsvRecorderEncoder() throws Exception {
+    PCollection<BeamSQLRow> result = pipeline
+        .apply(
+            Create.of(row1, row2)
+        )
+        .apply(
+            new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT)
+        ).apply(
+            new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)
+        );
+
+    PAssert.that(result).containsInAnyOrder(row1, row2);
+
+    pipeline.run();
+  }
+
+  private static BeamSQLRecordType genRowType() {
+    return BeamSQLRecordType.from(
+        new RelProtoDataType() {
+          @Override public RelDataType apply(RelDataTypeFactory a0) {
+            return a0.builder()
+                .add("order_id", SqlTypeName.BIGINT)
+                .add("site_id", SqlTypeName.INTEGER)
+                .add("price", SqlTypeName.DOUBLE)
+                .build();
+          }
+        }.apply(BeamQueryPlanner.TYPE_FACTORY));
+  }
+
+  private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
+      implements Serializable {
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(KV.of(new byte[] {}, ctx.element().getBytes()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8044e59e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
index e06f8da..3bc29e4 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -31,7 +31,6 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
@@ -75,7 +74,6 @@ public class BeamTextCSVTableTest {
       add(buildRow(data));
     }
   }};
-  private static ConcurrentLinkedQueue<Object[]> actualData = new ConcurrentLinkedQueue<>();
 
   private static Path tempFolder;
   private static File readerSourceFile;


Mime
View raw message