hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nagar...@apache.org
Subject [incubator-hudi] branch master updated: [CLI] Add export to table
Date Fri, 06 Mar 2020 16:53:33 GMT
This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d37818  [CLI] Add export to table
3d37818 is described below

commit 3d3781810c2fd28c85407b04aad08fc2a85174dc
Author: Satish Kotha <satishkotha@uber.com>
AuthorDate: Wed Feb 12 11:23:40 2020 -0800

    [CLI] Add export to table
---
 .../main/java/org/apache/hudi/cli/HoodieCLI.java   |  11 ++
 .../org/apache/hudi/cli/HoodiePrintHelper.java     |  30 +++++
 .../apache/hudi/cli/commands/CommitsCommand.java   |  42 +++++--
 .../apache/hudi/cli/commands/TempViewCommand.java  |  55 +++++++++
 .../hudi/cli/utils/SparkTempViewProvider.java      | 134 +++++++++++++++++++++
 .../apache/hudi/cli/utils/TempViewProvider.java    |  29 +++++
 6 files changed, 288 insertions(+), 13 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
index 561e499..af68035 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.cli;
 
+import org.apache.hudi.cli.utils.SparkTempViewProvider;
+import org.apache.hudi.cli.utils.TempViewProvider;
 import org.apache.hudi.common.model.TimelineLayoutVersion;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ConsistencyGuardConfig;
@@ -43,6 +45,7 @@ public class HoodieCLI {
   protected static HoodieTableMetaClient tableMetadata;
   public static HoodieTableMetaClient syncTableMetadata;
   public static TimelineLayoutVersion layoutVersion;
+  private static TempViewProvider tempViewProvider;
 
   /**
    * Enum for CLI state.
@@ -105,4 +108,12 @@ public class HoodieCLI {
     return tableMetadata;
   }
 
+  public static synchronized TempViewProvider getTempViewProvider() {
+    if (tempViewProvider == null) {
+      tempViewProvider = new SparkTempViewProvider(HoodieCLI.class.getSimpleName());
+    }
+
+    return tempViewProvider;
+  }
+
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
index 53114ce..be64037 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
@@ -19,12 +19,15 @@
 package org.apache.hudi.cli;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 
 import com.jakewharton.fliptables.FlipTable;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 
 /**
@@ -57,11 +60,38 @@ public class HoodiePrintHelper {
    */
   public static String print(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
       String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows) {
+    return print(rowHeader, fieldNameToConverterMap, sortByField, isDescending, limit, headerOnly, rows, "");
+  }
+
+  /**
+   * Serialize Table to printable string and also export a temporary view to easily write sql queries.
+   *
+   * Ideally, exporting view needs to be outside PrintHelper, but all commands use this. So this is easy
+   * way to add support for all commands
+   *
+   * @param rowHeader Row Header
+   * @param fieldNameToConverterMap Field Specific Converters
+   * @param sortByField Sorting field
+   * @param isDescending Order
+   * @param limit Limit
+   * @param headerOnly Headers only
+   * @param rows List of rows
+   * @param tempTableName table name to export
+   * @return Serialized form for printing
+   */
+  public static String print(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
+      String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows,
+      String tempTableName) {
 
     if (headerOnly) {
       return HoodiePrintHelper.print(rowHeader);
     }
 
+    if (!StringUtils.isNullOrEmpty(tempTableName)) {
+      HoodieCLI.getTempViewProvider().createOrReplace(tempTableName, rowHeader.getFieldNames(),
+          rows.stream().map(columns -> Arrays.asList(columns)).collect(Collectors.toList()));
+    }
+
     if (!sortByField.isEmpty() && !rowHeader.containsField(sortByField)) {
       return String.format("Field[%s] is not in table, given columns[%s]", sortByField, rowHeader.getFieldNames());
     }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 804096b..3c08305 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -33,7 +33,6 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.NumericUtils;
-
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.spark.launcher.SparkLauncher;
 import org.springframework.shell.core.CommandMarker;
@@ -59,7 +58,8 @@ public class CommitsCommand implements CommandMarker {
   private String printCommits(HoodieDefaultTimeline timeline,
                               final Integer limit, final String sortByField,
                               final boolean descending,
-                              final boolean headerOnly) throws IOException {
+                              final boolean headerOnly,
+                              final String tempTableName) throws IOException {
     final List<Comparable[]> rows = new ArrayList<>();
 
     final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
@@ -96,13 +96,16 @@ public class CommitsCommand implements CommandMarker {
             .addTableHeaderField("Total Records Written")
             .addTableHeaderField("Total Update Records Written")
             .addTableHeaderField("Total Errors");
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
+            limit, headerOnly, rows, tempTableName);
   }
 
   private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
                               final Integer limit, final String sortByField,
                               final boolean descending,
-                              final boolean headerOnly) throws IOException {
+                              final boolean headerOnly,
+                              final String tempTableName) throws IOException {
     final List<Comparable[]> rows = new ArrayList<>();
 
     final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
@@ -144,13 +147,16 @@ public class CommitsCommand implements CommandMarker {
             .addTableHeaderField("Total Rollback Blocks").addTableHeaderField("Total Log Records")
             .addTableHeaderField("Total Updated Records Compacted").addTableHeaderField("Total Write Bytes");
 
-    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
+    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
+            limit, headerOnly, rows, tempTableName);
   }
 
   @CliCommand(value = "commits show", help = "Show the commits")
   public String showCommits(
       @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
           unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
+      @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
+          unspecifiedDefaultValue = "") final String exportTableName,
       @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "-1") final Integer limit,
      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@@ -161,9 +167,9 @@ public class CommitsCommand implements CommandMarker {
 
     HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
     if (includeExtraMetadata) {
-      return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly);
+      return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName);
     } else  {
-      return printCommits(activeTimeline, limit, sortByField, descending, headerOnly);
+      return printCommits(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName);
     }
   }
 
@@ -171,6 +177,8 @@ public class CommitsCommand implements CommandMarker {
   public String showArchivedCommits(
           @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
                   unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
+          @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
+                  unspecifiedDefaultValue = "") final String exportTableName,
           @CliOption(key = {"startTs"},  mandatory = false, help = "start time for commits, default: now - 10 days")
           String startTs,
           @CliOption(key = {"endTs"},  mandatory = false, help = "end time for commits, default: now - 1 day")
@@ -195,9 +203,9 @@ public class CommitsCommand implements CommandMarker {
       archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
      HoodieDefaultTimeline timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs);
       if (includeExtraMetadata) {
-        return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly);
+        return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly, exportTableName);
       } else  {
-        return printCommits(timelineRange, limit, sortByField, descending, headerOnly);
+        return printCommits(timelineRange, limit, sortByField, descending, headerOnly, exportTableName);
       }
     } finally {
       // clear the instant details from memory after printing to reduce usage
@@ -237,7 +245,10 @@ public class CommitsCommand implements CommandMarker {
   }
 
   @CliCommand(value = "commit showpartitions", help = "Show partition level details of a commit")
-  public String showCommitPartitions(@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
+  public String showCommitPartitions(
+      @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
+          unspecifiedDefaultValue = "") final String exportTableName,
+      @CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
      @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -287,11 +298,15 @@ public class CommitsCommand implements CommandMarker {
        .addTableHeaderField("Total Records Inserted").addTableHeaderField("Total Records Updated")
         .addTableHeaderField("Total Bytes Written").addTableHeaderField("Total Errors");
 
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
+        limit, headerOnly, rows, exportTableName);
   }
 
   @CliCommand(value = "commit showfiles", help = "Show file level details of a commit")
-  public String showCommitFiles(@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
+  public String showCommitFiles(
+      @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
+          unspecifiedDefaultValue = "") final String exportTableName,
+      @CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
      @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -323,7 +338,8 @@ public class CommitsCommand implements CommandMarker {
         .addTableHeaderField("Total Records Written").addTableHeaderField("Total Bytes Written")
         .addTableHeaderField("Total Errors").addTableHeaderField("File Size");
 
-    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
+    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
+        limit, headerOnly, rows, exportTableName);
   }
 
   @CliCommand(value = "commits compare", help = "Compare commits with another Hoodie table")
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java
new file mode 100644
index 0000000..39e3767
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * CLI command to query/delete temp views.
+ */
+@Component
+public class TempViewCommand implements CommandMarker {
+
+  private static final String EMPTY_STRING = "";
+
+  @CliCommand(value = "temp_query", help = "query against created temp view")
+  public String query(
+          @CliOption(key = {"sql"}, mandatory = true, help = "select query to run against view") final String sql)
+          throws IOException {
+
+    HoodieCLI.getTempViewProvider().runQuery(sql);
+    return EMPTY_STRING;
+  }
+
+  @CliCommand(value = "temp_delete", help = "Delete view name")
+  public String delete(
+          @CliOption(key = {"view"}, mandatory = true, help = "view name") final String tableName)
+          throws IOException {
+
+    HoodieCLI.getTempViewProvider().deleteTable(tableName);
+    return EMPTY_STRING;
+  }
+}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
new file mode 100644
index 0000000..68c18f9
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.utils;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkTempViewProvider implements TempViewProvider {
+  private static final Logger LOG = LogManager.getLogger(SparkTempViewProvider.class);
+
+  private JavaSparkContext jsc;
+  private SQLContext sqlContext;
+
+  public SparkTempViewProvider(String appName) {
+    try {
+      SparkConf sparkConf = new SparkConf().setAppName(appName)
+              .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
+      jsc = new JavaSparkContext(sparkConf);
+      jsc.setLogLevel("ERROR");
+
+      sqlContext = new SQLContext(jsc);
+    } catch (Throwable ex) {
+      // log full stack trace and rethrow. Without this its difficult to debug failures, if any
+      LOG.error("unable to initialize spark context ", ex);
+      throw new HoodieException(ex);
+    }
+  }
+
+  @Override
+  public void createOrReplace(String tableName, List<String> headers, List<List<Comparable>> rows) {
+    try {
+      if (headers.isEmpty() || rows.isEmpty()) {
+        return;
+      }
+
+      if (rows.stream().filter(row -> row.size() != headers.size()).count() > 0) {
+        throw new HoodieException("Invalid row, does not match headers " + headers.size() + " " + rows.size());
+      }
+
+      // replace all whitespaces in headers to make it easy to write sql queries
+      List<String> headersNoSpaces = headers.stream().map(title -> title.replaceAll("\\s+",""))
+              .collect(Collectors.toList());
+
+      // generate schema for table
+      StructType structType = new StructType();
+      for (int i = 0; i < headersNoSpaces.size(); i++) {
+        // try guessing data type from column data.
+        DataType headerDataType = getDataType(rows.get(0).get(i));
+        structType = structType.add(DataTypes.createStructField(headersNoSpaces.get(i), headerDataType, true));
+      }
+      List<Row> records = rows.stream().map(row -> RowFactory.create(row.toArray(new Comparable[row.size()])))
+              .collect(Collectors.toList());
+      Dataset<Row> dataset = this.sqlContext.createDataFrame(records, structType);
+      dataset.createOrReplaceTempView(tableName);
+      System.out.println("Wrote table view: " + tableName);
+    } catch (Throwable ex) {
+      // log full stack trace and rethrow. Without this its difficult to debug failures, if any
+      LOG.error("unable to write ", ex);
+      throw new HoodieException(ex);
+    }
+  }
+
+  @Override
+  public void runQuery(String sqlText) {
+    try {
+      this.sqlContext.sql(sqlText).show(Integer.MAX_VALUE, false);
+    } catch (Throwable ex) {
+      // log full stack trace and rethrow. Without this its difficult to debug failures, if any
+      LOG.error("unable to read ", ex);
+      throw new HoodieException(ex);
+    }
+  }
+
+  @Override
+  public void deleteTable(String tableName) {
+    try {
+      sqlContext.sql("DROP TABLE IF EXISTS " + tableName);
+    } catch (Throwable ex) {
+      // log full stack trace and rethrow. Without this its difficult to debug failures, if any
+      LOG.error("unable to initialize spark context ", ex);
+      throw new HoodieException(ex);
+    }
+  }
+
+  private DataType getDataType(Comparable comparable) {
+    if (comparable instanceof Integer) {
+      return DataTypes.IntegerType;
+    }
+
+    if (comparable instanceof Double) {
+      return DataTypes.DoubleType;
+    }
+
+    if (comparable instanceof Long) {
+      return DataTypes.LongType;
+    }
+
+    if (comparable instanceof Boolean) {
+      return DataTypes.BooleanType;
+    }
+
+    // TODO add additional types when needed. default to string
+    return DataTypes.StringType;
+  }
+}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java
new file mode 100644
index 0000000..1075fdd
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.utils;
+
+import java.util.List;
+
+public interface TempViewProvider {
+  void createOrReplace(String tableName, List<String> headers, List<List<Comparable>> rows);
+
+  void runQuery(String sqlText);
+
+  void deleteTable(String tableName);
+}


Mime
View raw message