nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnya...@apache.org
Subject [incubator-nemo] 03/14: tpch setup
Date Wed, 17 Oct 2018 01:13:20 GMT
This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 413ba2906212e4bc7691ae84f07a7fb6034b7d90
Author: John Yang <johnyangk@gmail.com>
AuthorDate: Sun Sep 9 11:22:05 2018 +0900

    tpch setup
---
 .../apache/nemo/examples/beam/tpch/Schemas.java    | 372 +++++++++++----------
 .../org/apache/nemo/examples/beam/tpch/Tpch.java   |  43 +--
 ...leSumSQLITCase.java => SQLSimpleSumITCase.java} |   4 +-
 ...mSimpleSumSQLITCase.java => SQLTpchITCase.java} |  20 +-
 4 files changed, 228 insertions(+), 211 deletions(-)

diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java
index b464789..a778abe 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java
@@ -21,14 +21,27 @@ import org.apache.beam.sdk.schemas.Schema;
 
 /**
  * A simple SQL application.
- * (Copied/Refined from the example code in the Beam repository)
+ * (Copied and adapted from https://github.com/apache/beam/pull/6240)
  */
 public final class Schemas {
-  //  private static final JavaTypeFactory TYPE_FACTORY =
-  //      new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
-  //  private final ImmutableMap<String, TpcdsTable> tableHMap;
-  private final ImmutableMap<String, String> columnPrefixes;
-  public static Schema storeSalesSchema =
+  /**
+   * Private.
+   */
+  private Schemas() {
+  }
+
+  public static final ImmutableMap<String, String> COLUMN_PREFIX = ImmutableMap.<String, String>builder()
+    .put("lineitem", "l_")
+    .put("customer", "c_")
+    .put("supplier", "s_")
+    .put("partsupp", "ps_")
+    .put("part", "p_")
+    .put("orders", "o_")
+    .put("nation", "n_")
+    .put("region", "r_")
+    .build();
+
+  public static final Schema STORE_SALES_SCHEMA =
     Schema.builder()
       .addNullableField("ss_sold_date_sk", Schema.FieldType.INT32)
       .addNullableField("ss_sold_time_sk", Schema.FieldType.INT32)
@@ -54,7 +67,8 @@ public final class Schemas {
       .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.FLOAT)
       .addNullableField("ss_net_profit", Schema.FieldType.FLOAT)
       .build();
-  public static Schema dateDimSchema =
+
+  public static final Schema DATE_DIM_SCHEMA =
     Schema.builder()
       .addNullableField("d_date_sk", Schema.FieldType.INT32)
       .addNullableField("d_date_id", Schema.FieldType.STRING)
@@ -85,76 +99,79 @@ public final class Schemas {
       .addNullableField("d_current_quarter", Schema.FieldType.STRING)
       .addNullableField("d_current_year", Schema.FieldType.STRING)
       .build();
-  public static Schema itemSchema =
+
+  public static final Schema ITEM_SCHEMA =
     Schema.builder()
       .addNullableField("i_item_sk", Schema.FieldType.INT32)
-      .addNullableField("i_item_id", Schema.FieldType.STRING) //                 .string,
-      .addNullableField("i_rec_start_date", Schema.FieldType.DATETIME) //          .date,
-      .addNullableField("i_rec_end_date", Schema.FieldType.DATETIME) //            .date,
-      .addNullableField("i_item_desc", Schema.FieldType.STRING) //             .string,
-      .addNullableField("i_current_price", Schema.FieldType.FLOAT) //           .decimal(7,2),
-      .addNullableField(
-        "i_wholesale_cost", Schema.FieldType.FLOAT) //               .decimal(7,2),
-      .addNullableField("i_brand_id", Schema.FieldType.INT32) //                .int,
-      .addNullableField("i_brand", Schema.FieldType.STRING) //                   .string,
-      .addNullableField("i_class_id", Schema.FieldType.INT32) //                .int,
-      .addNullableField("i_class", Schema.FieldType.STRING) //                   .string,
-      .addNullableField("i_category_id", Schema.FieldType.INT32) //             .int,
-      .addNullableField("i_category", Schema.FieldType.STRING) //       .string,
-      .addNullableField("i_manufact_id", Schema.FieldType.INT32) //             .int,
-      .addNullableField("i_manufact", Schema.FieldType.STRING) //                .string,
-      .addNullableField("i_size", Schema.FieldType.STRING) //                    .string,
-      .addNullableField("i_formulation", Schema.FieldType.STRING) //             .string,
-      .addNullableField("i_color", Schema.FieldType.STRING) //                   .string,
-      .addNullableField("i_units", Schema.FieldType.STRING) //                   .string,
-      .addNullableField("i_container", Schema.FieldType.STRING) //               .string,
-      .addNullableField("i_manager_id", Schema.FieldType.INT32) // .int,
-      .addNullableField("i_product_name", Schema.FieldType.STRING) //            .string),
+      .addNullableField("i_item_id", Schema.FieldType.STRING)
+      .addNullableField("i_rec_start_date", Schema.FieldType.DATETIME)
+      .addNullableField("i_rec_end_date", Schema.FieldType.DATETIME)
+      .addNullableField("i_item_desc", Schema.FieldType.STRING)
+      .addNullableField("i_current_price", Schema.FieldType.FLOAT)
+      .addNullableField("i_wholesale_cost", Schema.FieldType.FLOAT)
+      .addNullableField("i_brand_id", Schema.FieldType.INT32)
+      .addNullableField("i_brand", Schema.FieldType.STRING)
+      .addNullableField("i_class_id", Schema.FieldType.INT32)
+      .addNullableField("i_class", Schema.FieldType.STRING)
+      .addNullableField("i_category_id", Schema.FieldType.INT32)
+      .addNullableField("i_category", Schema.FieldType.STRING)
+      .addNullableField("i_manufact_id", Schema.FieldType.INT32)
+      .addNullableField("i_manufact", Schema.FieldType.STRING)
+      .addNullableField("i_size", Schema.FieldType.STRING)
+      .addNullableField("i_formulation", Schema.FieldType.STRING)
+      .addNullableField("i_color", Schema.FieldType.STRING)
+      .addNullableField("i_units", Schema.FieldType.STRING)
+      .addNullableField("i_container", Schema.FieldType.STRING)
+      .addNullableField("i_manager_id", Schema.FieldType.INT32)
+      .addNullableField("i_product_name", Schema.FieldType.STRING)
       .build();
-  public static Schema inventorySchema =
+
+  public static final Schema INVENTORY_SCHEMA =
     Schema.builder()
       .addNullableField("inv_date_sk", Schema.FieldType.INT32)
       .addNullableField("inv_item_sk", Schema.FieldType.INT32)
       .addNullableField("inv_warehouse_sk", Schema.FieldType.INT32)
       .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
       .build();
-  public static Schema catalogSalesSchema =
+
+  public static final Schema CATALOG_SALES_SCHEMA =
     Schema.builder()
-      .addNullableField("cs_sold_date_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("cs_sold_time_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("cs_ship_date_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT32) //      .int,
-      .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT32) //      .int,
-      .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("cs_call_center_sk", Schema.FieldType.INT32) //        .int,
-      .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT32) //       .int,
-      .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("cs_warehouse_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("cs_item_sk", Schema.FieldType.INT32) //               .int,
-      .addNullableField("cs_promo_sk", Schema.FieldType.INT32) //              .int,
-      .addNullableField("cs_order_number", Schema.FieldType.INT64) //          .long,
-      .addNullableField("cs_quantity", Schema.FieldType.INT32) //              .int,
-      .addNullableField("cs_wholesale_cost", Schema.FieldType.FLOAT) //        .decimal(7,2),
-      .addNullableField("cs_list_price", Schema.FieldType.FLOAT) //            .decimal(7,2),
-      .addNullableField("cs_sales_price", Schema.FieldType.FLOAT) //           .decimal(7,2),
-      .addNullableField("cs_ext_discount_amt", Schema.FieldType.FLOAT) //      .decimal(7,2),
-      .addNullableField("cs_ext_sales_price", Schema.FieldType.FLOAT) //       .decimal(7,2),
-      .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.FLOAT) //    .decimal(7,2),
-      .addNullableField("cs_ext_list_price", Schema.FieldType.FLOAT) //        .decimal(7,2),
-      .addNullableField("cs_ext_tax", Schema.FieldType.FLOAT) //               .decimal(7,2),
-      .addNullableField("cs_coupon_amt", Schema.FieldType.FLOAT) //            .decimal(7,2),
-      .addNullableField("cs_ext_ship_cost", Schema.FieldType.FLOAT) //         .decimal(7,2),
-      .addNullableField("cs_net_paid", Schema.FieldType.FLOAT) //              .decimal(7,2),
-      .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.FLOAT) //      .decimal(7,2),
-      .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.FLOAT) //     .decimal(7,2),
-      .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) // .decimal(7,2),
-      .addNullableField("cs_net_profit", Schema.FieldType.FLOAT) //            .decimal(7,2))
+      .addNullableField("cs_sold_date_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_sold_time_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_ship_date_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_call_center_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_warehouse_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_item_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_promo_sk", Schema.FieldType.INT32)
+      .addNullableField("cs_order_number", Schema.FieldType.INT64)
+      .addNullableField("cs_quantity", Schema.FieldType.INT32)
+      .addNullableField("cs_wholesale_cost", Schema.FieldType.FLOAT)
+      .addNullableField("cs_list_price", Schema.FieldType.FLOAT)
+      .addNullableField("cs_sales_price", Schema.FieldType.FLOAT)
+      .addNullableField("cs_ext_discount_amt", Schema.FieldType.FLOAT)
+      .addNullableField("cs_ext_sales_price", Schema.FieldType.FLOAT)
+      .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.FLOAT)
+      .addNullableField("cs_ext_list_price", Schema.FieldType.FLOAT)
+      .addNullableField("cs_ext_tax", Schema.FieldType.FLOAT)
+      .addNullableField("cs_coupon_amt", Schema.FieldType.FLOAT)
+      .addNullableField("cs_ext_ship_cost", Schema.FieldType.FLOAT)
+      .addNullableField("cs_net_paid", Schema.FieldType.FLOAT)
+      .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.FLOAT)
+      .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.FLOAT)
+      .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.FLOAT)
+      .addNullableField("cs_net_profit", Schema.FieldType.FLOAT)
       .build();
+
   public static final Schema ORDER_SCHEMA =
     Schema.builder()
       .addInt32Field("o_orderkey")
@@ -167,6 +184,7 @@ public final class Schemas {
       .addInt32Field("o_shippriority")
       .addStringField("o_comment")
       .build();
+
   public static final Schema CUSTOMER_SCHEMA =
     Schema.builder()
       .addInt32Field("c_custkey")
@@ -178,27 +196,29 @@ public final class Schemas {
       .addStringField("c_mktsegment")
       .addStringField("c_comment")
       .build();
-  public static Schema getCustomerDsSchema =
+
+  public static final Schema CUSTOMER_DS_SCHEMA =
     Schema.builder()
-      .addNullableField("c_customer_sk", Schema.FieldType.INT32) //             .int,
-      .addNullableField("c_customer_id", Schema.FieldType.STRING) //             .string,
-      .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT32) //        .int,
-      .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT32) //        .int,
-      .addNullableField("c_current_addr_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT32) //    .int,
-      .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT32) //     .int,
-      .addNullableField("c_salutation", Schema.FieldType.STRING) //              .string,
-      .addNullableField("c_first_name", Schema.FieldType.STRING) //              .string,
-      .addNullableField("c_last_name", Schema.FieldType.STRING) //               .string,
-      .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING) //     .string,
-      .addNullableField("c_birth_day", Schema.FieldType.INT32) //               .int,
-      .addNullableField("c_birth_month", Schema.FieldType.INT32) //             .int,
-      .addNullableField("c_birth_year", Schema.FieldType.INT32) //              .int,
-      .addNullableField("c_birth_country", Schema.FieldType.STRING) //           .string,
-      .addNullableField("c_login", Schema.FieldType.STRING) //                   .string,
-      .addNullableField("c_email_address", Schema.FieldType.STRING) //           .string,
-      .addNullableField("c_last_review_date", Schema.FieldType.STRING) //        .string)
+      .addNullableField("c_customer_sk", Schema.FieldType.INT32)
+      .addNullableField("c_customer_id", Schema.FieldType.STRING)
+      .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("c_current_addr_sk", Schema.FieldType.INT32)
+      .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT32)
+      .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT32)
+      .addNullableField("c_salutation", Schema.FieldType.STRING)
+      .addNullableField("c_first_name", Schema.FieldType.STRING)
+      .addNullableField("c_last_name", Schema.FieldType.STRING)
+      .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING)
+      .addNullableField("c_birth_day", Schema.FieldType.INT32)
+      .addNullableField("c_birth_month", Schema.FieldType.INT32)
+      .addNullableField("c_birth_year", Schema.FieldType.INT32)
+      .addNullableField("c_birth_country", Schema.FieldType.STRING)
+      .addNullableField("c_login", Schema.FieldType.STRING)
+      .addNullableField("c_email_address", Schema.FieldType.STRING)
+      .addNullableField("c_last_review_date", Schema.FieldType.STRING)
       .build();
+
   public static final Schema LINEITEM_SCHEMA =
     Schema.builder()
       .addInt32Field("l_orderkey")
@@ -218,30 +238,34 @@ public final class Schemas {
       .addStringField("l_shipmode")
       .addStringField("l_comment")
       .build();
+
   public static final Schema PARTSUPP_SCHEMA =
     Schema.builder()
-      .addInt32Field("ps_partkey") // identifier
-      .addInt32Field("ps_suppkey") // identifier
-      .addInt32Field("ps_availqty") // integer
-      .addFloatField("ps_supplycost") // decimal
-      .addStringField("ps_comment") // variable text, size 199
+      .addInt32Field("ps_partkey")
+      .addInt32Field("ps_suppkey")
+      .addInt32Field("ps_availqty")
+      .addFloatField("ps_supplycost")
+      .addStringField("ps_comment")
       .build();
+
   public static final Schema REGION_SCHEMA =
     Schema.builder()
-      .addInt32Field("r_regionkey") // identifier
-      .addStringField("r_name") // fixed text, size 25
-      .addStringField("r_comment") // variable text, size 152
+      .addInt32Field("r_regionkey")
+      .addStringField("r_name")
+      .addStringField("r_comment")
       .build();
+
   public static final Schema SUPPLIER_SCHEMA =
     Schema.builder()
-      .addInt32Field("s_suppkey") // identifier
-      .addStringField("s_name") // fixed text, size 25
-      .addStringField("s_address") // variable text, size 40
-      .addInt32Field("s_nationkey") // identifier
-      .addStringField("s_phone") // fixed text, size 15
-      .addFloatField("s_acctbal") // decimal
-      .addStringField("s_comment") // variable text, size 101
+      .addInt32Field("s_suppkey")
+      .addStringField("s_name")
+      .addStringField("s_address")
+      .addInt32Field("s_nationkey")
+      .addStringField("s_phone")
+      .addFloatField("s_acctbal")
+      .addStringField("s_comment")
       .build();
+
   public static final Schema PART_SCHEMA =
     Schema.builder()
       .addInt32Field("p_partkey")
@@ -254,6 +278,7 @@ public final class Schemas {
       .addFloatField("p_retailprice")
       .addStringField("p_comment")
       .build();
+
   public static final Schema NATION_SCHEMA =
     Schema.builder()
       .addInt32Field("n_nationkey")
@@ -261,93 +286,78 @@ public final class Schemas {
       .addInt32Field("n_regionkey")
       .addStringField("n_comment")
       .build();
-  public static Schema promotionSchema =
+
+  public static final Schema PROMOTION_SCHEMA =
     Schema.builder()
       .addNullableField("p_promo_sk", Schema.FieldType.INT32)
-      .addNullableField("p_promo_id", Schema.FieldType.STRING) //                .string,
-      .addNullableField("p_start_date_sk", Schema.FieldType.INT32) // .int,
-      .addNullableField("p_end_date_sk", Schema.FieldType.INT32) // .int,
-      .addNullableField("p_item_sk", Schema.FieldType.INT32) // .int,
-      .addNullableField("p_cost", Schema.FieldType.FLOAT) // .decimal(15,2),
-      .addNullableField("p_response_target", Schema.FieldType.INT32) // .int,
-      .addNullableField("p_promo_name", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_dmail", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_email", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_catalog", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_tv", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_radio", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_press", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_event", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_demo", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_channel_details", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_purpose", Schema.FieldType.STRING) // .string,
-      .addNullableField("p_discount_active", Schema.FieldType.STRING) // .string),
+      .addNullableField("p_promo_id", Schema.FieldType.STRING)
+      .addNullableField("p_start_date_sk", Schema.FieldType.INT32)
+      .addNullableField("p_end_date_sk", Schema.FieldType.INT32)
+      .addNullableField("p_item_sk", Schema.FieldType.INT32)
+      .addNullableField("p_cost", Schema.FieldType.FLOAT)
+      .addNullableField("p_response_target", Schema.FieldType.INT32)
+      .addNullableField("p_promo_name", Schema.FieldType.STRING)
+      .addNullableField("p_channel_dmail", Schema.FieldType.STRING)
+      .addNullableField("p_channel_email", Schema.FieldType.STRING)
+      .addNullableField("p_channel_catalog", Schema.FieldType.STRING)
+      .addNullableField("p_channel_tv", Schema.FieldType.STRING)
+      .addNullableField("p_channel_radio", Schema.FieldType.STRING)
+      .addNullableField("p_channel_press", Schema.FieldType.STRING)
+      .addNullableField("p_channel_event", Schema.FieldType.STRING)
+      .addNullableField("p_channel_demo", Schema.FieldType.STRING)
+      .addNullableField("p_channel_details", Schema.FieldType.STRING)
+      .addNullableField("p_purpose", Schema.FieldType.STRING)
+      .addNullableField("p_discount_active", Schema.FieldType.STRING)
       .build();
-  public static Schema customerDemographicsSchema =
+
+  public static final Schema CUSTOMER_DEMOGRAPHIC_SCHEMA =
     Schema.builder()
       .addNullableField("cd_demo_sk", Schema.FieldType.INT32)
-      .addNullableField("cd_gender", Schema.FieldType.STRING) //                 .string,
-      .addNullableField("cd_marital_status", Schema.FieldType.STRING) //         .string,
-      .addNullableField("cd_education_status", Schema.FieldType.STRING) //       .string,
-      .addNullableField("cd_purchase_estimate", Schema.FieldType.INT32) //      .int,
-      .addNullableField("cd_credit_rating", Schema.FieldType.STRING) //          .string,
-      .addNullableField("cd_dep_count", Schema.FieldType.INT32) //              .int,
-      .addNullableField("cd_dep_employed_count", Schema.FieldType.INT32) //     .int,
-      .addNullableField("cd_dep_college_count", Schema.FieldType.INT32) //      .int),
+      .addNullableField("cd_gender", Schema.FieldType.STRING)
+      .addNullableField("cd_marital_status", Schema.FieldType.STRING)
+      .addNullableField("cd_education_status", Schema.FieldType.STRING)
+      .addNullableField("cd_purchase_estimate", Schema.FieldType.INT32)
+      .addNullableField("cd_credit_rating", Schema.FieldType.STRING)
+      .addNullableField("cd_dep_count", Schema.FieldType.INT32)
+      .addNullableField("cd_dep_employed_count", Schema.FieldType.INT32)
+      .addNullableField("cd_dep_college_count", Schema.FieldType.INT32)
       .build();
-  public static Schema webSalesSchema =
+
+  public static final Schema WEB_SALES_SCHEMA =
     Schema.builder()
-      .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("ws_item_sk", Schema.FieldType.INT32) //               .int,
-      .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32) //      .int,
-      .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32) //      .int,
-      .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32) //         .int,
-      .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("ws_web_page_sk", Schema.FieldType.INT32) //           .int,
-      .addNullableField("ws_web_site_sk", Schema.FieldType.INT32) //           .int,
-      .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32) //          .int,
-      .addNullableField("ws_promo_sk", Schema.FieldType.INT32) //              .int,
-      .addNullableField("ws_order_number", Schema.FieldType.INT64) //          .long,
-      .addNullableField("ws_quantity", Schema.FieldType.INT32) //              .int,
-      .addNullableField("ws_wholesale_cost", Schema.FieldType.FLOAT) //        .decimal(7,2),
-      .addNullableField("ws_list_price", Schema.FieldType.FLOAT) //            .decimal(7,2),
-      .addNullableField("ws_sales_price", Schema.FieldType.FLOAT) //           .decimal(7,2),
-      .addNullableField("ws_ext_discount_amt", Schema.FieldType.FLOAT) //      .decimal(7,2),
-      .addNullableField("ws_ext_sales_price", Schema.FieldType.FLOAT) //       .decimal(7,2),
-      .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.FLOAT) //    .decimal(7,2),
-      .addNullableField("ws_ext_list_price", Schema.FieldType.FLOAT) //        .decimal(7,2),
-      .addNullableField("ws_ext_tax", Schema.FieldType.FLOAT) //               .decimal(7,2),
-      .addNullableField("ws_coupon_amt", Schema.FieldType.FLOAT) //            .decimal(7,2),
-      .addNullableField("ws_ext_ship_cost", Schema.FieldType.FLOAT) //         .decimal(7,2),
-      .addNullableField("ws_net_paid", Schema.FieldType.FLOAT) //              .decimal(7,2),
-      .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.FLOAT) //      .decimal(7,2),
-      .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.FLOAT) //     .decimal(7,2),
-      .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) // .decimal(7,2),
-      .addNullableField("ws_net_profit", Schema.FieldType.FLOAT) //            .decimal(7,2)),
+      .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_item_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_web_page_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_web_site_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_promo_sk", Schema.FieldType.INT32)
+      .addNullableField("ws_order_number", Schema.FieldType.INT64)
+      .addNullableField("ws_quantity", Schema.FieldType.INT32)
+      .addNullableField("ws_wholesale_cost", Schema.FieldType.FLOAT)
+      .addNullableField("ws_list_price", Schema.FieldType.FLOAT)
+      .addNullableField("ws_sales_price", Schema.FieldType.FLOAT)
+      .addNullableField("ws_ext_discount_amt", Schema.FieldType.FLOAT)
+      .addNullableField("ws_ext_sales_price", Schema.FieldType.FLOAT)
+      .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.FLOAT)
+      .addNullableField("ws_ext_list_price", Schema.FieldType.FLOAT)
+      .addNullableField("ws_ext_tax", Schema.FieldType.FLOAT)
+      .addNullableField("ws_coupon_amt", Schema.FieldType.FLOAT)
+      .addNullableField("ws_ext_ship_cost", Schema.FieldType.FLOAT)
+      .addNullableField("ws_net_paid", Schema.FieldType.FLOAT)
+      .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.FLOAT)
+      .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.FLOAT)
+      .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.FLOAT)
+      .addNullableField("ws_net_profit", Schema.FieldType.FLOAT)
       .build();
-  public Schemas() {
-    //    final ImmutableMap.Builder<String, TpcdsTable> builder = ImmutableMap.builder();
-    //    for (TpcdsTable<?> tpcdsTable : TpcdsTable.getTables()) {
-    //      builder.put(tpcdsTable.getTableName(), tpcdsTable);
-    //    }
-    //    this.tableHMap = builder.build();
-    this.columnPrefixes =
-      ImmutableMap.<String, String>builder()
-        .put("lineitem", "l_")
-        .put("customer", "c_")
-        .put("supplier", "s_")
-        .put("partsupp", "ps_")
-        .put("part", "p_")
-        .put("orders", "o_")
-        .put("nation", "n_")
-        .put("region", "r_")
-        .build();
-  }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index 4ebf0f3..4f45920 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -41,7 +41,7 @@ import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beam
 
 /**
  * A simple SQL application.
- * (Copied/Refined from the example code in the Beam repository)
+ * (Copied and adapted from https://github.com/apache/beam/pull/6240)
  */
 public final class Tpch {
   private static final Logger LOG = LoggerFactory.getLogger(Tpch.class.getName());
@@ -161,11 +161,14 @@ public final class Tpch {
   private Tpch() {
   }
 
+  /**
+   * Row csv formats.
+   */
   static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>> implements Serializable {
 
-    private CSVFormat csvFormat;
+    private final CSVFormat csvFormat;
 
-    public RowToCsv(CSVFormat csvFormat) {
+    RowToCsv(final CSVFormat csvFormat) {
       this.csvFormat = csvFormat;
     }
 
@@ -174,7 +177,7 @@ public final class Tpch {
     }
 
     @Override
-    public PCollection<String> expand(PCollection<Row> input) {
+    public PCollection<String> expand(final PCollection<Row> input) {
       return input.apply(
         "rowToCsv",
         MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat)));
@@ -185,22 +188,24 @@ public final class Tpch {
                                              final CSVFormat csvFormat,
                                              final String inputDirectory) {
     final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, Schema>builder()
-      //.put("customer", Schemas.CUSTOMER_SCHEMA)
       .put("lineitem", Schemas.LINEITEM_SCHEMA)
-      //.put("nation", Schemas.NATION_SCHEMA)
-      //.put("orders", Schemas.ORDER_SCHEMA)
-      //.put("part", Schemas.PART_SCHEMA)
-      //.put("partsupp", Schemas.PARTSUPP_SCHEMA)
-      //.put("region", Schemas.REGION_SCHEMA)
-      //.put("supplier", Schemas.SUPPLIER_SCHEMA)
-      //              .put("store_sales", Schemas.storeSalesSchema)
-      //              .put("catalog_sales", Schemas.catalogSalesSchema)
-      //              .put("item", Schemas.itemSchema)
-      //              .put("date_dim", Schemas.dateDimSchema)
-      //              .put("promotion", Schemas.promotionSchema)
-      //              .put("customer_demographics", Schemas.customerDemographicsSchema)
-      //              .put("web_sales", Schemas.webSalesSchema)
-      //              .put("inventory", Schemas.inventorySchema)
+      /*
+      .put("customer", Schemas.CUSTOMER_SCHEMA)
+      .put("nation", Schemas.NATION_SCHEMA)
+      .put("orders", Schemas.ORDER_SCHEMA)
+      .put("part", Schemas.PART_SCHEMA)
+      .put("partsupp", Schemas.PARTSUPP_SCHEMA)
+      .put("region", Schemas.REGION_SCHEMA)
+      .put("supplier", Schemas.SUPPLIER_SCHEMA)
+      .put("store_sales", Schemas.STORE_SALES_SCHEMA)
+      .put("catalog_sales", Schemas.CATALOG_SALES_SCHEMA)
+      .put("item", Schemas.ITEM_SCHEMA)
+      .put("date_dim", Schemas.DATE_DIM_SCHEMA)
+      .put("promotion", Schemas.PROMOTION_SCHEMA)
+      .put("customer_demographics", Schemas.CUSTOMER_DEMOGRAPHIC_SCHEMA)
+      .put("web_sales", Schemas.WEB_SALES_SCHEMA)
+      .put("inventory", Schemas.INVENTORY_SCHEMA)
+      */
       .build();
 
     PCollectionTuple tables = PCollectionTuple.empty(pipeline);
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLSimpleSumITCase.java
similarity index 95%
copy from examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
copy to examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLSimpleSumITCase.java
index 4d55ade..a61ed70 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLSimpleSumITCase.java
@@ -31,7 +31,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
-public final class BeamSimpleSumSQLITCase {
+public final class SQLSimpleSumITCase {
   private static final int TIMEOUT = 180000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
@@ -61,7 +61,7 @@ public final class BeamSimpleSumSQLITCase {
   @Test (timeout = TIMEOUT)
   public void test() throws Exception {
     JobLauncher.main(builder
-        .addJobId(BeamSimpleSumSQLITCase.class.getSimpleName())
+        .addJobId(SQLSimpleSumITCase.class.getSimpleName())
         .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
similarity index 81%
rename from examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
rename to examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
index 4d55ade..0b7668e 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
@@ -17,8 +17,8 @@ package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.client.JobLauncher;
 import org.apache.nemo.common.test.ArgBuilder;
-import org.apache.nemo.common.test.ExampleTestUtil;
 import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.apache.nemo.examples.beam.tpch.Tpch;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,11 +27,11 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
- * Test Broadcast program with JobLauncher.
+ * Test TPC-H program with JobLauncher.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
-public final class BeamSimpleSumSQLITCase {
+public final class SQLTpchITCase {
   private static final int TIMEOUT = 180000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
@@ -44,25 +44,27 @@ public final class BeamSimpleSumSQLITCase {
   @Before
   public void setUp() throws Exception {
     builder = new ArgBuilder()
-        .addUserMain(SimpleSumSQL.class.getCanonicalName())
-        .addUserArgs(outputFilePath)
         .addResourceJson(executorResourceFileName);
   }
 
   @After
   public void tearDown() throws Exception {
+    /*
     try {
       ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
     } finally {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
+    */
   }
 
   @Test (timeout = TIMEOUT)
-  public void test() throws Exception {
+  public void testSix() throws Exception {
     JobLauncher.main(builder
-        .addJobId(BeamSimpleSumSQLITCase.class.getSimpleName())
-        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
-        .build());
+      .addUserMain(Tpch.class.getCanonicalName())
+      .addUserArgs("6", "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath)
+      .addJobId(SQLTpchITCase.class.getSimpleName())
+      .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+      .build());
   }
 }


Mime
View raw message