beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [3/5] beam git commit: [BEAM-2740] Hide BeamSqlEnv.
Date Tue, 15 Aug 2017 18:41:58 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..4bedec1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * A Kafka topic that saves records as CSV format.
+ *
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+  private CSVFormat csvFormat;
+  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+  }
+
+  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+      List<String> topics, CSVFormat format) {
+    super(beamSqlRowType, bootstrapServers, topics);
+    this.csvFormat = format;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
+      getPTransformForInput() {
+    return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
+  }
+
+  @Override
+  public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput() {
+    return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
+  }
+
+  /**
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}.
+   *
+   */
+  public static class CsvRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
+    private BeamRecordSqlType rowType;
+    private CSVFormat format;
+    public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          String rowInString = new String(c.element().getValue());
+          c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType));
+        }
+      }));
+    }
+  }
+
+  /**
+   * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}.
+   *
+   */
+  public static class CsvRecorderEncoder
+      extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
+    private BeamRecordSqlType rowType;
+    private CSVFormat format;
+    public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          BeamRecord in = c.element();
+          c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes()));
+        }
+      }));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..1113abf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
+ * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
+ *
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+  private String bootstrapServers;
+  private List<String> topics;
+  private Map<String, Object> configUpdates;
+
+  protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    super(beamSqlRowType);
+    this.bootstrapServers = bootstrapServers;
+    this.topics = topics;
+  }
+
+  public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
+    this.configUpdates = configUpdates;
+    return this;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
+      getPTransformForInput();
+
+  public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput();
+
+  @Override
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply("read",
+            KafkaIO.<byte[], byte[]>read()
+                .withBootstrapServers(bootstrapServers)
+                .withTopics(topics)
+                .updateConsumerProperties(configUpdates)
+                .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+                .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+                .withoutMetadata())
+            .apply("in_format", getPTransformForInput());
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+    checkArgument(topics != null && topics.size() == 1,
+        "Only one topic can be acceptable as output.");
+
+    return new PTransform<PCollection<BeamRecord>, PDone>() {
+      @Override
+      public PDone expand(PCollection<BeamRecord> input) {
+        return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+            KafkaIO.<byte[], byte[]>write()
+                .withBootstrapServers(bootstrapServers)
+                .withTopic(topics.get(0))
+                .withKeySerializer(ByteArraySerializer.class)
+                .withValueSerializer(ByteArraySerializer.class));
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
new file mode 100644
index 0000000..6752e3c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * table schema for KafkaIO.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java
new file mode 100644
index 0000000..86e7d06
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * define table schema, to map with Beam IO components.
+ *
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
new file mode 100644
index 0000000..a2dd6fb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code BeamTextCSVTable} is a {@code BeamTextTable} which formatted in CSV.
+ *
+ * <p>
+ * {@link CSVFormat} itself has many dialects, check its javadoc for more info.
+ * </p>
+ */
+public class BeamTextCSVTable extends BeamTextTable {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(BeamTextCSVTable.class);
+
+  private CSVFormat csvFormat;
+
+  /**
+   * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
+   */
+  public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern)  {
+    this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
+  }
+
+  public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    super(beamSqlRowType, filePattern);
+    this.csvFormat = csvFormat;
+  }
+
+  @Override
+  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+    return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
+        .apply("parseCSVLine",
+            new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
+  }
+
+  @Override
+  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+    return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
new file mode 100644
index 0000000..95f7063
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * IOReader for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableIOReader
+    extends PTransform<PCollection<String>, PCollection<BeamRecord>>
+    implements Serializable {
+  private String filePattern;
+  protected BeamRecordSqlType beamSqlRowType;
+  protected CSVFormat csvFormat;
+
+  public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    this.filePattern = filePattern;
+    this.beamSqlRowType = beamSqlRowType;
+    this.csvFormat = csvFormat;
+  }
+
+  @Override
+  public PCollection<BeamRecord> expand(PCollection<String> input) {
+    return input.apply(ParDo.of(new DoFn<String, BeamRecord>() {
+          @ProcessElement
+          public void processElement(ProcessContext ctx) {
+            String str = ctx.element();
+            ctx.output(BeamTableUtils.csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
+          }
+        }));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
new file mode 100644
index 0000000..4660ccb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * IOWriter for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
+    implements Serializable {
+  private String filePattern;
+  protected BeamRecordSqlType beamSqlRowType;
+  protected CSVFormat csvFormat;
+
+  public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    this.filePattern = filePattern;
+    this.beamSqlRowType = beamSqlRowType;
+    this.csvFormat = csvFormat;
+  }
+
+  @Override public PDone expand(PCollection<BeamRecord> input) {
+    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, String>() {
+
+      @ProcessElement public void processElement(ProcessContext ctx) {
+        BeamRecord row = ctx.element();
+        ctx.output(BeamTableUtils.beamSqlRow2CsvLine(row, csvFormat));
+      }
+    })).apply(TextIO.write().to(filePattern));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
new file mode 100644
index 0000000..b0d9c11
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
+
+/**
+ * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
+ */
+public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+  protected String filePattern;
+
+  protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
+    super(beamSqlRowType);
+    this.filePattern = filePattern;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
new file mode 100644
index 0000000..8927dca
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for text files.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index 40b7b58..9a50e21 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -33,11 +33,11 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 7a8d10d..3c6b20f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.BeamRecord;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index aac38c7..719fbf3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -19,10 +19,10 @@ package org.apache.beam.sdk.extensions.sql.impl.transform;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.BeamRecord;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 8b6206b..8c44780 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
deleted file mode 100644
index 0564820..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
- */
-public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
-  protected BeamRecordSqlType beamSqlRowType;
-  public BaseBeamTable(BeamRecordSqlType beamSqlRowType) {
-    this.beamSqlRowType = beamSqlRowType;
-  }
-
-  @Override public BeamRecordSqlType getRowType() {
-    return beamSqlRowType;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
deleted file mode 100644
index bda3ca1..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Type as a source IO, determined whether it's a STREAMING process, or batch
- * process.
- */
-public enum BeamIOType implements Serializable {
-  BOUNDED, UNBOUNDED;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
deleted file mode 100644
index 9d9988e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
- * then a downstream query can query directly.
- */
-public class BeamPCollectionTable extends BaseBeamTable {
-  private BeamIOType ioType;
-  private transient PCollection<BeamRecord> upstream;
-
-  protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) {
-    super(beamSqlRowType);
-  }
-
-  public BeamPCollectionTable(PCollection<BeamRecord> upstream,
-      BeamRecordSqlType beamSqlRowType){
-    this(beamSqlRowType);
-    ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
-        ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
-    this.upstream = upstream;
-  }
-
-  @Override
-  public BeamIOType getSourceType() {
-    return ioType;
-  }
-
-  @Override
-  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
-    return upstream;
-  }
-
-  @Override
-  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
-    throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
deleted file mode 100644
index 1845988..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.BeamRecordType;
-
-/**
- * Type provider for {@link BeamRecord} with SQL types.
- *
- * <p>Limited SQL types are supported now, visit
- * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
- * for more details.
- *
- */
-public class BeamRecordSqlType extends BeamRecordType {
-  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
-  static {
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
-  }
-
-  public List<Integer> fieldTypes;
-
-  protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) {
-    super(fieldsName, fieldsCoder);
-  }
-
-  private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes
-      , List<Coder> fieldsCoder) {
-    super(fieldsName, fieldsCoder);
-    this.fieldTypes = fieldTypes;
-  }
-
-  public static BeamRecordSqlType create(List<String> fieldNames,
-      List<Integer> fieldTypes) {
-    if (fieldNames.size() != fieldTypes.size()) {
-      throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match.");
-    }
-    List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
-    for (int idx = 0; idx < fieldTypes.size(); ++idx) {
-      switch (fieldTypes.get(idx)) {
-      case Types.INTEGER:
-        fieldCoders.add(BigEndianIntegerCoder.of());
-        break;
-      case Types.SMALLINT:
-        fieldCoders.add(ShortCoder.of());
-        break;
-      case Types.TINYINT:
-        fieldCoders.add(ByteCoder.of());
-        break;
-      case Types.DOUBLE:
-        fieldCoders.add(DoubleCoder.of());
-        break;
-      case Types.FLOAT:
-        fieldCoders.add(FloatCoder.of());
-        break;
-      case Types.DECIMAL:
-        fieldCoders.add(BigDecimalCoder.of());
-        break;
-      case Types.BIGINT:
-        fieldCoders.add(BigEndianLongCoder.of());
-        break;
-      case Types.VARCHAR:
-      case Types.CHAR:
-        fieldCoders.add(StringUtf8Coder.of());
-        break;
-      case Types.TIME:
-        fieldCoders.add(TimeCoder.of());
-        break;
-      case Types.DATE:
-      case Types.TIMESTAMP:
-        fieldCoders.add(DateCoder.of());
-        break;
-      case Types.BOOLEAN:
-        fieldCoders.add(BooleanCoder.of());
-        break;
-
-      default:
-        throw new UnsupportedOperationException(
-            "Data type: " + fieldTypes.get(idx) + " not supported yet!");
-      }
-    }
-    return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders);
-  }
-
-  @Override
-  public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
-    if (null == fieldValue) {// no need to do type check for NULL value
-      return;
-    }
-
-    int fieldType = fieldTypes.get(index);
-    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
-    if (javaClazz == null) {
-      throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
-    }
-
-    if (!fieldValue.getClass().equals(javaClazz)) {
-      throw new IllegalArgumentException(
-          String.format("[%s](%s) doesn't match type [%s]",
-              fieldValue, fieldValue.getClass(), fieldType)
-      );
-    }
-  }
-
-  public List<Integer> getFieldTypes() {
-    return fieldTypes;
-  }
-
-  public Integer getFieldTypeByIndex(int index){
-    return fieldTypes.get(index);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj != null && obj instanceof BeamRecordSqlType) {
-      BeamRecordSqlType ins = (BeamRecordSqlType) obj;
-      return fieldTypes.equals(ins.getFieldTypes()) && getFieldNames().equals(ins.getFieldNames());
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return 31 * getFieldNames().hashCode() + getFieldTypes().hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "BeamRecordSqlType [fieldNames=" + getFieldNames()
-        + ", fieldTypes=" + fieldTypes + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
deleted file mode 100644
index 89eefd1..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.values.BeamRecord;
-
-/**
- * A {@link Coder} encodes {@link BeamRecord}.
- */
-@Experimental
-public class BeamSqlRecordHelper {
-
-  public static BeamRecordSqlType getSqlRecordType(BeamRecord record) {
-    return (BeamRecordSqlType) record.getDataType();
-  }
-
-  /**
-   * {@link Coder} for Java type {@link Short}.
-   */
-  public static class ShortCoder extends CustomCoder<Short> {
-    private static final ShortCoder INSTANCE = new ShortCoder();
-
-    public static ShortCoder of() {
-      return INSTANCE;
-    }
-
-    private ShortCoder() {
-    }
-
-    @Override
-    public void encode(Short value, OutputStream outStream) throws CoderException, IOException {
-      new DataOutputStream(outStream).writeShort(value);
-    }
-
-    @Override
-    public Short decode(InputStream inStream) throws CoderException, IOException {
-      return new DataInputStream(inStream).readShort();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-    }
-  }
-  /**
-   * {@link Coder} for Java type {@link Float}, it's stored as {@link BigDecimal}.
-   */
-  public static class FloatCoder extends CustomCoder<Float> {
-    private static final FloatCoder INSTANCE = new FloatCoder();
-    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
-
-    public static FloatCoder of() {
-      return INSTANCE;
-    }
-
-    private FloatCoder() {
-    }
-
-    @Override
-    public void encode(Float value, OutputStream outStream) throws CoderException, IOException {
-      CODER.encode(new BigDecimal(value), outStream);
-    }
-
-    @Override
-    public Float decode(InputStream inStream) throws CoderException, IOException {
-      return CODER.decode(inStream).floatValue();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-    }
-  }
-  /**
-   * {@link Coder} for Java type {@link Double}, it's stored as {@link BigDecimal}.
-   */
-  public static class DoubleCoder extends CustomCoder<Double> {
-    private static final DoubleCoder INSTANCE = new DoubleCoder();
-    private static final BigDecimalCoder CODER = BigDecimalCoder.of();
-
-    public static DoubleCoder of() {
-      return INSTANCE;
-    }
-
-    private DoubleCoder() {
-    }
-
-    @Override
-    public void encode(Double value, OutputStream outStream) throws CoderException, IOException {
-      CODER.encode(new BigDecimal(value), outStream);
-    }
-
-    @Override
-    public Double decode(InputStream inStream) throws CoderException, IOException {
-      return CODER.decode(inStream).doubleValue();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-    }
-  }
-
-  /**
-   * {@link Coder} for Java type {@link GregorianCalendar}, it's stored as {@link Long}.
-   */
-  public static class TimeCoder extends CustomCoder<GregorianCalendar> {
-    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
-    private static final TimeCoder INSTANCE = new TimeCoder();
-
-    public static TimeCoder of() {
-      return INSTANCE;
-    }
-
-    private TimeCoder() {
-    }
-
-    @Override
-    public void encode(GregorianCalendar value, OutputStream outStream)
-        throws CoderException, IOException {
-      longCoder.encode(value.getTime().getTime(), outStream);
-    }
-
-    @Override
-    public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException {
-      GregorianCalendar calendar = new GregorianCalendar();
-      calendar.setTime(new Date(longCoder.decode(inStream)));
-      return calendar;
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-    }
-  }
-  /**
-   * {@link Coder} for Java type {@link Date}, it's stored as {@link Long}.
-   */
-  public static class DateCoder extends CustomCoder<Date> {
-    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
-    private static final DateCoder INSTANCE = new DateCoder();
-
-    public static DateCoder of() {
-      return INSTANCE;
-    }
-
-    private DateCoder() {
-    }
-
-    @Override
-    public void encode(Date value, OutputStream outStream) throws CoderException, IOException {
-      longCoder.encode(value.getTime(), outStream);
-    }
-
-    @Override
-    public Date decode(InputStream inStream) throws CoderException, IOException {
-      return new Date(longCoder.decode(inStream));
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-    }
-  }
-
-  /**
-   * {@link Coder} for Java type {@link Boolean}.
-   */
-  public static class BooleanCoder extends CustomCoder<Boolean> {
-    private static final BooleanCoder INSTANCE = new BooleanCoder();
-
-    public static BooleanCoder of() {
-      return INSTANCE;
-    }
-
-    private BooleanCoder() {
-    }
-
-    @Override
-    public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException {
-      new DataOutputStream(outStream).writeBoolean(value);
-    }
-
-    @Override
-    public Boolean decode(InputStream inStream) throws CoderException, IOException {
-      return new DataInputStream(inStream).readBoolean();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
deleted file mode 100644
index 828ac43..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * This interface defines a Beam Sql Table.
- */
-public interface BeamSqlTable {
-  /**
-   * In Beam SQL, there's no difference between a batch query and a streaming
-   * query. {@link BeamIOType} is used to validate the sources.
-   */
-  BeamIOType getSourceType();
-
-  /**
-   * create a {@code PCollection<BeamSqlRow>} from source.
-   *
-   */
-  PCollection<BeamRecord> buildIOReader(Pipeline pipeline);
-
-  /**
-   * create a {@code IO.write()} instance to write to target.
-   *
-   */
-   PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter();
-
-  /**
-   * Get the schema info of the table.
-   */
-   BeamRecordSqlType getRowType();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
deleted file mode 100644
index 191b78e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Interface to create a UDF in Beam SQL.
- *
- * <p>A static method {@code eval} is required. Here is an example:
- *
- * <blockquote><pre>
- * public static class MyLeftFunction {
- *   public String eval(
- *       &#64;Parameter(name = "s") String s,
- *       &#64;Parameter(name = "n", optional = true) Integer n) {
- *     return s.substring(0, n == null ? 1 : n);
- *   }
- * }</pre></blockquote>
- *
- * <p>The first parameter is named "s" and is mandatory,
- * and the second parameter is named "n" and is optional.
- */
-public interface BeamSqlUdf extends Serializable {
-  String UDF_METHOD = "eval";
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
deleted file mode 100644
index 687a082..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.NlsString;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVPrinter;
-import org.apache.commons.csv.CSVRecord;
-
-/**
- * Utility methods for working with {@code BeamTable}.
- */
-public final class BeamTableUtils {
-  public static BeamRecord csvLine2BeamSqlRow(
-      CSVFormat csvFormat,
-      String line,
-      BeamRecordSqlType beamRecordSqlType) {
-    List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount());
-    try (StringReader reader = new StringReader(line)) {
-      CSVParser parser = csvFormat.parse(reader);
-      CSVRecord rawRecord = parser.getRecords().get(0);
-
-      if (rawRecord.size() != beamRecordSqlType.getFieldCount()) {
-        throw new IllegalArgumentException(String.format(
-            "Expect %d fields, but actually %d",
-            beamRecordSqlType.getFieldCount(), rawRecord.size()
-        ));
-      } else {
-        for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) {
-          String raw = rawRecord.get(idx);
-          fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw));
-        }
-      }
-    } catch (IOException e) {
-      throw new IllegalArgumentException("decodeRecord failed!", e);
-    }
-    return new BeamRecord(beamRecordSqlType, fieldsValue);
-  }
-
-  public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
-    StringWriter writer = new StringWriter();
-    try (CSVPrinter printer = csvFormat.print(writer)) {
-      for (int i = 0; i < row.getFieldCount(); i++) {
-        printer.print(row.getFieldValue(i).toString());
-      }
-      printer.println();
-    } catch (IOException e) {
-      throw new IllegalArgumentException("encodeRecord failed!", e);
-    }
-    return writer.toString();
-  }
-
-  public static Object autoCastField(int fieldType, Object rawObj) {
-    if (rawObj == null) {
-      return null;
-    }
-
-    SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType);
-    // auto-casting for numberics
-    if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
-        || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
-      String raw = rawObj.toString();
-      switch (columnType) {
-        case TINYINT:
-          return Byte.valueOf(raw);
-        case SMALLINT:
-          return Short.valueOf(raw);
-        case INTEGER:
-          return Integer.valueOf(raw);
-        case BIGINT:
-          return Long.valueOf(raw);
-        case FLOAT:
-          return Float.valueOf(raw);
-        case DOUBLE:
-          return Double.valueOf(raw);
-        default:
-          throw new UnsupportedOperationException(
-              String.format("Column type %s is not supported yet!", columnType));
-      }
-    } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
-      // convert NlsString to String
-      if (rawObj instanceof NlsString) {
-        return ((NlsString) rawObj).getValue();
-      } else {
-        return rawObj;
-      }
-    } else {
-      return rawObj;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
deleted file mode 100644
index 8c7e6f0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema.kafka;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * A Kafka topic that saves records as CSV format.
- *
- */
-public class BeamKafkaCSVTable extends BeamKafkaTable {
-  private CSVFormat csvFormat;
-  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
-      List<String> topics) {
-    this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
-  }
-
-  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
-      List<String> topics, CSVFormat format) {
-    super(beamSqlRowType, bootstrapServers, topics);
-    this.csvFormat = format;
-  }
-
-  @Override
-  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
-      getPTransformForInput() {
-    return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
-  }
-
-  @Override
-  public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
-      getPTransformForOutput() {
-    return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
-  }
-
-  /**
-   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}.
-   *
-   */
-  public static class CsvRecorderDecoder
-      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
-    private BeamRecordSqlType rowType;
-    private CSVFormat format;
-    public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) {
-      this.rowType = rowType;
-      this.format = format;
-    }
-
-    @Override
-    public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) {
-      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          String rowInString = new String(c.element().getValue());
-          c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType));
-        }
-      }));
-    }
-  }
-
-  /**
-   * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}.
-   *
-   */
-  public static class CsvRecorderEncoder
-      extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
-    private BeamRecordSqlType rowType;
-    private CSVFormat format;
-    public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) {
-      this.rowType = rowType;
-      this.format = format;
-    }
-
-    @Override
-    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) {
-      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          BeamRecord in = c.element();
-          c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes()));
-        }
-      }));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
deleted file mode 100644
index 1d57839..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema.kafka;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-
-/**
- * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
- * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
- *
- */
-public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
-
-  private String bootstrapServers;
-  private List<String> topics;
-  private Map<String, Object> configUpdates;
-
-  protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
-    super(beamSqlRowType);
-  }
-
-  public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
-      List<String> topics) {
-    super(beamSqlRowType);
-    this.bootstrapServers = bootstrapServers;
-    this.topics = topics;
-  }
-
-  public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
-    this.configUpdates = configUpdates;
-    return this;
-  }
-
-  @Override
-  public BeamIOType getSourceType() {
-    return BeamIOType.UNBOUNDED;
-  }
-
-  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
-      getPTransformForInput();
-
-  public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
-      getPTransformForOutput();
-
-  @Override
-  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline).apply("read",
-            KafkaIO.<byte[], byte[]>read()
-                .withBootstrapServers(bootstrapServers)
-                .withTopics(topics)
-                .updateConsumerProperties(configUpdates)
-                .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
-                .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
-                .withoutMetadata())
-            .apply("in_format", getPTransformForInput());
-  }
-
-  @Override
-  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
-    checkArgument(topics != null && topics.size() == 1,
-        "Only one topic can be acceptable as output.");
-
-    return new PTransform<PCollection<BeamRecord>, PDone>() {
-      @Override
-      public PDone expand(PCollection<BeamRecord> input) {
-        return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
-            KafkaIO.<byte[], byte[]>write()
-                .withBootstrapServers(bootstrapServers)
-                .withTopic(topics.get(0))
-                .withKeySerializer(ByteArraySerializer.class)
-                .withValueSerializer(ByteArraySerializer.class));
-      }
-    };
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
deleted file mode 100644
index f0ddeb6..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * table schema for KafkaIO.
- */
-package org.apache.beam.sdk.extensions.sql.schema.kafka;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
deleted file mode 100644
index 9655ebd..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * define table schema, to map with Beam IO components.
- *
- */
-package org.apache.beam.sdk.extensions.sql.schema;

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
deleted file mode 100644
index 79e56e6..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.commons.csv.CSVFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@code BeamTextCSVTable} is a {@code BeamTextTable} which formatted in CSV.
- *
- * <p>
- * {@link CSVFormat} itself has many dialects, check its javadoc for more info.
- * </p>
- */
-public class BeamTextCSVTable extends BeamTextTable {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(BeamTextCSVTable.class);
-
-  private CSVFormat csvFormat;
-
-  /**
-   * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
-   */
-  public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern)  {
-    this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
-  }
-
-  public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern,
-      CSVFormat csvFormat) {
-    super(beamSqlRowType, filePattern);
-    this.csvFormat = csvFormat;
-  }
-
-  @Override
-  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
-        .apply("parseCSVLine",
-            new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
-  }
-
-  @Override
-  public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
-    return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
deleted file mode 100644
index 018dae5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * IOReader for {@code BeamTextCSVTable}.
- */
-public class BeamTextCSVTableIOReader
-    extends PTransform<PCollection<String>, PCollection<BeamRecord>>
-    implements Serializable {
-  private String filePattern;
-  protected BeamRecordSqlType beamSqlRowType;
-  protected CSVFormat csvFormat;
-
-  public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern,
-      CSVFormat csvFormat) {
-    this.filePattern = filePattern;
-    this.beamSqlRowType = beamSqlRowType;
-    this.csvFormat = csvFormat;
-  }
-
-  @Override
-  public PCollection<BeamRecord> expand(PCollection<String> input) {
-    return input.apply(ParDo.of(new DoFn<String, BeamRecord>() {
-          @ProcessElement
-          public void processElement(ProcessContext ctx) {
-            String str = ctx.element();
-            ctx.output(BeamTableUtils.csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
-          }
-        }));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
deleted file mode 100644
index 53eb382..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * IOWriter for {@code BeamTextCSVTable}.
- */
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
-    implements Serializable {
-  private String filePattern;
-  protected BeamRecordSqlType beamSqlRowType;
-  protected CSVFormat csvFormat;
-
-  public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern,
-      CSVFormat csvFormat) {
-    this.filePattern = filePattern;
-    this.beamSqlRowType = beamSqlRowType;
-    this.csvFormat = csvFormat;
-  }
-
-  @Override public PDone expand(PCollection<BeamRecord> input) {
-    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, String>() {
-
-      @ProcessElement public void processElement(ProcessContext ctx) {
-        BeamRecord row = ctx.element();
-        ctx.output(BeamTableUtils.beamSqlRow2CsvLine(row, csvFormat));
-      }
-    })).apply(TextIO.write().to(filePattern));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
deleted file mode 100644
index 80e81aa..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-
-/**
- * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
- */
-public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
-  protected String filePattern;
-
-  protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
-    super(beamSqlRowType);
-    this.filePattern = filePattern;
-  }
-
-  @Override
-  public BeamIOType getSourceType() {
-    return BeamIOType.BOUNDED;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
deleted file mode 100644
index f914e2e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Table schema for text files.
- */
-package org.apache.beam.sdk.extensions.sql.schema.text;


Mime
View raw message