nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnya...@apache.org
Subject [incubator-nemo] 01/14: tpch
Date Wed, 17 Oct 2018 01:13:18 GMT
This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 10470a09793bf8b999a3cf129b55d8fbf141f176
Author: John Yang <johnyangk@gmail.com>
AuthorDate: Sun Sep 9 10:34:15 2018 +0900

    tpch
---
 .../apache/nemo/examples/beam/tpch/Schemas.java    | 353 +++++++++++++++++++++
 .../org/apache/nemo/examples/beam/tpch/Tpch.java   | 270 ++++++++++++++++
 2 files changed, 623 insertions(+)

diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java
new file mode 100644
index 0000000..b464789
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam.tpch;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * Beam {@link Schema} definitions for benchmark tables (lineitem, orders, store_sales, ...) used by the SQL examples.
+ * (Copied/Refined from the example code in the Beam repository)
+ */
+public final class Schemas {
+  // Table name -> column prefix for the TPC-H tables; set in the constructor, not read in this class.
+  private final ImmutableMap<String, String> columnPrefixes;
+  // The schemas below are shared static constants, hence final. addNullableField columns accept
+  // nulls; trailing comments such as ".decimal(7,2)" appear to record the type being mapped.
+  public static final Schema storeSalesSchema =
+    Schema.builder()
+      .addNullableField("ss_sold_date_sk", Schema.FieldType.INT32)
+      .addNullableField("ss_sold_time_sk", Schema.FieldType.INT32)
+      .addNullableField("ss_item_sk", Schema.FieldType.INT32)
+      .addNullableField("ss_customer_sk", Schema.FieldType.STRING) // NOTE(review): other *_sk keys are INT32
+      .addNullableField("ss_cdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("ss_hdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("ss_addr_sk", Schema.FieldType.INT32)
+      .addNullableField("ss_store_sk", Schema.FieldType.INT32)
+      .addNullableField("ss_promo_sk", Schema.FieldType.INT32)
+      .addNullableField("ss_ticket_number", Schema.FieldType.INT64)
+      .addNullableField("ss_quantity", Schema.FieldType.INT32)
+      .addNullableField("ss_wholesale_cost", Schema.FieldType.FLOAT)
+      .addNullableField("ss_list_price", Schema.FieldType.FLOAT)
+      .addNullableField("ss_sales_price", Schema.FieldType.FLOAT)
+      .addNullableField("ss_ext_discount_amt", Schema.FieldType.FLOAT)
+      .addNullableField("ss_ext_sales_price", Schema.FieldType.FLOAT)
+      .addNullableField("ss_ext_wholesale_cost", Schema.FieldType.FLOAT)
+      .addNullableField("ss_ext_list_price", Schema.FieldType.FLOAT)
+      .addNullableField("ss_ext_tax", Schema.FieldType.FLOAT)
+      .addNullableField("ss_coupon_amt", Schema.FieldType.FLOAT)
+      .addNullableField("ss_net_paid", Schema.FieldType.FLOAT)
+      .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.FLOAT)
+      .addNullableField("ss_net_profit", Schema.FieldType.FLOAT)
+      .build();
+  public static final Schema dateDimSchema =
+    Schema.builder()
+      .addNullableField("d_date_sk", Schema.FieldType.INT32)
+      .addNullableField("d_date_id", Schema.FieldType.STRING)
+      .addNullableField("d_date", Schema.FieldType.STRING)
+      .addNullableField("d_month_seq", Schema.FieldType.INT32)
+      .addNullableField("d_week_seq", Schema.FieldType.INT32)
+      .addNullableField("d_quarter_seq", Schema.FieldType.INT32)
+      .addNullableField("d_year", Schema.FieldType.INT32)
+      .addNullableField("d_dow", Schema.FieldType.INT32)
+      .addNullableField("d_moy", Schema.FieldType.INT32)
+      .addNullableField("d_dom", Schema.FieldType.INT32)
+      .addNullableField("d_qoy", Schema.FieldType.INT32)
+      .addNullableField("d_fy_year", Schema.FieldType.INT32)
+      .addNullableField("d_fy_quarter_seq", Schema.FieldType.INT32)
+      .addNullableField("d_fy_week_seq", Schema.FieldType.INT32)
+      .addNullableField("d_day_name", Schema.FieldType.STRING)
+      .addNullableField("d_quarter_name", Schema.FieldType.STRING)
+      .addNullableField("d_holiday", Schema.FieldType.STRING)
+      .addNullableField("d_weekend", Schema.FieldType.STRING)
+      .addNullableField("d_following_holiday", Schema.FieldType.STRING)
+      .addNullableField("d_first_dom", Schema.FieldType.INT32)
+      .addNullableField("d_last_dom", Schema.FieldType.INT32)
+      .addNullableField("d_same_day_ly", Schema.FieldType.INT32)
+      .addNullableField("d_same_day_lq", Schema.FieldType.INT32)
+      .addNullableField("d_current_day", Schema.FieldType.STRING)
+      .addNullableField("d_current_week", Schema.FieldType.STRING)
+      .addNullableField("d_current_month", Schema.FieldType.STRING)
+      .addNullableField("d_current_quarter", Schema.FieldType.STRING)
+      .addNullableField("d_current_year", Schema.FieldType.STRING)
+      .build();
+  public static final Schema itemSchema =
+    Schema.builder()
+      .addNullableField("i_item_sk", Schema.FieldType.INT32)
+      .addNullableField("i_item_id", Schema.FieldType.STRING) //                 .string,
+      .addNullableField("i_rec_start_date", Schema.FieldType.DATETIME) //          .date,
+      .addNullableField("i_rec_end_date", Schema.FieldType.DATETIME) //            .date,
+      .addNullableField("i_item_desc", Schema.FieldType.STRING) //             .string,
+      .addNullableField("i_current_price", Schema.FieldType.FLOAT) //           .decimal(7,2),
+      .addNullableField(
+        "i_wholesale_cost", Schema.FieldType.FLOAT) //               .decimal(7,2),
+      .addNullableField("i_brand_id", Schema.FieldType.INT32) //                .int,
+      .addNullableField("i_brand", Schema.FieldType.STRING) //                   .string,
+      .addNullableField("i_class_id", Schema.FieldType.INT32) //                .int,
+      .addNullableField("i_class", Schema.FieldType.STRING) //                   .string,
+      .addNullableField("i_category_id", Schema.FieldType.INT32) //             .int,
+      .addNullableField("i_category", Schema.FieldType.STRING) //       .string,
+      .addNullableField("i_manufact_id", Schema.FieldType.INT32) //             .int,
+      .addNullableField("i_manufact", Schema.FieldType.STRING) //                .string,
+      .addNullableField("i_size", Schema.FieldType.STRING) //                    .string,
+      .addNullableField("i_formulation", Schema.FieldType.STRING) //             .string,
+      .addNullableField("i_color", Schema.FieldType.STRING) //                   .string,
+      .addNullableField("i_units", Schema.FieldType.STRING) //                   .string,
+      .addNullableField("i_container", Schema.FieldType.STRING) //               .string,
+      .addNullableField("i_manager_id", Schema.FieldType.INT32) // .int,
+      .addNullableField("i_product_name", Schema.FieldType.STRING) //            .string),
+      .build();
+  public static final Schema inventorySchema =
+    Schema.builder()
+      .addNullableField("inv_date_sk", Schema.FieldType.INT32)
+      .addNullableField("inv_item_sk", Schema.FieldType.INT32)
+      .addNullableField("inv_warehouse_sk", Schema.FieldType.INT32)
+      .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
+      .build();
+  public static final Schema catalogSalesSchema =
+    Schema.builder()
+      .addNullableField("cs_sold_date_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("cs_sold_time_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("cs_ship_date_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT32) //      .int,
+      .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT32) //      .int,
+      .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("cs_call_center_sk", Schema.FieldType.INT32) //        .int,
+      .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT32) //       .int,
+      .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("cs_warehouse_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("cs_item_sk", Schema.FieldType.INT32) //               .int,
+      .addNullableField("cs_promo_sk", Schema.FieldType.INT32) //              .int,
+      .addNullableField("cs_order_number", Schema.FieldType.INT64) //          .long,
+      .addNullableField("cs_quantity", Schema.FieldType.INT32) //              .int,
+      .addNullableField("cs_wholesale_cost", Schema.FieldType.FLOAT) //        .decimal(7,2),
+      .addNullableField("cs_list_price", Schema.FieldType.FLOAT) //            .decimal(7,2),
+      .addNullableField("cs_sales_price", Schema.FieldType.FLOAT) //           .decimal(7,2),
+      .addNullableField("cs_ext_discount_amt", Schema.FieldType.FLOAT) //      .decimal(7,2),
+      .addNullableField("cs_ext_sales_price", Schema.FieldType.FLOAT) //       .decimal(7,2),
+      .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.FLOAT) //    .decimal(7,2),
+      .addNullableField("cs_ext_list_price", Schema.FieldType.FLOAT) //        .decimal(7,2),
+      .addNullableField("cs_ext_tax", Schema.FieldType.FLOAT) //               .decimal(7,2),
+      .addNullableField("cs_coupon_amt", Schema.FieldType.FLOAT) //            .decimal(7,2),
+      .addNullableField("cs_ext_ship_cost", Schema.FieldType.FLOAT) //         .decimal(7,2),
+      .addNullableField("cs_net_paid", Schema.FieldType.FLOAT) //              .decimal(7,2),
+      .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.FLOAT) //      .decimal(7,2),
+      .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.FLOAT) //     .decimal(7,2),
+      .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) // .decimal(7,2),
+      .addNullableField("cs_net_profit", Schema.FieldType.FLOAT) //            .decimal(7,2))
+      .build();
+  public static final Schema ORDER_SCHEMA =
+    Schema.builder()
+      .addInt32Field("o_orderkey")
+      .addInt32Field("o_custkey")
+      .addStringField("o_orderstatus")
+      .addFloatField("o_totalprice")
+      .addStringField("o_orderdate")
+      .addStringField("o_orderpriority")
+      .addStringField("o_clerk")
+      .addInt32Field("o_shippriority")
+      .addStringField("o_comment")
+      .build();
+  public static final Schema CUSTOMER_SCHEMA =
+    Schema.builder()
+      .addInt32Field("c_custkey")
+      .addStringField("c_name")
+      .addStringField("c_address")
+      .addInt32Field("c_nationkey")
+      .addStringField("c_phone")
+      .addFloatField("c_acctbal")
+      .addStringField("c_mktsegment")
+      .addStringField("c_comment")
+      .build();
+  public static final Schema getCustomerDsSchema =
+    Schema.builder()
+      .addNullableField("c_customer_sk", Schema.FieldType.INT32) //             .int,
+      .addNullableField("c_customer_id", Schema.FieldType.STRING) //             .string,
+      .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT32) //        .int,
+      .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT32) //        .int,
+      .addNullableField("c_current_addr_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT32) //    .int,
+      .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT32) //     .int,
+      .addNullableField("c_salutation", Schema.FieldType.STRING) //              .string,
+      .addNullableField("c_first_name", Schema.FieldType.STRING) //              .string,
+      .addNullableField("c_last_name", Schema.FieldType.STRING) //               .string,
+      .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING) //     .string,
+      .addNullableField("c_birth_day", Schema.FieldType.INT32) //               .int,
+      .addNullableField("c_birth_month", Schema.FieldType.INT32) //             .int,
+      .addNullableField("c_birth_year", Schema.FieldType.INT32) //              .int,
+      .addNullableField("c_birth_country", Schema.FieldType.STRING) //           .string,
+      .addNullableField("c_login", Schema.FieldType.STRING) //                   .string,
+      .addNullableField("c_email_address", Schema.FieldType.STRING) //           .string,
+      .addNullableField("c_last_review_date", Schema.FieldType.STRING) //        .string)
+      .build();
+  public static final Schema LINEITEM_SCHEMA =
+    Schema.builder()
+      .addInt32Field("l_orderkey")
+      .addInt32Field("l_partkey")
+      .addInt32Field("l_suppkey")
+      .addInt32Field("l_linenumber")
+      .addFloatField("l_quantity")
+      .addFloatField("l_extendedprice")
+      .addFloatField("l_discount")
+      .addFloatField("l_tax")
+      .addStringField("l_returnflag")
+      .addStringField("l_linestatus")
+      .addStringField("l_shipdate")
+      .addStringField("l_commitdate")
+      .addStringField("l_receiptdate")
+      .addStringField("l_shipinstruct")
+      .addStringField("l_shipmode")
+      .addStringField("l_comment")
+      .build();
+  public static final Schema PARTSUPP_SCHEMA =
+    Schema.builder()
+      .addInt32Field("ps_partkey") // identifier
+      .addInt32Field("ps_suppkey") // identifier
+      .addInt32Field("ps_availqty") // integer
+      .addFloatField("ps_supplycost") // decimal
+      .addStringField("ps_comment") // variable text, size 199
+      .build();
+  public static final Schema REGION_SCHEMA =
+    Schema.builder()
+      .addInt32Field("r_regionkey") // identifier
+      .addStringField("r_name") // fixed text, size 25
+      .addStringField("r_comment") // variable text, size 152
+      .build();
+  public static final Schema SUPPLIER_SCHEMA =
+    Schema.builder()
+      .addInt32Field("s_suppkey") // identifier
+      .addStringField("s_name") // fixed text, size 25
+      .addStringField("s_address") // variable text, size 40
+      .addInt32Field("s_nationkey") // identifier
+      .addStringField("s_phone") // fixed text, size 15
+      .addFloatField("s_acctbal") // decimal
+      .addStringField("s_comment") // variable text, size 101
+      .build();
+  public static final Schema PART_SCHEMA =
+    Schema.builder()
+      .addInt32Field("p_partkey")
+      .addStringField("p_name")
+      .addStringField("p_mfgr")
+      .addStringField("p_brand")
+      .addStringField("p_type")
+      .addInt32Field("p_size")
+      .addStringField("p_container")
+      .addFloatField("p_retailprice")
+      .addStringField("p_comment")
+      .build();
+  public static final Schema NATION_SCHEMA =
+    Schema.builder()
+      .addInt32Field("n_nationkey")
+      .addStringField("n_name")
+      .addInt32Field("n_regionkey")
+      .addStringField("n_comment")
+      .build();
+  public static final Schema promotionSchema =
+    Schema.builder()
+      .addNullableField("p_promo_sk", Schema.FieldType.INT32)
+      .addNullableField("p_promo_id", Schema.FieldType.STRING) //                .string,
+      .addNullableField("p_start_date_sk", Schema.FieldType.INT32) // .int,
+      .addNullableField("p_end_date_sk", Schema.FieldType.INT32) // .int,
+      .addNullableField("p_item_sk", Schema.FieldType.INT32) // .int,
+      .addNullableField("p_cost", Schema.FieldType.FLOAT) // .decimal(15,2),
+      .addNullableField("p_response_target", Schema.FieldType.INT32) // .int,
+      .addNullableField("p_promo_name", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_dmail", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_email", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_catalog", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_tv", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_radio", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_press", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_event", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_demo", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_channel_details", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_purpose", Schema.FieldType.STRING) // .string,
+      .addNullableField("p_discount_active", Schema.FieldType.STRING) // .string),
+      .build();
+  public static final Schema customerDemographicsSchema =
+    Schema.builder()
+      .addNullableField("cd_demo_sk", Schema.FieldType.INT32)
+      .addNullableField("cd_gender", Schema.FieldType.STRING) //                 .string,
+      .addNullableField("cd_marital_status", Schema.FieldType.STRING) //         .string,
+      .addNullableField("cd_education_status", Schema.FieldType.STRING) //       .string,
+      .addNullableField("cd_purchase_estimate", Schema.FieldType.INT32) //      .int,
+      .addNullableField("cd_credit_rating", Schema.FieldType.STRING) //          .string,
+      .addNullableField("cd_dep_count", Schema.FieldType.INT32) //              .int,
+      .addNullableField("cd_dep_employed_count", Schema.FieldType.INT32) //     .int,
+      .addNullableField("cd_dep_college_count", Schema.FieldType.INT32) //      .int),
+      .build();
+  public static final Schema webSalesSchema =
+    Schema.builder()
+      .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("ws_item_sk", Schema.FieldType.INT32) //               .int,
+      .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32) //      .int,
+      .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32) //      .int,
+      .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32) //         .int,
+      .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("ws_web_page_sk", Schema.FieldType.INT32) //           .int,
+      .addNullableField("ws_web_site_sk", Schema.FieldType.INT32) //           .int,
+      .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32) //          .int,
+      .addNullableField("ws_promo_sk", Schema.FieldType.INT32) //              .int,
+      .addNullableField("ws_order_number", Schema.FieldType.INT64) //          .long,
+      .addNullableField("ws_quantity", Schema.FieldType.INT32) //              .int,
+      .addNullableField("ws_wholesale_cost", Schema.FieldType.FLOAT) //        .decimal(7,2),
+      .addNullableField("ws_list_price", Schema.FieldType.FLOAT) //            .decimal(7,2),
+      .addNullableField("ws_sales_price", Schema.FieldType.FLOAT) //           .decimal(7,2),
+      .addNullableField("ws_ext_discount_amt", Schema.FieldType.FLOAT) //      .decimal(7,2),
+      .addNullableField("ws_ext_sales_price", Schema.FieldType.FLOAT) //       .decimal(7,2),
+      .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.FLOAT) //    .decimal(7,2),
+      .addNullableField("ws_ext_list_price", Schema.FieldType.FLOAT) //        .decimal(7,2),
+      .addNullableField("ws_ext_tax", Schema.FieldType.FLOAT) //               .decimal(7,2),
+      .addNullableField("ws_coupon_amt", Schema.FieldType.FLOAT) //            .decimal(7,2),
+      .addNullableField("ws_ext_ship_cost", Schema.FieldType.FLOAT) //         .decimal(7,2),
+      .addNullableField("ws_net_paid", Schema.FieldType.FLOAT) //              .decimal(7,2),
+      .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.FLOAT) //      .decimal(7,2),
+      .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.FLOAT) //     .decimal(7,2),
+      .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) // .decimal(7,2),
+      .addNullableField("ws_net_profit", Schema.FieldType.FLOAT) //            .decimal(7,2)),
+      .build();
+  /**
+   * Creates an instance that only carries the column-prefix map of the eight TPC-H tables
+   * (lineitem, customer, supplier, partsupp, part, orders, nation, region).
+   * The schemas themselves are static and can be used without constructing this class.
+   */
+  public Schemas() {
+    this.columnPrefixes =
+      ImmutableMap.<String, String>builder()
+        .put("lineitem", "l_")
+        .put("customer", "c_")
+        .put("supplier", "s_")
+        .put("partsupp", "ps_")
+        .put("part", "p_")
+        .put("orders", "o_")
+        .put("nation", "n_")
+        .put("region", "r_")
+        .build();
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
new file mode 100644
index 0000000..4ebf0f3
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam.tpch;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.*;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
+
+/**
+ * A simple SQL application that runs one of the bundled TPC-H queries with Beam SQL on Nemo.
+ * (Copied/Refined from the example code in the Beam repository)
+ */
+public final class Tpch {
+  private static final Logger LOG = LoggerFactory.getLogger(Tpch.class);
+
+  public static final String QUERY1 =
+    "select\n"
+      + "\tl_returnflag,\n"
+      + "\tl_linestatus,\n"
+      + "\tsum(l_quantity) as sum_qty,\n"
+      + "\tsum(l_extendedprice) as sum_base_price,\n"
+      + "\tsum(l_extendedprice * (1 - l_discount)) as sum_disc_price,\n"
+      + "\tsum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,\n"
+      + "\tavg(l_quantity) as avg_qty,\n"
+      + "\tavg(l_extendedprice) as avg_price,\n"
+      + "\tavg(l_discount) as avg_disc,\n"
+      + "\tcount(*) as count_order\n"
+      + "from\n"
+      + "\tlineitem\n"
+      + "where\n"
+      + "\tl_shipdate <= date '1998-12-01' - interval '90' day (3)\n"
+      + "group by\n"
+      + "\tl_returnflag,\n"
+      + "\tl_linestatus\n"
+      + "order by\n"
+      + "\tl_returnflag,\n"
+      + "\tl_linestatus limit 10";
+
+  public static final String QUERY3 =
+    "select\n"
+      + "\tl_orderkey,\n"
+      + "\tsum(l_extendedprice * (1 - l_discount)) as revenue,\n"
+      + "\to_orderdate,\n"
+      + "\to_shippriority\n"
+      + "from\n"
+      + "\tcustomer,\n"
+      + "\torders,\n"
+      + "\tlineitem\n"
+      + "where\n"
+      + "\tc_mktsegment = 'BUILDING'\n"
+      + "\tand c_custkey = o_custkey\n"
+      + "\tand l_orderkey = o_orderkey\n"
+      + "\tand o_orderdate < date '1995-03-15'\n"
+      + "\tand l_shipdate > date '1995-03-15'\n"
+      + "group by\n"
+      + "\tl_orderkey,\n"
+      + "\to_orderdate,\n"
+      + "\to_shippriority\n"
+      + "order by\n"
+      + "\trevenue desc,\n"
+      + "\to_orderdate\n"
+      + "limit 10";
+
+  public static final String QUERY4 =
+    "select\n"
+      + "\to_orderpriority,\n"
+      + "\tcount(*) as order_count\n"
+      + "from\n"
+      + "\torders\n"
+      + "where\n"
+      + "\to_orderdate >= date '1993-07-01'\n"
+      + "\tand o_orderdate < date '1993-07-01' + interval '3' month\n"
+      + "\tand exists (\n"
+      + "\t\tselect\n"
+      + "\t\t\t*\n"
+      + "\t\tfrom\n"
+      + "\t\t\tlineitem\n"
+      + "\t\twhere\n"
+      + "\t\t\tl_orderkey = o_orderkey\n"
+      + "\t\t\tand l_commitdate < l_receiptdate\n"
+      + "\t)\n"
+      + "group by\n"
+      + "\to_orderpriority\n"
+      + "order by\n"
+      + "\to_orderpriority limit 10";
+
+  public static final String QUERY5 =
+    "select\n"
+      + "\tn_name,\n"
+      + "\tsum(l_extendedprice * (1 - l_discount)) as revenue\n"
+      + "from\n"
+      + "\tcustomer,\n"
+      + "\torders,\n"
+      + "\tlineitem,\n"
+      + "\tsupplier,\n"
+      + "\tnation,\n"
+      + "\tregion\n"
+      + "where\n"
+      + "\tc_custkey = o_custkey\n"
+      + "\tand l_orderkey = o_orderkey\n"
+      + "\tand l_suppkey = s_suppkey\n"
+      + "\tand c_nationkey = s_nationkey\n"
+      + "\tand s_nationkey = n_nationkey\n"
+      + "\tand n_regionkey = r_regionkey\n"
+      + "\tand r_name = 'ASIA'\n"
+      + "\tand o_orderdate >= date '1994-01-01'\n"
+      + "\tand o_orderdate < date '1994-01-01' + interval '1' year\n"
+      + "group by\n"
+      + "\tn_name\n"
+      + "order by\n"
+      + "\trevenue desc limit 10";
+
+  public static final String QUERY6 =
+    "select\n"
+      + "\tsum(l_extendedprice * l_discount) as revenue\n"
+      + "from\n"
+      + "\tlineitem\n"
+      + "where\n"
+      + "\tl_shipdate >= date '1994-01-01'\n"
+      + "\tand l_shipdate < date '1994-01-01' + interval '1' year\n"
+      + "\tand l_discount between .06 - 0.01 and .06 + 0.01\n"
+      + "\tand l_quantity < 24 limit 10";
+
+  /**
+   * Private Constructor.
+   */
+  private Tpch() {
+  }
+
+  /**
+   * Converts rows to CSV lines via {@code beamRow2CsvLine}; handed to {@link TextTable} in getHTables.
+   */
+  static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>> implements Serializable {
+    private final CSVFormat csvFormat;
+
+    public RowToCsv(final CSVFormat csvFormat) {
+      this.csvFormat = csvFormat;
+    }
+
+    public CSVFormat getCsvFormat() {
+      return csvFormat;
+    }
+
+    @Override
+    public PCollection<String> expand(final PCollection<Row> input) {
+      return input.apply(
+        "rowToCsv",
+        MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat)));
+    }
+  }
+
+  /**
+   * Reads every registered table from {@code inputDirectory + tableName + ".tbl"} (no separator is
+   * inserted) and returns the row collections tagged with their table names.
+   */
+  private static PCollectionTuple getHTables(final Pipeline pipeline,
+                                             final CSVFormat csvFormat,
+                                             final String inputDirectory) {
+    // NOTE(review): only lineitem is enabled, but QUERY3, QUERY4 and QUERY5 also reference
+    // customer, orders, supplier, nation or region; enable those entries before running them.
+    final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, Schema>builder()
+      //.put("customer", Schemas.CUSTOMER_SCHEMA)
+      .put("lineitem", Schemas.LINEITEM_SCHEMA)
+      //.put("nation", Schemas.NATION_SCHEMA)
+      //.put("orders", Schemas.ORDER_SCHEMA)
+      //.put("part", Schemas.PART_SCHEMA)
+      //.put("partsupp", Schemas.PARTSUPP_SCHEMA)
+      //.put("region", Schemas.REGION_SCHEMA)
+      //.put("supplier", Schemas.SUPPLIER_SCHEMA)
+      //              .put("store_sales", Schemas.storeSalesSchema)
+      //              .put("catalog_sales", Schemas.catalogSalesSchema)
+      //              .put("item", Schemas.itemSchema)
+      //              .put("date_dim", Schemas.dateDimSchema)
+      //              .put("promotion", Schemas.promotionSchema)
+      //              .put("customer_demographics", Schemas.customerDemographicsSchema)
+      //              .put("web_sales", Schemas.webSalesSchema)
+      //              .put("inventory", Schemas.inventorySchema)
+      .build();
+
+    PCollectionTuple tables = PCollectionTuple.empty(pipeline);
+    for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) {
+      final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
+      final PCollection<Row> table = new TextTable(
+        tableSchema.getValue(),
+        filePattern,
+        new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat),
+        new RowToCsv(csvFormat))
+        .buildIOReader(pipeline.begin())
+        .setCoder(tableSchema.getValue().getRowCoder())
+        .setName(tableSchema.getKey());
+      tables = tables.and(new TupleTag<>(tableSchema.getKey()), table);
+
+      LOG.info("FilePattern {} / Tables {}", filePattern, tables);
+    }
+    return tables;
+  }
+
+  /**
+   * @param args query id (1, 3, 4, 5 or 6), input directory prefix and output file path, in that order.
+   */
+  public static void main(final String[] args) {
+    if (args.length < 3) {
+      throw new IllegalArgumentException("Usage: Tpch <queryId> <inputDirectory> <outputFilePath>");
+    }
+    final int queryId = Integer.parseInt(args[0]);
+    final String inputDirectory = args[1];
+    final String outputFilePath = args[2];
+
+    final Map<Integer, String> idToQuery = new HashMap<>();
+    idToQuery.put(1, QUERY1);
+    idToQuery.put(3, QUERY3);
+    idToQuery.put(4, QUERY4);
+    idToQuery.put(5, QUERY5);
+    idToQuery.put(6, QUERY6);
+    final String query = idToQuery.get(queryId);
+    if (query == null) {
+      throw new IllegalArgumentException("Unknown query id " + queryId + ", expected one of " + idToQuery.keySet());
+    }
+
+    LOG.info("{} / {} / {}", queryId, inputDirectory, outputFilePath);
+
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("SimpleSQL");
+    final Pipeline p = Pipeline.create(options);
+
+    // Create tables
+    final CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString("");
+    final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory);
+
+    // Run the TPC-H query. Nothing consumes the result yet, so outputFilePath is only logged.
+    final PCollection<Row> result = tables.apply(SqlTransform.query(query));
+
+    // Then run
+    p.run();
+  }
+}


Mime
View raw message