http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
new file mode 100644
index 0000000..6ded8fa
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommand;
+
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+/**
+ * Utility class that contains all strings for CLI commands and messages.
+ */
+public final class CliStrings {
+
+ private CliStrings() {
+ // private
+ }
+
+ public static final String CLI_NAME = "Flink SQL CLI Client";
+ public static final String DEFAULT_MARGIN = " ";
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
+ .append("The following commands are available:\n\n")
+ .append(formatCommand(SqlCommand.QUIT, "Quits the SQL CLI client."))
+ .append(formatCommand(SqlCommand.CLEAR, "Clears the current terminal."))
+ .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
+ .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+ .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
+ .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
+ .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+ .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
+ .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
+ .append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
+ .style(AttributedStyle.DEFAULT.underline())
+ .append("\nHint")
+ .style(AttributedStyle.DEFAULT)
+ .append(": Use '\\' for multi-line commands.")
+ .toAttributedString();
+
+ public static final String MESSAGE_WELCOME;
+ // make findbugs happy
+ static {
+ MESSAGE_WELCOME = " \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592\n" +
+ " \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592\n" +
+ " \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592\n" +
+ " \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588\n" +
+ " \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592\n" +
+ " \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588\n" +
+ " \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588\n" +
+ " \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592\n" +
+ " \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592\n" +
+ " \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591\n" +
+ " \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592\n" +
+ " \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592\n" +
+ " \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592\n" +
+ " \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591\n" +
+ " \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593\n" +
+ " \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592\n" +
+ " \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592\n" +
+ " \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588\n" +
+ " \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588\n" +
+ " \u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593\n" +
+ " \u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593\n" +
+ " \u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593\n" +
+ " \u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591 \u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592\n" +
+ " \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592\n" +
+ " \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593\n" +
+ " \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591\n" +
+ " \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588\n" +
+ " \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593\n" +
+ " \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588\n" +
+ " \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593\n" +
+ " \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592\n" +
+ " \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591\n" +
+ " \n" +
+ " ______ _ _ _ _____ ____ _ _____ _ _ _ BETA \n" +
+ " | ____| (_) | | / ____|/ __ \\| | / ____| (_) | | \n" +
+ " | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ \n" +
+ " | __| | | | '_ \\| |/ / \\___ \\| | | | | | | | | |/ _ \\ '_ \\| __|\n" +
+ " | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ \n" +
+ " |_| |_|_|_| |_|_|\\_\\ |_____/ \\___\\_\\______| \\_____|_|_|\\___|_| |_|\\__|\n" +
+ " \n" +
+ " Welcome! Enter HELP to list all available commands. QUIT to exit.\n\n";
+ }
+
+ public static final String MESSAGE_QUIT = "Exiting " + CliStrings.CLI_NAME + "...";
+
+ public static final String MESSAGE_SQL_EXECUTION_ERROR = "Could not execute SQL statement.";
+
+ public static final String MESSAGE_RESET = "All session properties have been set to their default values.";
+
+ public static final String MESSAGE_SET = "Session property has been set.";
+
+ public static final String MESSAGE_EMPTY = "Result was empty.";
+
+ public static final String MESSAGE_UNKNOWN_SQL = "Unknown SQL statement.";
+
+ public static final String MESSAGE_UNKNOWN_TABLE = "Unknown table.";
+
+ public static final String MESSAGE_RESULT_SNAPSHOT_ERROR = "Could not create a snapshot of the dynamic table.";
+
+ public static final String MESSAGE_RESULT_QUIT = "Result retrieval cancelled.";
+
+ public static final String MESSAGE_INVALID_PATH = "Path is invalid.";
+
+ public static final String MESSAGE_MAX_SIZE_EXCEEDED = "The given file exceeds the maximum number of characters.";
+
+ public static final String MESSAGE_WILL_EXECUTE = "Executing the following statement:";
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final String RESULT_TITLE = "SQL Query Result";
+
+ public static final String RESULT_REFRESH_INTERVAL = "Refresh:";
+
+ public static final String RESULT_PAGE = "Page:";
+
+ public static final String RESULT_PAGE_OF = " of ";
+
+ public static final String RESULT_LAST_REFRESH = "Updated:";
+
+ public static final String RESULT_LAST_PAGE = "Last";
+
+ public static final String RESULT_QUIT = "Quit";
+
+ public static final String RESULT_REFRESH = "Refresh";
+
+ public static final String RESULT_GOTO = "Goto Page";
+
+ public static final String RESULT_NEXT = "Next Page";
+
+ public static final String RESULT_PREV = "Prev Page";
+
+ public static final String RESULT_LAST = "Last Page";
+
+ public static final String RESULT_FIRST = "First Page";
+
+ public static final String RESULT_SEARCH = "Search";
+
+ public static final String RESULT_INC_REFRESH = "Inc Refresh"; // implementation assumes max length of 11
+
+ public static final String RESULT_DEC_REFRESH = "Dec Refresh";
+
+ public static final String RESULT_OPEN = "Open Row";
+
+ public static final String RESULT_CHANGELOG = "Changelog";
+
+ public static final String RESULT_TABLE = "Table";
+
+ public static final String RESULT_STOPPED = "Table program finished.";
+
+ public static final String RESULT_REFRESH_UNKNOWN = "Unknown";
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final String INPUT_TITLE = "Input Dialog";
+
+ public static final AttributedString INPUT_HELP = new AttributedStringBuilder()
+ .append("Press ")
+ .style(AttributedStyle.DEFAULT.inverse())
+ .append("Enter")
+ .style(AttributedStyle.DEFAULT)
+ .append(" to submit. Press ")
+ .style(AttributedStyle.DEFAULT.inverse())
+ .append("ESC")
+ .style(AttributedStyle.DEFAULT)
+ .append(" or submit an empty string to cancel.")
+ .toAttributedString();
+
+ public static final String INPUT_ENTER_PAGE = "Enter page number:";
+
+ public static final String INPUT_ERROR = "The input is invalid. Please check it again.";
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final AttributedString ROW_QUIT = new AttributedStringBuilder()
+ .append("Press ")
+ .style(AttributedStyle.DEFAULT.inverse())
+ .append("Q")
+ .style(AttributedStyle.DEFAULT)
+ .append(" to go back.")
+ .toAttributedString();
+
+ public static final String ROW_HEADER = "Row Summary";
+
+ // --------------------------------------------------------------------------------------------
+
+ public static AttributedString messageInfo(String message) {
+ return new AttributedStringBuilder()
+ .style(AttributedStyle.DEFAULT.bold().foreground(AttributedStyle.BLUE))
+ .append("[INFO] ")
+ .append(message)
+ .toAttributedString();
+ }
+
+ public static AttributedString messageError(String message, Throwable t) {
+ while (t.getCause() != null && t.getCause().getMessage() != null && !t.getCause().getMessage().isEmpty()) {
+ t = t.getCause();
+ }
+ return messageError(message, t.getClass().getName() + ": " + t.getMessage());
+ // return messageError(message, ExceptionUtils.stringifyException(t));
+ }
+
+ public static AttributedString messageError(String message) {
+ return messageError(message, (String) null);
+ }
+
+ public static AttributedString messageError(String message, String s) {
+ final AttributedStringBuilder builder = new AttributedStringBuilder()
+ .style(AttributedStyle.DEFAULT.bold().foreground(AttributedStyle.RED))
+ .append("[ERROR] ")
+ .append(message);
+
+ if (s != null) {
+ builder
+ .append(" Reason:\n")
+ .append(s);
+ }
+
+ return builder.toAttributedString();
+ }
+
+ private static AttributedString formatCommand(SqlCommand cmd, String description) {
+ return new AttributedStringBuilder()
+ .style(AttributedStyle.DEFAULT.bold())
+ .append(cmd.toString())
+ .append("\t\t")
+ .style(AttributedStyle.DEFAULT)
+ .append(description)
+ .append('\n')
+ .toAttributedString();
+ }
+}
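For reference, a minimal sketch of how the message helpers in CliStrings could be exercised in isolation (the class name and the use of System.out instead of the client's JLine terminal are assumptions of this snippet, not part of the patch):

import org.apache.flink.table.client.cli.CliStrings;

public class CliStringsExample {

    public static void main(String[] args) {
        // [INFO] message, rendered bold blue as an ANSI escape string
        System.out.println(CliStrings.messageInfo(CliStrings.MESSAGE_RESET).toAnsi());

        // [ERROR] message; the root cause is unwrapped and appended as the reason
        final Throwable cause = new IllegalStateException("cluster unreachable");
        System.out.println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, cause).toAnsi());
    }
}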
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
new file mode 100644
index 0000000..4d9c69b
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.ResultDescriptor;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import org.jline.keymap.KeyMap;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp.Capability;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
+import static org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions;
+import static org.apache.flink.table.client.cli.CliUtils.normalizeColumn;
+import static org.apache.flink.table.client.cli.CliUtils.repeatChar;
+import static org.jline.keymap.KeyMap.ctrl;
+import static org.jline.keymap.KeyMap.esc;
+import static org.jline.keymap.KeyMap.key;
+
+/**
+ * CLI view for retrieving and displaying a table.
+ */
+public class CliTableResultView extends CliResultView<CliTableResultView.ResultTableOperation> {
+
+ private int pageCount;
+ private int page;
+ private LocalTime lastRetrieval;
+
+ private static final int DEFAULT_REFRESH_INTERVAL = 3; // index into REFRESH_INTERVALS, i.e. every 1s
+ private static final int MIN_REFRESH_INTERVAL = 1; // index into REFRESH_INTERVALS, i.e. every 100ms
+ private static final int LAST_PAGE = 0;
+
+ public CliTableResultView(CliClient client, ResultDescriptor resultDescriptor) {
+ super(client, resultDescriptor);
+
+ refreshInterval = DEFAULT_REFRESH_INTERVAL;
+ pageCount = 1;
+ page = LAST_PAGE;
+
+ previousResults = Collections.emptyList();
+ results = Collections.emptyList();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ protected String[] getRow(String[] resultRow) {
+ return resultRow;
+ }
+
+ @Override
+ protected int computeColumnWidth(int idx) {
+ return MAX_COLUMN_WIDTH;
+ }
+
+ @Override
+ protected void refresh() {
+ // take snapshot
+ TypedResult<Integer> result;
+ try {
+ result = client.getExecutor().snapshotResult(client.getContext(), resultDescriptor.getResultId(), getVisibleMainHeight());
+ } catch (SqlExecutionException e) {
+ close(e);
+ return;
+ }
+
+ // stop retrieval if job is done
+ if (result.getType() == TypedResult.ResultType.EOS) {
+ stopRetrieval();
+ }
+ // update page
+ else if (result.getType() == TypedResult.ResultType.PAYLOAD) {
+ int newPageCount = result.getPayload();
+ pageCount = newPageCount;
+ if (page > newPageCount) {
+ page = LAST_PAGE;
+ }
+ updatePage();
+ }
+
+ lastRetrieval = LocalTime.now();
+
+ // reset view
+ resetAllParts();
+ }
+
+ @Override
+ protected KeyMap<ResultTableOperation> getKeys() {
+ final KeyMap<ResultTableOperation> keys = new KeyMap<>();
+ keys.setAmbiguousTimeout(200); // make ESC quicker
+ keys.bind(ResultTableOperation.QUIT, "q", "Q", esc(), ctrl('c'));
+ keys.bind(ResultTableOperation.REFRESH, "r", "R", key(client.getTerminal(), Capability.key_f5));
+ keys.bind(ResultTableOperation.UP, "w", "W", key(client.getTerminal(), Capability.key_up));
+ keys.bind(ResultTableOperation.DOWN, "s", "S", key(client.getTerminal(), Capability.key_down));
+ keys.bind(ResultTableOperation.LEFT, "a", "A", key(client.getTerminal(), Capability.key_left));
+ keys.bind(ResultTableOperation.RIGHT, "d", "D", key(client.getTerminal(), Capability.key_right));
+ keys.bind(ResultTableOperation.OPEN, "o", "O", "\r");
+ keys.bind(ResultTableOperation.GOTO, "g", "G");
+ keys.bind(ResultTableOperation.NEXT, "n", "N");
+ keys.bind(ResultTableOperation.PREV, "p", "P");
+ keys.bind(ResultTableOperation.LAST, "l", "L", key(client.getTerminal(), Capability.key_end));
+ keys.bind(ResultTableOperation.INC_REFRESH, "+");
+ keys.bind(ResultTableOperation.DEC_REFRESH, "-");
+ return keys;
+ }
+
+ @Override
+ protected void evaluate(ResultTableOperation operation, String binding) {
+ switch (operation) {
+ case QUIT:
+ close();
+ break;
+ case REFRESH:
+ refresh();
+ break;
+ case UP:
+ selectRowUp();
+ break;
+ case DOWN:
+ selectRowDown();
+ break;
+ case OPEN:
+ openRow();
+ break;
+ case GOTO:
+ gotoPage();
+ break;
+ case NEXT:
+ gotoNextPage();
+ break;
+ case PREV:
+ gotoPreviousPage();
+ break;
+ case LAST:
+ gotoLastPage();
+ break;
+ case LEFT:
+ scrollLeft();
+ break;
+ case RIGHT:
+ scrollRight();
+ break;
+ case INC_REFRESH:
+ increaseRefreshInterval();
+ break;
+ case DEC_REFRESH:
+ decreaseRefreshInterval(MIN_REFRESH_INTERVAL);
+ break;
+ }
+ }
+
+ @Override
+ protected String getTitle() {
+ return CliStrings.RESULT_TITLE + " (" + CliStrings.RESULT_TABLE + ")";
+ }
+
+ @Override
+ protected List<AttributedString> computeHeaderLines() {
+ final AttributedStringBuilder statusLine = new AttributedStringBuilder();
+ statusLine.style(AttributedStyle.INVERSE);
+ // left
+ final String left;
+ if (isRetrieving()) {
+ left = CliStrings.DEFAULT_MARGIN + CliStrings.RESULT_REFRESH_INTERVAL + ' ' + REFRESH_INTERVALS.get(refreshInterval).f0;
+ } else {
+ left = CliStrings.DEFAULT_MARGIN + CliStrings.RESULT_STOPPED;
+ }
+ // middle
+ final StringBuilder middleBuilder = new StringBuilder();
+ middleBuilder.append(CliStrings.RESULT_PAGE);
+ middleBuilder.append(' ');
+ if (page == LAST_PAGE) {
+ middleBuilder.append(CliStrings.RESULT_LAST_PAGE);
+ } else {
+ middleBuilder.append(page);
+ }
+ middleBuilder.append(CliStrings.RESULT_PAGE_OF);
+ middleBuilder.append(pageCount);
+ final String middle = middleBuilder.toString();
+ // right
+ final String right;
+ if (lastRetrieval == null) {
+ right = CliStrings.RESULT_LAST_REFRESH + ' ' + CliStrings.RESULT_REFRESH_UNKNOWN + CliStrings.DEFAULT_MARGIN;
+ } else {
+ right = CliStrings.RESULT_LAST_REFRESH + ' ' + lastRetrieval.format(TIME_FORMATTER) + CliStrings.DEFAULT_MARGIN;
+ }
+ // all together
+ final int totalLeftSpace = getWidth() - middle.length();
+ final int leftSpace = totalLeftSpace / 2 - left.length();
+ statusLine.append(left);
+ repeatChar(statusLine, ' ', leftSpace);
+ statusLine.append(middle);
+ final int rightSpacing = getWidth() - statusLine.length() - right.length();
+ repeatChar(statusLine, ' ', rightSpacing);
+ statusLine.append(right);
+
+ return Arrays.asList(statusLine.toAttributedString(), AttributedString.EMPTY);
+ }
+
+ @Override
+ protected List<AttributedString> computeMainHeaderLines() {
+ final AttributedStringBuilder schemaHeader = new AttributedStringBuilder();
+
+ Arrays.stream(resultDescriptor.getResultSchema().getColumnNames()).forEach(s -> {
+ schemaHeader.append(' ');
+ schemaHeader.style(AttributedStyle.DEFAULT.underline());
+ normalizeColumn(schemaHeader, s, MAX_COLUMN_WIDTH);
+ schemaHeader.style(AttributedStyle.DEFAULT);
+ });
+
+ return Collections.singletonList(schemaHeader.toAttributedString());
+ }
+
+ @Override
+ protected List<AttributedString> computeFooterLines() {
+ return formatTwoLineHelpOptions(getWidth(), getHelpOptions());
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void updatePage() {
+ // retrieve page
+ final int retrievalPage = page == LAST_PAGE ? pageCount : page;
+ final List<Row> rows;
+ try {
+ rows = client.getExecutor().retrieveResultPage(resultDescriptor.getResultId(), retrievalPage);
+ } catch (SqlExecutionException e) {
+ close(e);
+ return;
+ }
+
+ // convert page
+ final List<String[]> stringRows = rows
+ .stream()
+ .map(CliUtils::rowToString)
+ .collect(Collectors.toList());
+
+ // update results
+ previousResults = results;
+ results = stringRows;
+
+ // check if selected row is still valid
+ if (selectedRow != NO_ROW_SELECTED) {
+ if (selectedRow >= results.size()) {
+ selectedRow = NO_ROW_SELECTED;
+ }
+ }
+
+ // reset view
+ resetAllParts();
+ }
+
+ private List<Tuple2<String, String>> getHelpOptions() {
+ final List<Tuple2<String, String>> options = new ArrayList<>();
+
+ options.add(Tuple2.of("Q", CliStrings.RESULT_QUIT));
+ options.add(Tuple2.of("R", CliStrings.RESULT_REFRESH));
+
+ options.add(Tuple2.of("+", CliStrings.RESULT_INC_REFRESH));
+ options.add(Tuple2.of("-", CliStrings.RESULT_DEC_REFRESH));
+
+ options.add(Tuple2.of("G", CliStrings.RESULT_GOTO));
+ options.add(Tuple2.of("L", CliStrings.RESULT_LAST));
+
+ options.add(Tuple2.of("N", CliStrings.RESULT_NEXT));
+ options.add(Tuple2.of("P", CliStrings.RESULT_PREV));
+
+ options.add(Tuple2.of("O", CliStrings.RESULT_OPEN));
+
+ return options;
+ }
+
+ private void gotoPage() {
+ final CliInputView view = new CliInputView(
+ client,
+ CliStrings.INPUT_ENTER_PAGE,
+ (s) -> {
+ // validate input
+ final int newPage;
+ try {
+ newPage = Integer.parseInt(s);
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ return newPage > 0 && newPage <= pageCount;
+ });
+ view.open(); // enter view
+ if (view.getResult() != null) {
+ page = Integer.parseInt(view.getResult());
+ updatePage();
+ }
+ }
+
+ private void gotoNextPage() {
+ final int curPageIndex = page == LAST_PAGE ? pageCount : page;
+ if (curPageIndex < pageCount) {
+ page = curPageIndex + 1;
+ }
+ updatePage();
+ }
+
+ private void gotoPreviousPage() {
+ final int curPageIndex = page == LAST_PAGE ? pageCount : page;
+ if (curPageIndex > 1) {
+ page = curPageIndex - 1;
+ }
+ updatePage();
+ }
+
+ private void gotoLastPage() {
+ page = LAST_PAGE;
+ updatePage();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Available operations for this view.
+ */
+ public enum ResultTableOperation {
+ QUIT, // leave view
+ REFRESH, // refresh current table page
+ UP, // row selection up
+ DOWN, // row selection down
+ OPEN, // shows a full row
+ GOTO, // enter table page number
+ NEXT, // next table page
+ PREV, // previous table page
+ LAST, // last table page
+ LEFT, // scroll left if row is large
+ RIGHT, // scroll right if row is large
+ INC_REFRESH, // increase refresh rate
+ DEC_REFRESH, // decrease refresh rate
+ }
+}
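The page navigation above uses the convention that page == LAST_PAGE (0) always follows the newest page, while explicit pages are 1-based. A small standalone sketch of that rule (names local to this example, not part of the patch):

public class PagingSketch {

    static final int LAST_PAGE = 0;

    // mirrors gotoNextPage(): resolve the current page index first, then advance if possible
    static int nextPage(int page, int pageCount) {
        final int cur = page == LAST_PAGE ? pageCount : page;
        return cur < pageCount ? cur + 1 : page;
    }

    public static void main(String[] args) {
        System.out.println(nextPage(LAST_PAGE, 5)); // cannot advance past the newest page -> stays LAST_PAGE (0)
        System.out.println(nextPage(2, 5));         // -> 3
    }
}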
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
new file mode 100644
index 0000000..d0adaa1
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * Utilities for CLI formatting.
+ */
+public final class CliUtils {
+
+ public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
+
+ private CliUtils() {
+ // private
+ }
+
+ public static void repeatChar(AttributedStringBuilder sb, char c, int count) {
+ IntStream.range(0, count).forEach(i -> sb.append(c));
+ }
+
+ public static void normalizeColumn(AttributedStringBuilder sb, String col, int maxWidth) {
+ // limit column content
+ if (col.length() > maxWidth) {
+ sb.append(col, 0, maxWidth - 1);
+ sb.append('~');
+ } else {
+ repeatChar(sb, ' ', maxWidth - col.length());
+ sb.append(col);
+ }
+ }
+
+ public static List<AttributedString> formatTwoLineHelpOptions(int width, List<Tuple2<String, String>> options) {
+ final AttributedStringBuilder line1 = new AttributedStringBuilder();
+ final AttributedStringBuilder line2 = new AttributedStringBuilder();
+
+ // we assume that every option label has at most 11 characters (+ key and space)
+ final int columns = (int) Math.ceil(((double) options.size()) / 2);
+ final int space = (width - CliStrings.DEFAULT_MARGIN.length() - columns * 13) / columns;
+ final Iterator<Tuple2<String, String>> iter = options.iterator();
+ while (iter.hasNext()) {
+ // first line
+ Tuple2<String, String> option = iter.next();
+ line1.style(AttributedStyle.DEFAULT.inverse());
+ line1.append(option.f0);
+ line1.style(AttributedStyle.DEFAULT);
+ line1.append(' ');
+ line1.append(option.f1);
+ repeatChar(line1, ' ', (11 - option.f1.length()) + space);
+ // second line
+ if (iter.hasNext()) {
+ option = iter.next();
+ line2.style(AttributedStyle.DEFAULT.inverse());
+ line2.append(option.f0);
+ line2.style(AttributedStyle.DEFAULT);
+ line2.append(' ');
+ line2.append(option.f1);
+ repeatChar(line2, ' ', (11 - option.f1.length()) + space);
+ }
+ }
+
+ return Arrays.asList(line1.toAttributedString(), line2.toAttributedString());
+ }
+
+ public static String[] rowToString(Row row) {
+ final String[] fields = new String[row.getArity()];
+ for (int i = 0; i < row.getArity(); i++) {
+ fields[i] = String.valueOf(row.getField(i)); // guard against null fields
+ }
+ return fields;
+ }
+
+ public static String[] typesToString(TypeInformation<?>[] types) {
+ final String[] typesAsString = new String[types.length];
+ for (int i = 0; i < types.length; i++) {
+ typesAsString[i] = types[i].toString();
+ }
+ return typesAsString;
+ }
+}
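A quick sketch of what normalizeColumn() produces for the two cases above (padding vs. truncation); the class name and values are illustrative only:

import org.jline.utils.AttributedStringBuilder;

import org.apache.flink.table.client.cli.CliUtils;

public class CliUtilsExample {

    public static void main(String[] args) {
        final AttributedStringBuilder sb = new AttributedStringBuilder();
        CliUtils.normalizeColumn(sb, "id", 6);         // padded to "    id"
        CliUtils.normalizeColumn(sb, "long_value", 6); // truncated to "long_~"
        System.out.println("[" + sb.toAttributedString().toAnsi() + "]");
    }
}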
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java
new file mode 100644
index 0000000..3fa1aab
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+
+import org.jline.keymap.BindingReader;
+import org.jline.keymap.KeyMap;
+import org.jline.terminal.Attributes;
+import org.jline.terminal.Attributes.LocalFlag;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.Terminal.Signal;
+import org.jline.terminal.Terminal.SignalHandler;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp.Capability;
+
+import java.io.IOError;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.client.cli.CliUtils.repeatChar;
+
+/**
+ * Framework for a CLI view with header, footer, and main part that is scrollable.
+ *
+ * @param <OP> supported list of operations
+ * @param <OUT> type of the result returned by {@link #getResult()}
+ */
+public abstract class CliView<OP extends Enum<OP>, OUT> {
+
+ protected final CliClient client;
+
+ protected int offsetX;
+
+ protected int offsetY;
+
+ private boolean isRunning;
+
+ private Thread inputThread;
+
+ private int width;
+
+ private int height;
+
+ private final BindingReader keyReader;
+
+ private AttributedString titleLine;
+
+ private List<AttributedString> headerLines;
+
+ private List<AttributedString> mainHeaderLines; // vertically scrollable
+
+ private List<AttributedString> mainLines;
+
+ private List<AttributedString> footerLines;
+
+ private int totalMainWidth;
+
+ private SqlExecutionException executionException;
+
+ private OUT result;
+
+ public CliView(CliClient client) {
+ this.client = client;
+
+ keyReader = new BindingReader(client.getTerminal().reader());
+ }
+
+ public void open() {
+ isRunning = true;
+ inputThread = Thread.currentThread();
+
+ // prepare terminal
+ final Tuple2<Attributes, Map<Signal, SignalHandler>> prev = prepareTerminal();
+ ensureTerminalFullScreen();
+ updateSize();
+
+ init();
+
+ synchronized (this) {
+ display();
+ }
+
+ final KeyMap<OP> keys = getKeys();
+
+ while (isRunning) {
+
+ final OP operation;
+ try {
+ operation = keyReader.readBinding(keys, null, true);
+ } catch (IOError e) {
+ break;
+ }
+
+ // refresh loop
+ if (operation == null) {
+ continue;
+ }
+
+ synchronized (this) {
+ try {
+ evaluate(operation, keyReader.getLastBinding());
+ } catch (SqlExecutionException e) {
+ // in case evaluate() did not call close() itself
+ close(e);
+ }
+
+ if (isRunning) {
+ // ensure full-screen again in case a sub-view has been opened in evaluate
+ ensureTerminalFullScreen();
+
+ display();
+ }
+ }
+ }
+
+ cleanUp();
+
+ // clean terminal
+ restoreTerminal(prev);
+ unsetTerminalFullScreen();
+
+ if (executionException != null) {
+ throw executionException;
+ }
+ }
+
+ public OUT getResult() {
+ return result;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ protected boolean isRunning() {
+ return isRunning;
+ }
+
+ protected void close() {
+ if (isRunning) {
+ isRunning = false;
+ // break the input loop if this method is called from another thread
+ if (Thread.currentThread() != inputThread) {
+ inputThread.interrupt();
+ }
+ }
+ }
+
+ protected void close(SqlExecutionException e) {
+ executionException = e;
+ close();
+ }
+
+ protected void close(OUT result) {
+ this.result = result;
+ isRunning = false;
+ }
+
+ protected void display() {
+ // cache
+ final List<AttributedString> headerLines = getHeaderLines();
+ final List<AttributedString> mainHeaderLines = getMainHeaderLines();
+ final List<AttributedString> mainLines = getMainLines();
+ final List<AttributedString> footerLines = getFooterLines();
+ final int visibleMainHeight = getVisibleMainHeight();
+ final int totalMainWidth = getTotalMainWidth();
+
+ // create output
+ client.clearTerminal();
+
+ final List<String> lines = new ArrayList<>();
+
+ // title part
+ client.getTerminal().writer().println(computeTitleLine().toAnsi());
+
+ // header part
+ headerLines.forEach(l -> client.getTerminal().writer().println(l.toAnsi()));
+
+ // main part
+ // update vertical offset
+ if (visibleMainHeight > mainLines.size()) {
+ offsetY = 0; // enough space
+ } else {
+ offsetY = Math.min(mainLines.size() - visibleMainHeight, offsetY); // bound offset
+ }
+ // update horizontal offset
+ if (width > totalMainWidth) {
+ offsetX = 0; // enough space
+ } else {
+ offsetX = Math.min(totalMainWidth - width, offsetX); // bound offset
+ }
+ // create window
+ final List<AttributedString> windowedMainLines =
+ mainLines.subList(offsetY, Math.min(mainLines.size(), offsetY + visibleMainHeight));
+ // print window
+ Stream.concat(mainHeaderLines.stream(), windowedMainLines.stream()).forEach(l -> {
+ if (offsetX < l.length()) {
+ final AttributedString windowX = l.substring(offsetX, Math.min(l.length(), offsetX + width));
+ client.getTerminal().writer().println(windowX.toAnsi());
+ } else {
+ client.getTerminal().writer().println(); // nothing to show for this line
+ }
+ });
+
+ // footer part
+ final int emptyHeight = height - 1 - headerLines.size() - // -1 = title
+ windowedMainLines.size() - mainHeaderLines.size() - footerLines.size();
+ // padding
+ IntStream.range(0, emptyHeight).forEach(i -> client.getTerminal().writer().println());
+ // footer
+ IntStream.range(0, footerLines.size()).forEach((i) -> {
+ final AttributedString l = footerLines.get(i);
+ if (i == footerLines.size() - 1) {
+ client.getTerminal().writer().print(l.toAnsi());
+ } else {
+ client.getTerminal().writer().println(l.toAnsi());
+ }
+ });
+
+ client.getTerminal().flush();
+ }
+
+ protected void scrollLeft() {
+ if (offsetX > 0) {
+ offsetX -= 1;
+ }
+ }
+
+ protected void scrollRight() {
+ final int maxOffset = Math.max(0, getTotalMainWidth() - width);
+ if (offsetX < maxOffset) {
+ offsetX += 1;
+ }
+ }
+
+ protected void scrollUp() {
+ if (offsetY > 0) {
+ offsetY -= 1;
+ }
+ }
+
+ protected void scrollDown() {
+ scrollDown(1);
+ }
+
+ protected void scrollDown(int n) {
+ final int maxOffset = Math.max(0, getMainLines().size() - getVisibleMainHeight());
+ offsetY = Math.min(maxOffset, offsetY + n);
+ }
+
+ protected int getVisibleMainHeight() {
+ // -1 = title line
+ return height - 1 - getHeaderLines().size() - getMainHeaderLines().size() -
+ getFooterLines().size();
+ }
+
+ protected List<AttributedString> getHeaderLines() {
+ if (headerLines == null) {
+ headerLines = computeHeaderLines();
+ }
+ return headerLines;
+ }
+
+ protected List<AttributedString> getMainHeaderLines() {
+ if (mainHeaderLines == null) {
+ mainHeaderLines = computeMainHeaderLines();
+ totalMainWidth = computeTotalMainWidth();
+ }
+ return mainHeaderLines;
+ }
+
+ protected List<AttributedString> getMainLines() {
+ if (mainLines == null) {
+ mainLines = computeMainLines();
+ totalMainWidth = computeTotalMainWidth();
+ }
+ return mainLines;
+ }
+
+ protected List<AttributedString> getFooterLines() {
+ if (footerLines == null) {
+ footerLines = computeFooterLines();
+ }
+ return footerLines;
+ }
+
+ protected int getTotalMainWidth() {
+ if (totalMainWidth <= 0) {
+ totalMainWidth = computeTotalMainWidth();
+ }
+ return totalMainWidth;
+ }
+
+ protected AttributedString getTitleLine() {
+ if (titleLine == null) {
+ titleLine = computeTitleLine();
+ }
+ return titleLine;
+ }
+
+ /**
+ * Must be called when values in one or more parts have changed.
+ */
+ protected void resetAllParts() {
+ titleLine = null;
+ headerLines = null;
+ mainHeaderLines = null;
+ mainLines = null;
+ footerLines = null;
+ totalMainWidth = 0;
+ }
+
+ /**
+ * Must be called when values in the main part (main header or main) have changed.
+ */
+ protected void resetMainPart() {
+ mainHeaderLines = null;
+ mainLines = null;
+ totalMainWidth = 0;
+ }
+
+ protected int getWidth() {
+ return width;
+ }
+
+ protected int getHeight() {
+ return height;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void updateSize() {
+ width = client.getWidth();
+ height = client.getHeight();
+ totalMainWidth = width;
+ resetAllParts();
+ }
+
+ private void ensureTerminalFullScreen() {
+ final Terminal terminal = client.getTerminal();
+ terminal.puts(Capability.enter_ca_mode);
+ terminal.puts(Capability.keypad_xmit);
+ terminal.puts(Capability.cursor_invisible);
+ }
+
+ private Tuple2<Attributes, Map<Signal, SignalHandler>> prepareTerminal() {
+ final Terminal terminal = client.getTerminal();
+
+ final Attributes prevAttributes = terminal.getAttributes();
+ // adapted from org.jline.builtins.Nano
+ // see also https://en.wikibooks.org/wiki/Serial_Programming/termios#Basic_Configuration_of_a_Serial_Interface
+
+ // no line processing
+ // canonical mode off, echo off, echo newline off, extended input processing off
+ Attributes newAttr = new Attributes(prevAttributes);
+ newAttr.setLocalFlags(EnumSet.of(LocalFlag.ICANON, LocalFlag.ECHO, LocalFlag.IEXTEN), false);
+ // turn off input processing
+ newAttr.setInputFlags(EnumSet.of(Attributes.InputFlag.IXON, Attributes.InputFlag.ICRNL, Attributes.InputFlag.INLCR), false);
+ // one input byte is enough to return from read, inter-character timer off
+ newAttr.setControlChar(Attributes.ControlChar.VMIN, 1);
+ newAttr.setControlChar(Attributes.ControlChar.VTIME, 0);
+ newAttr.setControlChar(Attributes.ControlChar.VINTR, 0);
+ terminal.setAttributes(newAttr);
+
+ final Map<Signal, SignalHandler> prevSignals = new HashMap<>();
+ prevSignals.put(Signal.WINCH, terminal.handle(Signal.WINCH, this::handleSignal));
+ prevSignals.put(Signal.INT, terminal.handle(Signal.INT, this::handleSignal));
+ prevSignals.put(Signal.QUIT, terminal.handle(Signal.QUIT, this::handleSignal));
+
+ return Tuple2.of(prevAttributes, prevSignals);
+ }
+
+ private void restoreTerminal(Tuple2<Attributes, Map<Signal, SignalHandler>> prev) {
+ final Terminal terminal = client.getTerminal();
+
+ terminal.setAttributes(prev.f0);
+ prev.f1.forEach(terminal::handle);
+ }
+
+ private void unsetTerminalFullScreen() {
+ final Terminal terminal = client.getTerminal();
+
+ terminal.puts(Capability.exit_ca_mode);
+ terminal.puts(Capability.keypad_local);
+ terminal.puts(Capability.cursor_visible);
+ }
+
+ private int computeTotalMainWidth() {
+ final List<AttributedString> mainLines = getMainLines();
+ final List<AttributedString> mainHeaderLines = getMainHeaderLines();
+ final int max1 = mainLines.stream().mapToInt(AttributedString::length).max().orElse(0);
+ final int max2 = mainHeaderLines.stream().mapToInt(AttributedString::length).max().orElse(0);
+ return Math.max(max1, max2);
+ }
+
+ private AttributedString computeTitleLine() {
+ final String title = getTitle();
+ final AttributedStringBuilder titleLine = new AttributedStringBuilder();
+ titleLine.style(AttributedStyle.INVERSE);
+ final int totalMargin = width - title.length();
+ final int margin = totalMargin / 2;
+ repeatChar(titleLine, ' ', margin);
+ titleLine.append(title);
+ repeatChar(titleLine, ' ', margin + (totalMargin % 2));
+ return titleLine.toAttributedString();
+ }
+
+ private void handleSignal(Signal signal) {
+ synchronized (this) {
+ switch (signal) {
+ case INT:
+ close(new SqlExecutionException("Forced interrupt."));
+ break;
+ case QUIT:
+ close(new SqlExecutionException("Forced cancellation."));
+ break;
+ case WINCH:
+ updateSize();
+ if (isRunning) {
+ display();
+ }
+ break;
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Starts threads if necessary.
+ */
+ protected abstract void init();
+
+ protected abstract KeyMap<OP> getKeys();
+
+ protected abstract void evaluate(OP operation, String binding);
+
+ protected abstract String getTitle();
+
+ protected abstract List<AttributedString> computeHeaderLines();
+
+ protected abstract List<AttributedString> computeMainHeaderLines();
+
+ protected abstract List<AttributedString> computeMainLines();
+
+ protected abstract List<AttributedString> computeFooterLines();
+
+ protected abstract void cleanUp();
+
+}
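To illustrate the contract of the abstract methods above, a bare-bones view sketch (everything here — class name, enum, strings — is illustrative and assumes a CliClient instance is available; real views such as CliTableResultView follow the same pattern):

package org.apache.flink.table.client.cli;

import java.util.Collections;
import java.util.List;

import org.jline.keymap.KeyMap;
import org.jline.utils.AttributedString;

import static org.jline.keymap.KeyMap.esc;

/** Illustrative only: smallest possible view that quits on 'q' or ESC. */
public class HelloView extends CliView<HelloView.Op, Void> {

    enum Op { QUIT }

    public HelloView(CliClient client) {
        super(client);
    }

    @Override
    protected void init() {
        // no background threads needed
    }

    @Override
    protected KeyMap<Op> getKeys() {
        final KeyMap<Op> keys = new KeyMap<>();
        keys.bind(Op.QUIT, "q", esc());
        return keys;
    }

    @Override
    protected void evaluate(Op operation, String binding) {
        if (operation == Op.QUIT) {
            close();
        }
    }

    @Override
    protected String getTitle() {
        return "Hello";
    }

    @Override
    protected List<AttributedString> computeHeaderLines() {
        return Collections.emptyList();
    }

    @Override
    protected List<AttributedString> computeMainHeaderLines() {
        return Collections.emptyList();
    }

    @Override
    protected List<AttributedString> computeMainLines() {
        return Collections.singletonList(new AttributedString("Press Q or ESC to leave this view."));
    }

    @Override
    protected List<AttributedString> computeFooterLines() {
        return Collections.emptyList();
    }

    @Override
    protected void cleanUp() {
        // nothing to release
    }
}

Calling view.open() on such a view would enter the full-screen loop above, redraw on terminal resize, and return once 'q' or ESC is pressed.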
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
new file mode 100644
index 0000000..214a17d
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+/**
+ * Simple parser for determining the type of command and its parameters.
+ */
+public final class SqlCommandParser {
+
+ private SqlCommandParser() {
+ // private
+ }
+
+ public static SqlCommandCall parse(String stmt) {
+ String trimmed = stmt.trim();
+ // remove ';' at the end because many people type it intuitively
+ if (trimmed.endsWith(";")) {
+ trimmed = trimmed.substring(0, trimmed.length() - 1);
+ }
+ for (SqlCommand cmd : SqlCommand.values()) {
+ int pos = 0;
+ int tokenCount = 0;
+ for (String token : trimmed.split("\\s")) {
+ pos += token.length() + 1; // include space character
+ // check for content
+ if (token.length() > 0) {
+ // match
+ if (tokenCount < cmd.tokens.length && token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
+ if (tokenCount == cmd.tokens.length - 1) {
+ return new SqlCommandCall(
+ cmd,
+ splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length())))
+ );
+ }
+ } else {
+ // next sql command
+ break;
+ }
+ tokenCount++; // check next token
+ }
+ }
+ }
+ return null;
+ }
+
+ private static String[] splitOperands(SqlCommand cmd, String originalCall, String operands) {
+ switch (cmd) {
+ case SET:
+ final int delimiter = operands.indexOf('=');
+ if (delimiter < 0) {
+ return new String[] {};
+ } else {
+ return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)};
+ }
+ case SELECT:
+ return new String[] {originalCall};
+ default:
+ return new String[] {operands};
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Supported SQL commands.
+ */
+ enum SqlCommand {
+ QUIT("quit"),
+ EXIT("exit"),
+ CLEAR("clear"),
+ HELP("help"),
+ SHOW_TABLES("show tables"),
+ DESCRIBE("describe"),
+ EXPLAIN("explain"),
+ SELECT("select"),
+ SET("set"),
+ RESET("reset"),
+ SOURCE("source");
+
+ public final String command;
+ public final String[] tokens;
+
+ SqlCommand(String command) {
+ this.command = command;
+ this.tokens = command.split(" ");
+ }
+
+ @Override
+ public String toString() {
+ return command.toUpperCase();
+ }
+ }
+
+ /**
+ * Call of a SQL command, consisting of the command type and its operands.
+ */
+ public static class SqlCommandCall {
+ public final SqlCommand command;
+ public final String[] operands;
+
+ public SqlCommandCall(SqlCommand command, String[] operands) {
+ this.command = command;
+ this.operands = operands;
+ }
+ }
+}
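The parsing rules above in action — a hedged sketch placed in the same package because SqlCommand is package-private; the class name is illustrative:

package org.apache.flink.table.client.cli;

import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;

public class SqlCommandParserExample {

    public static void main(String[] args) {
        // SET splits the operands at the first '=' into key and value
        SqlCommandCall set = SqlCommandParser.parse("SET execution.type=streaming;");
        System.out.println(set.command + " -> " + String.join(" | ", set.operands));

        // SELECT keeps the complete original statement as its single operand
        SqlCommandCall select = SqlCommandParser.parse("SELECT * FROM MyTable");
        System.out.println(select.command + " -> " + select.operands[0]);

        // statements that match no command yield null
        System.out.println(SqlCommandParser.parse("INSERT INTO T VALUES (1)"));
    }
}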
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
new file mode 100644
index 0000000..87201a6
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.IOContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Auxiliary functions for configuration file handling.
+ */
+public class ConfigUtil {
+
+ private ConfigUtil() {
+ // private
+ }
+
+ /**
+ * Normalizes nested key-value properties from YAML into the normalized (flat, dot-separated) format of the Table API.
+ */
+ public static Map<String, String> normalizeYaml(Map<String, Object> yamlMap) {
+ final Map<String, String> normalized = new HashMap<>();
+ yamlMap.forEach((k, v) -> normalizeYamlObject(normalized, k, v));
+ return normalized;
+ }
+
+ private static void normalizeYamlObject(Map<String, String> normalized, String key, Object value) {
+ if (value instanceof Map) {
+ final Map<?, ?> map = (Map<?, ?>) value;
+ map.forEach((k, v) -> normalizeYamlObject(normalized, key + "." + k, v));
+ } else if (value instanceof List) {
+ final List<?> list = (List<?>) value;
+ for (int i = 0; i < list.size(); i++) {
+ normalizeYamlObject(normalized, key + "." + i, list.get(i));
+ }
+ } else {
+ normalized.put(key, value.toString());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Modified object mapper that converts to lower-case keys.
+ */
+ public static class LowerCaseYamlMapper extends ObjectMapper {
+ public LowerCaseYamlMapper() {
+ super(new YAMLFactory() {
+ @Override
+ protected YAMLParser _createParser(InputStream in, IOContext ctxt) throws IOException {
+ final Reader r = _createReader(in, null, ctxt);
+ // normalize all keys to lower case
+ return new YAMLParser(ctxt, _getBufferRecycler(), _parserFeatures, _yamlParserFeatures, _objectCodec, r) {
+ @Override
+ public String getCurrentName() throws IOException {
+ if (_currToken == JsonToken.FIELD_NAME) {
+ return _currentFieldName.toLowerCase();
+ }
+ return super.getCurrentName();
+ }
+
+ @Override
+ public String getText() throws IOException {
+ if (_currToken == JsonToken.FIELD_NAME) {
+ return _currentFieldName.toLowerCase();
+ }
+ return super.getText();
+ }
+ };
+ }
+ });
+ }
+ }
+}
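A short sketch of the flattening that normalizeYaml() performs: nested maps become dot-separated keys and list entries are indexed (class name and values are illustrative):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.table.client.config.ConfigUtil;

public class NormalizeYamlExample {

    public static void main(String[] args) {
        final Map<String, Object> gateway = new LinkedHashMap<>();
        gateway.put("address", "localhost");
        gateway.put("ports", Arrays.asList(8083, 8084));

        final Map<String, Object> yaml = new LinkedHashMap<>();
        yaml.put("gateway", gateway);

        // prints (in some order): {gateway.address=localhost, gateway.ports.0=8083, gateway.ports.1=8084}
        System.out.println(ConfigUtil.normalizeYaml(yaml));
    }
}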
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java
new file mode 100644
index 0000000..a87f2e4
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Configuration of a Flink cluster deployment. This class parses the `deployment` part
+ * of an environment file. In the future, we should keep the number of properties here small and
+ * forward most of them directly to Flink's CLI frontend.
+ */
+public class Deployment {
+
+ private final Map<String, String> properties;
+
+ public Deployment() {
+ this.properties = Collections.emptyMap();
+ }
+
+ private Deployment(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ public boolean isStandaloneDeployment() {
+ return Objects.equals(
+ properties.getOrDefault(PropertyStrings.DEPLOYMENT_TYPE, PropertyStrings.DEPLOYMENT_TYPE_VALUE_STANDALONE),
+ PropertyStrings.DEPLOYMENT_TYPE_VALUE_STANDALONE);
+ }
+
+ public long getResponseTimeout() {
+ return Long.parseLong(properties.getOrDefault(PropertyStrings.DEPLOYMENT_RESPONSE_TIMEOUT, Long.toString(10000)));
+ }
+
+ public String getGatewayAddress() {
+ return properties.getOrDefault(PropertyStrings.DEPLOYMENT_GATEWAY_ADDRESS, "");
+ }
+
+ public int getGatewayPort() {
+ return Integer.parseInt(properties.getOrDefault(PropertyStrings.DEPLOYMENT_GATEWAY_PORT, Integer.toString(0)));
+ }
+
+ public Map<String, String> toProperties() {
+ final Map<String, String> copy = new HashMap<>();
+ properties.forEach((k, v) -> copy.put(PropertyStrings.DEPLOYMENT + "." + k, v));
+ return copy;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static Deployment create(Map<String, Object> config) {
+ return new Deployment(ConfigUtil.normalizeYaml(config));
+ }
+
+ /**
+ * Merges two deployments. The properties of the first deployment might be overwritten by the second one.
+ */
+ public static Deployment merge(Deployment deploy1, Deployment deploy2) {
+ final Map<String, String> properties = new HashMap<>(deploy1.properties);
+ properties.putAll(deploy2.properties);
+
+ return new Deployment(properties);
+ }
+
+ /**
+ * Creates a new deployment enriched with additional properties.
+ */
+ public static Deployment enrich(Deployment deploy, Map<String, String> properties) {
+ final Map<String, String> newProperties = new HashMap<>(deploy.properties);
+ properties.forEach((k, v) -> {
+ final String normalizedKey = k.toLowerCase();
+ if (normalizedKey.startsWith(PropertyStrings.DEPLOYMENT + ".")) {
+ newProperties.put(normalizedKey.substring(PropertyStrings.DEPLOYMENT.length() + 1), v);
+ }
+ });
+
+ return new Deployment(newProperties);
+ }
+}
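A sketch of the merge semantics above: properties of the second deployment win. The property key used here is purely illustrative (the real keys come from PropertyStrings, which is not part of this excerpt), and toProperties() re-adds the deployment namespace prefix:

import java.util.Collections;

import org.apache.flink.table.client.config.Deployment;

public class DeploymentMergeExample {

    public static void main(String[] args) {
        final Deployment defaults = Deployment.create(Collections.<String, Object>singletonMap("response-timeout", 5000));
        final Deployment session = Deployment.create(Collections.<String, Object>singletonMap("response-timeout", 20000));

        // the session value (20000) overrides the default (5000); printed keys carry the deployment prefix
        System.out.println(Deployment.merge(defaults, session).toProperties());
    }
}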
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
new file mode 100644
index 0000000..a910c49
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Environment configuration that represents the content of an environment file. Environment files
+ * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * as part of a session. Environments can be merged or enriched with properties (e.g. from a CLI command).
+ *
+ * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
+ * allow overwriting of a deployment by a session.
+ */
+public class Environment {
+
+ private Map<String, Source> sources;
+
+ private Execution execution;
+
+ private Deployment deployment;
+
+ public Environment() {
+ this.sources = Collections.emptyMap();
+ this.execution = new Execution();
+ this.deployment = new Deployment();
+ }
+
+ public Map<String, Source> getSources() {
+ return sources;
+ }
+
+ public void setSources(List<Map<String, Object>> sources) {
+ this.sources = new HashMap<>(sources.size());
+ sources.forEach(config -> {
+ final Source s = Source.create(config);
+ if (this.sources.containsKey(s.getName())) {
+ throw new SqlClientException("Duplicate source name '" + s + "'.");
+ }
+ this.sources.put(s.getName(), s);
+ });
+ }
+
+ public void setExecution(Map<String, Object> config) {
+ this.execution = Execution.create(config);
+ }
+
+ public Execution getExecution() {
+ return execution;
+ }
+
+ public void setDeployment(Map<String, Object> config) {
+ this.deployment = Deployment.create(config);
+ }
+
+ public Deployment getDeployment() {
+ return deployment;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Parses an environment file from a URL.
+ */
+ public static Environment parse(URL url) throws IOException {
+ return new ConfigUtil.LowerCaseYamlMapper().readValue(url, Environment.class);
+ }
+
+ /**
+ * Parses an environment file from a String.
+ */
+ public static Environment parse(String content) throws IOException {
+ return new ConfigUtil.LowerCaseYamlMapper().readValue(content, Environment.class);
+ }
+
+ /**
+ * Merges two environments. The properties of the first environment might be overwritten by the second one.
+ */
+ public static Environment merge(Environment env1, Environment env2) {
+ final Environment mergedEnv = new Environment();
+
+ // merge sources (entries of the second environment have precedence)
+ final Map<String, Source> sources = new HashMap<>(env1.getSources());
+ sources.putAll(env2.getSources());
+ mergedEnv.sources = sources;
+
+ // merge execution properties
+ mergedEnv.execution = Execution.merge(env1.getExecution(), env2.getExecution());
+
+ // merge deployment properties
+ mergedEnv.deployment = Deployment.merge(env1.getDeployment(), env2.getDeployment());
+
+ return mergedEnv;
+ }
+
+ public static Environment enrich(Environment env, Map<String, String> properties) {
+ final Environment enrichedEnv = new Environment();
+
+ // merge sources
+ enrichedEnv.sources = new HashMap<>(env.getSources());
+
+ // enrich execution properties
+ enrichedEnv.execution = Execution.enrich(env.execution, properties);
+
+ // enrich deployment properties
+ enrichedEnv.deployment = Deployment.enrich(env.deployment, properties);
+
+ return enrichedEnv;
+ }
+}
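
A short sketch of how merging and enrichment compose, assuming the YAML layout accepted by Environment.parse matches the setters above (the ConfigUtil helpers are defined elsewhere and not shown in this section):

import java.util.Collections;

public class EnvironmentExample {
	public static void main(String[] args) throws Exception {
		// a defaults file and a session file; the second one wins on conflicts
		Environment defaults = Environment.parse("execution:\n  type: batch\n");
		Environment session = Environment.parse("execution:\n  type: streaming\n");

		Environment merged = Environment.merge(defaults, session);
		System.out.println(merged.getExecution().isStreamingExecution()); // true

		// enrichment uses flat, prefixed keys, e.g. coming from a SET command
		Environment enriched = Environment.enrich(
			merged, Collections.singletonMap("execution.max-parallelism", "256"));
		System.out.println(enriched.getExecution().getMaxParallelism()); // 256
	}
}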
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
new file mode 100644
index 0000000..37d1a34
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Configuration of a table program execution. This class parses the `execution` part
+ * in an environment file. The execution describes properties that would usually be defined in the
+ * ExecutionEnvironment/StreamExecutionEnvironment/TableEnvironment or as code in a Flink job.
+ */
+public class Execution {
+
+ private final Map<String, String> properties;
+
+ public Execution() {
+ this.properties = Collections.emptyMap();
+ }
+
+ private Execution(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ // TODO add logger warnings if default value is used
+
+ public boolean isStreamingExecution() {
+ return Objects.equals(
+ properties.getOrDefault(PropertyStrings.EXECUTION_TYPE, PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING),
+ PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING);
+ }
+
+ public boolean isBatchExecution() {
+ return Objects.equals(
+ properties.getOrDefault(PropertyStrings.EXECUTION_TYPE, PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING),
+ PropertyStrings.EXECUTION_TYPE_VALUE_BATCH);
+ }
+
+ public long getMinStateRetention() {
+ return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
+ }
+
+ public long getMaxStateRetention() {
+ return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
+ }
+
+ public int getParallelism() {
+ return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_PARALLELISM, Integer.toString(1)));
+ }
+
+ public int getMaxParallelism() {
+ return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
+ }
+
+ public boolean isChangelogMode() {
+ return Objects.equals(
+ properties.getOrDefault(PropertyStrings.EXECUTION_RESULT_MODE, PropertyStrings.EXECUTION_RESULT_MODE_VALUE_CHANGELOG),
+ PropertyStrings.EXECUTION_RESULT_MODE_VALUE_CHANGELOG);
+ }
+
+ public Map<String, String> toProperties() {
+ final Map<String, String> copy = new HashMap<>();
+ properties.forEach((k, v) -> copy.put(PropertyStrings.EXECUTION + "." + k, v));
+ return copy;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static Execution create(Map<String, Object> config) {
+ return new Execution(ConfigUtil.normalizeYaml(config));
+ }
+
+ /**
+ * Merges two executions. The properties of the first execution might be overwritten by the second one.
+ */
+ public static Execution merge(Execution exec1, Execution exec2) {
+ final Map<String, String> newProperties = new HashMap<>(exec1.properties);
+ newProperties.putAll(exec2.properties);
+
+ return new Execution(newProperties);
+ }
+
+ /**
+ * Creates a new execution enriched with additional properties.
+ */
+ public static Execution enrich(Execution exec, Map<String, String> properties) {
+ final Map<String, String> newProperties = new HashMap<>(exec.properties);
+ properties.forEach((k, v) -> {
+ final String normalizedKey = k.toLowerCase();
+ if (normalizedKey.startsWith(PropertyStrings.EXECUTION + ".")) {
+ newProperties.put(normalizedKey.substring(PropertyStrings.EXECUTION.length() + 1), v);
+ }
+ });
+
+ return new Execution(newProperties);
+ }
+}
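
The getters above fall back to hard-coded defaults when a key is absent. A tiny sketch (assuming ConfigUtil.normalizeYaml of an empty map yields an empty property map):

import java.util.Collections;

public class ExecutionDefaultsExample {
	public static void main(String[] args) {
		// an empty 'execution' section uses the defaults encoded in the getters
		Execution exec = Execution.create(Collections.emptyMap());
		System.out.println(exec.isStreamingExecution()); // true  ("streaming" is the default type)
		System.out.println(exec.getParallelism());       // 1
		System.out.println(exec.getMaxParallelism());    // 128
		System.out.println(exec.isChangelogMode());      // true  ("changelog" is the default result mode)
	}
}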
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
new file mode 100644
index 0000000..ba0759d
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+/**
+ * Strings used for key and values in an environment file.
+ */
+public final class PropertyStrings {
+
+ private PropertyStrings() {
+ // private
+ }
+
+ public static final String EXECUTION = "execution";
+
+ public static final String EXECUTION_TYPE = "type";
+
+ public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
+
+ public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
+
+ public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention";
+
+ public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention";
+
+ public static final String EXECUTION_PARALLELISM = "parallelism";
+
+ public static final String EXECUTION_MAX_PARALLELISM = "max-parallelism";
+
+ public static final String EXECUTION_RESULT_MODE = "result-mode";
+
+ public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = "changelog";
+
+ public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+
+ public static final String DEPLOYMENT = "deployment";
+
+ public static final String DEPLOYMENT_TYPE = "type";
+
+ public static final String DEPLOYMENT_TYPE_VALUE_STANDALONE = "standalone";
+
+ public static final String DEPLOYMENT_RESPONSE_TIMEOUT = "response-timeout";
+
+ public static final String DEPLOYMENT_GATEWAY_ADDRESS = "gateway-address";
+
+ public static final String DEPLOYMENT_GATEWAY_PORT = "gateway-port";
+}
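
For orientation, the keys above roughly map to an environment file of the following shape. The layout is inferred from the Environment/Execution/Deployment setters in this patch and the values are purely illustrative; treat it as a sketch, not the shipped defaults file.

execution:
  type: streaming
  parallelism: 1
  max-parallelism: 128
  result-mode: changelog

deployment:
  type: standalone
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0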
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
new file mode 100644
index 0000000..7b2498f
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.TableSourceDescriptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration of a table source. Parses an entry in the `sources` list of an environment
+ * file and translates it into table descriptor properties.
+ */
+public class Source extends TableSourceDescriptor {
+
+ private String name;
+ private Map<String, String> properties;
+
+ private static final String NAME = "name";
+
+ private Source(String name, Map<String, String> properties) {
+ this.name = name;
+ this.properties = properties;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public static Source create(Map<String, Object> config) {
+ if (!config.containsKey(NAME)) {
+ throw new SqlClientException("The 'name' attribute of a table source is missing.");
+ }
+ final Object name = config.get(NAME);
+ if (!(name instanceof String) || ((String) name).isEmpty()) {
+ throw new SqlClientException("Invalid table source name '" + name + "'.");
+ }
+ final Map<String, Object> properties = new HashMap<>(config);
+ properties.remove(NAME);
+ return new Source((String) name, ConfigUtil.normalizeYaml(properties));
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void addProperties(DescriptorProperties properties) {
+ this.properties.forEach(properties::putString);
+ }
+}
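
A sketch of how a 'sources' entry is turned into a Source: everything except the 'name' key is passed through as descriptor properties. The property key below is only a placeholder, and the normalization performed by ConfigUtil.normalizeYaml is assumed.

import java.util.HashMap;
import java.util.Map;

public class SourceExample {
	public static void main(String[] args) {
		// one entry of the 'sources' list, as delivered by the YAML parser
		Map<String, Object> config = new HashMap<>();
		config.put("name", "TaxiRides");
		config.put("connector", "some-connector"); // placeholder property

		Source source = Source.create(config);
		System.out.println(source.getName());       // TaxiRides
		System.out.println(source.getProperties()); // remaining, normalized properties
	}
}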
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
new file mode 100644
index 0000000..512c194
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A gateway for communicating with Flink and other external systems.
+ */
+public interface Executor {
+
+ /**
+ * Starts the executor and ensures that it is ready for commands to be executed.
+ */
+ void start() throws SqlExecutionException;
+
+ /**
+ * Lists all session properties that are defined by the executor and the session.
+ */
+ Map<String, String> getSessionProperties(SessionContext context) throws SqlExecutionException;
+
+ /**
+ * Lists all tables known to the executor.
+ */
+ List<String> listTables(SessionContext context) throws SqlExecutionException;
+
+ /**
+ * Returns the schema of a table. Throws an exception if the table could not be found.
+ */
+ TableSchema getTableSchema(SessionContext context, String name) throws SqlExecutionException;
+
+ /**
+ * Returns a string-based explanation of the AST and execution plan of the given statement.
+ */
+ String explainStatement(SessionContext context, String statement) throws SqlExecutionException;
+
+ /**
+ * Submits a Flink job (detached) and returns the result descriptor.
+ */
+ ResultDescriptor executeQuery(SessionContext context, String query) throws SqlExecutionException;
+
+ /**
+ * Asks for the next changelog results (non-blocking).
+ */
+ TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext context, String resultId) throws SqlExecutionException;
+
+ /**
+ * Creates an immutable result snapshot of the running Flink job. Throws an exception if no Flink job can be found.
+ * Returns the number of pages.
+ */
+ TypedResult<Integer> snapshotResult(SessionContext context, String resultId, int pageSize) throws SqlExecutionException;
+
+ /**
+ * Returns the rows that are part of the current page or throws an exception if the snapshot has expired.
+ */
+ List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException;
+
+ /**
+ * Cancels a table program and stops the result retrieval.
+ */
+ void cancelQuery(SessionContext context, String resultId) throws SqlExecutionException;
+
+ /**
+ * Stops the executor.
+ */
+ void stop(SessionContext context);
+}
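
A hedged sketch of how a client could drive the interface above for a changelog result: submit the query, poll for changes, and react to the three TypedResult states. The executor instance and the query are placeholders, no concrete implementation is part of this file, and the sketch is assumed to live alongside the gateway classes.

import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class ExecutorUsageSketch {

	static void runQuery(Executor executor, SessionContext session) {
		final ResultDescriptor result = executor.executeQuery(session, "SELECT * FROM TaxiRides");

		boolean finished = false;
		while (!finished) {
			final TypedResult<List<Tuple2<Boolean, Row>>> changes =
				executor.retrieveResultChanges(session, result.getResultId());
			switch (changes.getType()) {
				case PAYLOAD:
					// the Boolean flag presumably marks insert (true) vs. retract (false)
					changes.getPayload().forEach(change ->
						System.out.println((change.f0 ? "+ " : "- ") + change.f1));
					break;
				case EMPTY:
					// nothing new yet; a real client would back off before polling again
					break;
				case EOS:
					finished = true;
					break;
			}
		}
	}
}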
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
new file mode 100644
index 0000000..3cfaa6f
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.table.api.TableSchema;
+
+/**
+ * Describes a result to be expected from a table program.
+ */
+public class ResultDescriptor {
+
+ private final String resultId;
+
+ private final TableSchema resultSchema;
+
+ private final boolean isMaterialized;
+
+ public ResultDescriptor(String resultId, TableSchema resultSchema, boolean isMaterialized) {
+ this.resultId = resultId;
+ this.resultSchema = resultSchema;
+ this.isMaterialized = isMaterialized;
+ }
+
+ public String getResultId() {
+ return resultId;
+ }
+
+ public TableSchema getResultSchema() {
+ return resultSchema;
+ }
+
+ public boolean isMaterialized() {
+ return isMaterialized;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
new file mode 100644
index 0000000..1058eb6
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.table.client.config.Environment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Context describing a session.
+ */
+public class SessionContext {
+
+ private final String name;
+ private final Environment defaultEnvironment;
+ private final Map<String, String> sessionProperties;
+
+ public SessionContext(String name, Environment defaultEnvironment) {
+ this.name = name;
+ this.defaultEnvironment = defaultEnvironment;
+ this.sessionProperties = new HashMap<>();
+ }
+
+ public void setSessionProperty(String key, String value) {
+ sessionProperties.put(key, value);
+ }
+
+ public void resetSessionProperties() {
+ sessionProperties.clear();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Environment getEnvironment() {
+ // enrich with session properties
+ return Environment.enrich(defaultEnvironment, sessionProperties);
+ }
+}
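
A small sketch of the enrichment behavior: properties set on the session overlay the default environment every time getEnvironment() is called.

import org.apache.flink.table.client.config.Environment;

public class SessionContextExample {
	public static void main(String[] args) {
		final Environment defaults = new Environment(); // empty defaults for the sketch
		final SessionContext session = new SessionContext("my-session", defaults);

		// a 'SET execution.parallelism=4' command ends up here; note the section prefix in the key
		session.setSessionProperty("execution.parallelism", "4");
		System.out.println(session.getEnvironment().getExecution().getParallelism()); // 4

		session.resetSessionProperties();
		System.out.println(session.getEnvironment().getExecution().getParallelism()); // 1 (default)
	}
}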
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SqlExecutionException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SqlExecutionException.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SqlExecutionException.java
new file mode 100644
index 0000000..d14da7f
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SqlExecutionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+/**
+ * Exception thrown during the execution of SQL statements.
+ */
+public class SqlExecutionException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SqlExecutionException(String message) {
+ super(message);
+ }
+
+ public SqlExecutionException(String message, Throwable e) {
+ super(message, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
new file mode 100644
index 0000000..ee4e8d3
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+/**
+ * Result with an attached type (actual payload, EOS, etc.).
+ *
+ * @param <P> type of payload
+ */
+public class TypedResult<P> {
+
+ private ResultType type;
+
+ private P payload;
+
+ private TypedResult(ResultType type, P payload) {
+ this.type = type;
+ this.payload = payload;
+ }
+
+ public void setType(ResultType type) {
+ this.type = type;
+ }
+
+ public void setPayload(P payload) {
+ this.payload = payload;
+ }
+
+ public ResultType getType() {
+ return type;
+ }
+
+ public P getPayload() {
+ return payload;
+ }
+
+ @Override
+ public String toString() {
+ return "TypedResult<" + type + ">";
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static <T> TypedResult<T> empty() {
+ return new TypedResult<>(ResultType.EMPTY, null);
+ }
+
+ public static <T> TypedResult<T> payload(T payload) {
+ return new TypedResult<>(ResultType.PAYLOAD, payload);
+ }
+
+ public static <T> TypedResult<T> endOfStream() {
+ return new TypedResult<>(ResultType.EOS, null);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Result types.
+ */
+ public enum ResultType {
+ PAYLOAD,
+ EMPTY,
+ EOS
+ }
+}
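
For illustration, a sketch of how an Executor implementation might use the factory methods above to signal the three states to a polling client (the method and its flags are hypothetical):

public class TypedResultExample {

	static TypedResult<Integer> snapshotState(boolean jobRunning, boolean pagesReady, int pageCount) {
		if (!jobRunning) {
			return TypedResult.endOfStream();      // ResultType.EOS, no payload
		} else if (!pagesReady) {
			return TypedResult.empty();            // ResultType.EMPTY, caller should poll again
		} else {
			return TypedResult.payload(pageCount); // ResultType.PAYLOAD carrying the page count
		}
	}
}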