flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [2/3] flink git commit: [FLINK-8607] [table] Add a basic embedded SQL CLI client
Date Sat, 17 Feb 2018 08:19:21 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
new file mode 100644
index 0000000..6ded8fa
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommand;
+
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+/**
+ * Utility class that contains all strings for CLI commands and messages.
+ */
+public final class CliStrings {
+
+	private CliStrings() {
+		// private
+	}
+
+	public static final String CLI_NAME = "Flink SQL CLI Client";
+	public static final String DEFAULT_MARGIN = " ";
+
+	// --------------------------------------------------------------------------------------------
+
+	public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
+		.append("The following commands are available:\n\n")
+		.append(formatCommand(SqlCommand.QUIT, "Quits the SQL CLI client."))
+		.append(formatCommand(SqlCommand.CLEAR, "Clears the current terminal."))
+		.append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
+		.append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+		.append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
+		.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
+		.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+		.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
+		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
+		.append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
+		.style(AttributedStyle.DEFAULT.underline())
+		.append("\nHint")
+		.style(AttributedStyle.DEFAULT)
+		.append(": Use '\\' for multi-line commands.")
+		.toAttributedString();
+
+	public static final String MESSAGE_WELCOME;
+	// make findbugs happy
+	static {
+		MESSAGE_WELCOME = "                                   \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592\n" +
+			"                               \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592\n" +
+			"                            \u2593\u2588\u2588\u2588\u2593\u2591\u2591        \u2592\u2592\u2592\u2593\u2588\u2588\u2592  \u2592\n" +
+			"                          \u2591\u2588\u2588\u2592   \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591      \u2592\u2588\u2588\u2588\u2588\n" +
+			"                          \u2588\u2588\u2592         \u2591\u2592\u2593\u2588\u2588\u2588\u2592    \u2592\u2588\u2592\u2588\u2592\n" +
+			"                            \u2591\u2593\u2588            \u2588\u2588\u2588   \u2593\u2591\u2592\u2588\u2588\n" +
+			"                              \u2593\u2588       \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588\n" +
+			"                            \u2588\u2591 \u2588   \u2592\u2592\u2591       \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592\n" +
+			"                            \u2588\u2588\u2588\u2588\u2591   \u2592\u2593\u2588\u2593      \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592\n" +
+			"                         \u2591\u2592\u2588\u2593\u2593\u2588\u2588       \u2593\u2588\u2592    \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591\n" +
+			"                   \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588         \u2592\u2588    \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592\n" +
+			"                  \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593  \u2593\u2588           \u2588   \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592\n" +
+			"                \u2591\u2588\u2588\u2593  \u2591\u2588\u2591            \u2588  \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592\n" +
+			"               \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591          \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591    \u2591\u2588\u2591\u2593  \u2593\u2591\n" +
+			"              \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592          \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591       \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593\n" +
+			"           \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588       \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591         \u2588\u2588\u2592\u2592  \u2588 \u2592  \u2593\u2588\u2592\n" +
+			"           \u2593\u2588\u2593  \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592              \u2592\u2588\u2588\u2593           \u2591\u2588\u2592\n" +
+			"           \u2593\u2588    \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591              \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593          \u2591\u2592\u2591 \u2593\u2588\n" +
+			"           \u2588\u2588\u2593    \u2588\u2588\u2592    \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592            \u2593\u2588\u2588\u2588  \u2588\n" +
+			"          \u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588   \u2591\u2593\u2593\u2592\u2591\u2591   \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591                  \u2591\u2592\u2593\u2592  \u2588\u2593\n" +
+			"          \u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588  \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591                            \u2588\u2593\n" +
+			"          \u2588\u2588 \u2593\u2591\u2592\u2588   \u2593\u2593\u2593\u2593\u2592\u2591\u2591  \u2592\u2588\u2593       \u2592\u2593\u2593\u2588\u2588\u2593    \u2593\u2592          \u2592\u2592\u2593\n" +
+			"          \u2593\u2588\u2593 \u2593\u2592\u2588  \u2588\u2593\u2591  \u2591\u2592\u2593\u2593\u2588\u2588\u2592            \u2591\u2593\u2588\u2592   \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592\n" +
+			"           \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592  \u2592\u2593\u2593\u2592  \u2593\u2588                \u2588\u2591      \u2591\u2591\u2591\u2591   \u2591\u2588\u2592\n" +
+			"           \u2593\u2588   \u2592\u2588\u2593   \u2591     \u2588\u2591                \u2592\u2588              \u2588\u2593\n" +
+			"            \u2588\u2593   \u2588\u2588         \u2588\u2591                 \u2593\u2593        \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591\n" +
+			"             \u2588\u2593 \u2591\u2593\u2588\u2588\u2591       \u2593\u2592                  \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591    \u2592\u2588\n" +
+			"              \u2588\u2588   \u2593\u2588\u2593\u2591      \u2592                    \u2591\u2592\u2588\u2592\u2588\u2588\u2592      \u2593\u2593\n" +
+			"               \u2593\u2588\u2592   \u2592\u2588\u2593\u2592\u2591                         \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588\n" +
+			"                \u2591\u2588\u2588\u2592    \u2592\u2593\u2593\u2592                     \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593\n" +
+			"                  \u2591\u2593\u2588\u2588\u2592                          \u2593\u2591  \u2592\u2588\u2593\u2588  \u2591\u2591\u2592\u2592\u2592\n" +
+			"                      \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593  \u2593\u2591\u2592\u2588\u2591\n" +
+			"          \n" +
+			"    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   \n" +
+			"   |  ____| (_)     | |     / ____|/ __ \\| |       / ____| (_)          | |  \n" +
+			"   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ \n" +
+			"   |  __| | | | '_ \\| |/ /  \\___ \\| |  | | |      | |    | | |/ _ \\ '_ \\| __|\n" +
+			"   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ \n" +
+			"   |_|    |_|_|_| |_|_|\\_\\ |_____/ \\___\\_\\______|  \\_____|_|_|\\___|_| |_|\\__|\n" +
+			"          \n" +
+			"        Welcome! Enter HELP to list all available commands. QUIT to exit.\n\n";
+	}
+
+	public static final String MESSAGE_QUIT = "Exiting " + CliStrings.CLI_NAME + "...";
+
+	public static final String MESSAGE_SQL_EXECUTION_ERROR = "Could not execute SQL statement.";
+
+	public static final String MESSAGE_RESET = "All session properties have been set to their default values.";
+
+	public static final String MESSAGE_SET = "Session property has been set.";
+
+	public static final String MESSAGE_EMPTY = "Result was empty.";
+
+	public static final String MESSAGE_UNKNOWN_SQL = "Unknown SQL statement.";
+
+	public static final String MESSAGE_UNKNOWN_TABLE = "Unknown table.";
+
+	public static final String MESSAGE_RESULT_SNAPSHOT_ERROR = "Could not create a snapshot of the dynamic table.";
+
+	public static final String MESSAGE_RESULT_QUIT = "Result retrieval cancelled.";
+
+	public static final String MESSAGE_INVALID_PATH = "Path is invalid.";
+
+	public static final String MESSAGE_MAX_SIZE_EXCEEDED = "The given file exceeds the maximum number of characters.";
+
+	public static final String MESSAGE_WILL_EXECUTE = "Executing the following statement:";
+
+	// --------------------------------------------------------------------------------------------
+
+	public static final String RESULT_TITLE = "SQL Query Result";
+
+	public static final String RESULT_REFRESH_INTERVAL = "Refresh:";
+
+	public static final String RESULT_PAGE = "Page:";
+
+	public static final String RESULT_PAGE_OF = " of ";
+
+	public static final String RESULT_LAST_REFRESH = "Updated:";
+
+	public static final String RESULT_LAST_PAGE = "Last";
+
+	public static final String RESULT_QUIT = "Quit";
+
+	public static final String RESULT_REFRESH = "Refresh";
+
+	public static final String RESULT_GOTO = "Goto Page";
+
+	public static final String RESULT_NEXT = "Next Page";
+
+	public static final String RESULT_PREV = "Prev Page";
+
+	public static final String RESULT_LAST = "Last Page";
+
+	public static final String RESULT_FIRST = "First Page";
+
+	public static final String RESULT_SEARCH = "Search";
+
+	public static final String RESULT_INC_REFRESH = "Inc Refresh"; // implementation assumes max length of 11
+
+	public static final String RESULT_DEC_REFRESH = "Dec Refresh";
+
+	public static final String RESULT_OPEN = "Open Row";
+
+	public static final String RESULT_CHANGELOG = "Changelog";
+
+	public static final String RESULT_TABLE = "Table";
+
+	public static final String RESULT_STOPPED = "Table program finished.";
+
+	public static final String RESULT_REFRESH_UNKNOWN = "Unknown";
+
+	// --------------------------------------------------------------------------------------------
+
+	public static final String INPUT_TITLE = "Input Dialog";
+
+	public static final AttributedString INPUT_HELP = new AttributedStringBuilder()
+		.append("Press ")
+		.style(AttributedStyle.DEFAULT.inverse())
+		.append("Enter")
+		.style(AttributedStyle.DEFAULT)
+		.append(" to submit. Press ")
+		.style(AttributedStyle.DEFAULT.inverse())
+		.append("ESC")
+		.style(AttributedStyle.DEFAULT)
+		.append(" or submit an empty string to cancel.")
+		.toAttributedString();
+
+	public static final String INPUT_ENTER_PAGE = "Enter page number:";
+
+	public static final String INPUT_ERROR = "The input is invalid please check it again.";
+
+	// --------------------------------------------------------------------------------------------
+
+	public static final AttributedString ROW_QUIT = new AttributedStringBuilder()
+		.append("Press ")
+		.style(AttributedStyle.DEFAULT.inverse())
+		.append("Q")
+		.style(AttributedStyle.DEFAULT)
+		.append(" to go back.")
+		.toAttributedString();
+
+	public static final String ROW_HEADER = "Row Summary";
+
+	// --------------------------------------------------------------------------------------------
+
+	public static AttributedString messageInfo(String message) {
+		return new AttributedStringBuilder()
+			.style(AttributedStyle.DEFAULT.bold().foreground(AttributedStyle.BLUE))
+			.append("[INFO] ")
+			.append(message)
+			.toAttributedString();
+	}
+
+	public static AttributedString messageError(String message, Throwable t) {
+		// unwrap nested causes for as long as the next cause carries a non-empty message
+		while (t.getCause() != null && t.getCause().getMessage() != null && !t.getCause().getMessage().isEmpty()) {
+			t = t.getCause();
+		}
+		return messageError(message, t.getClass().getName() + ": " + t.getMessage());
+	}
+
+	public static AttributedString messageError(String message) {
+		return messageError(message, (String) null);
+	}
+
+	public static AttributedString messageError(String message, String s) {
+		final AttributedStringBuilder builder = new AttributedStringBuilder()
+			.style(AttributedStyle.DEFAULT.bold().foreground(AttributedStyle.RED))
+			.append("[ERROR] ")
+			.append(message);
+
+		if (s != null) {
+			builder
+				.append(" Reason:\n")
+				.append(s);
+		}
+
+		return builder.toAttributedString();
+	}
+
+	private static AttributedString formatCommand(SqlCommand cmd, String description) {
+		return new AttributedStringBuilder()
+			.style(AttributedStyle.DEFAULT.bold())
+			.append(cmd.toString())
+			.append("\t\t")
+			.style(AttributedStyle.DEFAULT)
+			.append(description)
+			.append('\n')
+			.toAttributedString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
new file mode 100644
index 0000000..4d9c69b
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.ResultDescriptor;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import org.jline.keymap.KeyMap;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp.Capability;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
+import static org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions;
+import static org.apache.flink.table.client.cli.CliUtils.normalizeColumn;
+import static org.apache.flink.table.client.cli.CliUtils.repeatChar;
+import static org.jline.keymap.KeyMap.ctrl;
+import static org.jline.keymap.KeyMap.esc;
+import static org.jline.keymap.KeyMap.key;
+
+/**
+ * CLI view for retrieving and displaying a table.
+ */
+public class CliTableResultView extends CliResultView<CliTableResultView.ResultTableOperation> {
+
+	private int pageCount;
+	private int page;
+	private LocalTime lastRetrieval;
+
+	private static final int DEFAULT_REFRESH_INTERVAL = 3; // every 1s
+	private static final int MIN_REFRESH_INTERVAL = 1; // every 100ms
+	private static final int LAST_PAGE = 0;
+
+	public CliTableResultView(CliClient client, ResultDescriptor resultDescriptor) {
+		super(client, resultDescriptor);
+
+		refreshInterval = DEFAULT_REFRESH_INTERVAL;
+		pageCount = 1;
+		page = LAST_PAGE;
+
+		previousResults = Collections.emptyList();
+		results = Collections.emptyList();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	protected String[] getRow(String[] resultRow) {
+		return resultRow;
+	}
+
+	@Override
+	protected int computeColumnWidth(int idx) {
+		return MAX_COLUMN_WIDTH;
+	}
+
+	@Override
+	protected void refresh() {
+		// take snapshot
+		TypedResult<Integer> result;
+		try {
+			result = client.getExecutor().snapshotResult(client.getContext(), resultDescriptor.getResultId(), getVisibleMainHeight());
+		} catch (SqlExecutionException e) {
+			close(e);
+			return;
+		}
+
+		// stop retrieval if job is done
+		if (result.getType() == TypedResult.ResultType.EOS) {
+			stopRetrieval();
+		}
+		// update page
+		else if (result.getType() == TypedResult.ResultType.PAYLOAD) {
+			int newPageCount = result.getPayload();
+			pageCount = newPageCount;
+			if (page > newPageCount) {
+				page = LAST_PAGE;
+			}
+			updatePage();
+		}
+
+		lastRetrieval = LocalTime.now();
+
+		// reset view
+		resetAllParts();
+	}
+
+	@Override
+	protected KeyMap<ResultTableOperation> getKeys() {
+		final KeyMap<ResultTableOperation> keys = new KeyMap<>();
+		keys.setAmbiguousTimeout(200); // make ESC quicker
+		keys.bind(ResultTableOperation.QUIT, "q", "Q", esc(), ctrl('c'));
+		keys.bind(ResultTableOperation.REFRESH, "r", "R", key(client.getTerminal(), Capability.key_f5));
+		keys.bind(ResultTableOperation.UP, "w", "W", key(client.getTerminal(), Capability.key_up));
+		keys.bind(ResultTableOperation.DOWN, "s", "S", key(client.getTerminal(), Capability.key_down));
+		keys.bind(ResultTableOperation.LEFT, "a", "A", key(client.getTerminal(), Capability.key_left));
+		keys.bind(ResultTableOperation.RIGHT, "d", "D", key(client.getTerminal(), Capability.key_right));
+		keys.bind(ResultTableOperation.OPEN, "o", "O", "\r");
+		keys.bind(ResultTableOperation.GOTO, "g", "G");
+		keys.bind(ResultTableOperation.NEXT, "n", "N");
+		keys.bind(ResultTableOperation.PREV, "p", "P");
+		keys.bind(ResultTableOperation.LAST, "l", "L", key(client.getTerminal(), Capability.key_end));
+		keys.bind(ResultTableOperation.INC_REFRESH, "+");
+		keys.bind(ResultTableOperation.DEC_REFRESH, "-");
+		return keys;
+	}
+
+	@Override
+	protected void evaluate(ResultTableOperation operation, String binding) {
+		switch (operation) {
+			case QUIT:
+				close();
+				break;
+			case REFRESH:
+				refresh();
+				break;
+			case UP:
+				selectRowUp();
+				break;
+			case DOWN:
+				selectRowDown();
+				break;
+			case OPEN:
+				openRow();
+				break;
+			case GOTO:
+				gotoPage();
+				break;
+			case NEXT:
+				gotoNextPage();
+				break;
+			case PREV:
+				gotoPreviousPage();
+				break;
+			case LAST:
+				gotoLastPage();
+				break;
+			case LEFT:
+				scrollLeft();
+				break;
+			case RIGHT:
+				scrollRight();
+				break;
+			case INC_REFRESH:
+				increaseRefreshInterval();
+				break;
+			case DEC_REFRESH:
+				decreaseRefreshInterval(MIN_REFRESH_INTERVAL);
+				break;
+		}
+	}
+
+	@Override
+	protected String getTitle() {
+		return CliStrings.RESULT_TITLE + " (" + CliStrings.RESULT_TABLE + ")";
+	}
+
+	@Override
+	protected List<AttributedString> computeHeaderLines() {
+		final AttributedStringBuilder statusLine = new AttributedStringBuilder();
+		statusLine.style(AttributedStyle.INVERSE);
+		// left
+		final String left;
+		if (isRetrieving()) {
+			left = CliStrings.DEFAULT_MARGIN + CliStrings.RESULT_REFRESH_INTERVAL + ' ' + REFRESH_INTERVALS.get(refreshInterval).f0;
+		} else {
+			left = CliStrings.DEFAULT_MARGIN + CliStrings.RESULT_STOPPED;
+		}
+		// middle
+		final StringBuilder middleBuilder = new StringBuilder();
+		middleBuilder.append(CliStrings.RESULT_PAGE);
+		middleBuilder.append(' ');
+		if (page == LAST_PAGE) {
+			middleBuilder.append(CliStrings.RESULT_LAST_PAGE);
+		} else {
+			middleBuilder.append(page);
+		}
+		middleBuilder.append(CliStrings.RESULT_PAGE_OF);
+		middleBuilder.append(pageCount);
+		final String middle = middleBuilder.toString();
+		// right
+		final String right;
+		if (lastRetrieval == null) {
+			right = CliStrings.RESULT_LAST_REFRESH + ' ' + CliStrings.RESULT_REFRESH_UNKNOWN + CliStrings.DEFAULT_MARGIN;
+		} else {
+			right = CliStrings.RESULT_LAST_REFRESH + ' ' + lastRetrieval.format(TIME_FORMATTER) + CliStrings.DEFAULT_MARGIN;
+		}
+		// all together
+		final int totalLeftSpace = getWidth() - middle.length();
+		final int leftSpace = totalLeftSpace / 2 - left.length();
+		statusLine.append(left);
+		repeatChar(statusLine, ' ', leftSpace);
+		statusLine.append(middle);
+		final int rightSpacing = getWidth() - statusLine.length() - right.length();
+		repeatChar(statusLine, ' ', rightSpacing);
+		statusLine.append(right);
+
+		return Arrays.asList(statusLine.toAttributedString(), AttributedString.EMPTY);
+	}
+
+	@Override
+	protected List<AttributedString> computeMainHeaderLines() {
+		final AttributedStringBuilder schemaHeader = new AttributedStringBuilder();
+
+		Arrays.stream(resultDescriptor.getResultSchema().getColumnNames()).forEach(s -> {
+			schemaHeader.append(' ');
+			schemaHeader.style(AttributedStyle.DEFAULT.underline());
+			normalizeColumn(schemaHeader, s, MAX_COLUMN_WIDTH);
+			schemaHeader.style(AttributedStyle.DEFAULT);
+		});
+
+		return Collections.singletonList(schemaHeader.toAttributedString());
+	}
+
+	@Override
+	protected List<AttributedString> computeFooterLines() {
+		return formatTwoLineHelpOptions(getWidth(), getHelpOptions());
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void updatePage() {
+		// retrieve page
+		final int retrievalPage = page == LAST_PAGE ? pageCount : page;
+		final List<Row> rows;
+		try {
+			rows = client.getExecutor().retrieveResultPage(resultDescriptor.getResultId(), retrievalPage);
+		} catch (SqlExecutionException e) {
+			close(e);
+			return;
+		}
+
+		// convert page
+		final List<String[]> stringRows = rows
+				.stream()
+				.map(CliUtils::rowToString)
+				.collect(Collectors.toList());
+
+		// update results
+		previousResults = results;
+		results = stringRows;
+
+		// check if selected row is still valid
+		if (selectedRow != NO_ROW_SELECTED) {
+			if (selectedRow >= results.size()) {
+				selectedRow = NO_ROW_SELECTED;
+			}
+		}
+
+		// reset view
+		resetAllParts();
+	}
+
+	private List<Tuple2<String, String>> getHelpOptions() {
+		final List<Tuple2<String, String>> options = new ArrayList<>();
+
+		options.add(Tuple2.of("Q", CliStrings.RESULT_QUIT));
+		options.add(Tuple2.of("R", CliStrings.RESULT_REFRESH));
+
+		options.add(Tuple2.of("+", CliStrings.RESULT_INC_REFRESH));
+		options.add(Tuple2.of("-", CliStrings.RESULT_DEC_REFRESH));
+
+		options.add(Tuple2.of("G", CliStrings.RESULT_GOTO));
+		options.add(Tuple2.of("L", CliStrings.RESULT_LAST));
+
+		options.add(Tuple2.of("N", CliStrings.RESULT_NEXT));
+		options.add(Tuple2.of("P", CliStrings.RESULT_PREV));
+
+		options.add(Tuple2.of("O", CliStrings.RESULT_OPEN));
+
+		return options;
+	}
+
+	private void gotoPage() {
+		final CliInputView view = new CliInputView(
+			client,
+			CliStrings.INPUT_ENTER_PAGE,
+			(s) -> {
+				// validate input
+				final int newPage;
+				try {
+					newPage = Integer.parseInt(s);
+				} catch (NumberFormatException e) {
+					return false;
+				}
+				return newPage > 0 && newPage <= pageCount;
+			});
+		view.open(); // enter view
+		if (view.getResult() != null) {
+			page = Integer.parseInt(view.getResult());
+			updatePage();
+		}
+	}
+
+	private void gotoNextPage() {
+		final int curPageIndex = page == LAST_PAGE ? pageCount : page;
+		if (curPageIndex < pageCount) {
+			page = curPageIndex + 1;
+		}
+		updatePage();
+	}
+
+	private void gotoPreviousPage() {
+		final int curPageIndex = page == LAST_PAGE ? pageCount : page;
+		if (curPageIndex > 1) {
+			page = curPageIndex - 1;
+		}
+		updatePage();
+	}
+
+	private void gotoLastPage() {
+		page = LAST_PAGE;
+		updatePage();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Available operations for this view.
+	 */
+	public enum ResultTableOperation {
+		QUIT, // leave view
+		REFRESH, // refresh current table page
+		UP, // row selection up
+		DOWN, // row selection down
+		OPEN, // shows a full row
+		GOTO, // enter table page number
+		NEXT, // next table page
+		PREV, // previous table page
+		LAST, // last table page
+		LEFT, // scroll left if row is large
+		RIGHT, // scroll right if row is large
+		INC_REFRESH, // increase refresh interval
+		DEC_REFRESH, // decrease refresh interval
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
new file mode 100644
index 0000000..d0adaa1
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * Utilities for CLI formatting.
+ */
+public final class CliUtils {
+
+	public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
+
+	private CliUtils() {
+		// private
+	}
+
+	public static void repeatChar(AttributedStringBuilder sb, char c, int count) {
+		IntStream.range(0, count).forEach(i -> sb.append(c));
+	}
+
+	public static void normalizeColumn(AttributedStringBuilder sb, String col, int maxWidth) {
+		// too long: keep the first (maxWidth - 1) characters and mark the cut with '~'
+		if (col.length() > maxWidth) {
+			sb.append(col, 0, maxWidth - 1);
+			sb.append('~');
+		} else {
+			repeatChar(sb, ' ', maxWidth - col.length()); // left-pad so the content is right-aligned
+			sb.append(col);
+		}
+	}
+
+	public static List<AttributedString> formatTwoLineHelpOptions(int width, List<Tuple2<String, String>> options) {
+		final AttributedStringBuilder line1 = new AttributedStringBuilder();
+		final AttributedStringBuilder line2 = new AttributedStringBuilder();
+
+		// we assume that every option has no more than 11 characters (+ key and space)
+		final int columns = Math.max(1, (options.size() + 1) / 2); // ceil(size / 2); at least 1 so the division below cannot fail
+		final int space = (width - CliStrings.DEFAULT_MARGIN.length() - columns * 13) / columns;
+		final Iterator<Tuple2<String, String>> iter = options.iterator();
+		while (iter.hasNext()) {
+			// first line
+			Tuple2<String, String> option = iter.next();
+			line1.style(AttributedStyle.DEFAULT.inverse());
+			line1.append(option.f0);
+			line1.style(AttributedStyle.DEFAULT);
+			line1.append(' ');
+			line1.append(option.f1);
+			repeatChar(line1, ' ', (11 - option.f1.length()) + space);
+			// second line
+			if (iter.hasNext()) {
+				option = iter.next();
+				line2.style(AttributedStyle.DEFAULT.inverse());
+				line2.append(option.f0);
+				line2.style(AttributedStyle.DEFAULT);
+				line2.append(' ');
+				line2.append(option.f1);
+				repeatChar(line2, ' ', (11 - option.f1.length()) + space);
+			}
+		}
+
+		return Arrays.asList(line1.toAttributedString(), line2.toAttributedString());
+	}
+
+	public static String[] rowToString(Row row) {
+		final String[] fields = new String[row.getArity()];
+		for (int i = 0; i < row.getArity(); i++) {
+			fields[i] = String.valueOf(row.getField(i)); // null-safe: a null field becomes "null" instead of throwing
+		}
+		return fields;
+	}
+
+	public static String[] typesToString(TypeInformation<?>[] types) {
+		final String[] typesAsString = new String[types.length];
+		for (int i = 0; i < types.length; i++) {
+			typesAsString[i] = types[i].toString();
+		}
+		return typesAsString;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java
new file mode 100644
index 0000000..3fa1aab
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliView.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+
+import org.jline.keymap.BindingReader;
+import org.jline.keymap.KeyMap;
+import org.jline.terminal.Attributes;
+import org.jline.terminal.Attributes.LocalFlag;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.Terminal.Signal;
+import org.jline.terminal.Terminal.SignalHandler;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp.Capability;
+
+import java.io.IOError;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.client.cli.CliUtils.repeatChar;
+
+/**
+ * Framework for a CLI view with header, footer, and main part that is scrollable.
+ *
+ * @param <OP> supported list of operations
+ */
+public abstract class CliView<OP extends Enum<OP>, OUT> {
+
+	protected final CliClient client;
+
+	protected int offsetX;
+
+	protected int offsetY;
+
+	private boolean isRunning;
+
+	private Thread inputThread;
+
+	private int width;
+
+	private int height;
+
+	private final BindingReader keyReader;
+
+	private AttributedString titleLine;
+
+	private List<AttributedString> headerLines;
+
+	private List<AttributedString> mainHeaderLines; // vertically scrollable
+
+	private List<AttributedString> mainLines;
+
+	private List<AttributedString> footerLines;
+
+	private int totalMainWidth;
+
+	private SqlExecutionException executionException;
+
+	private OUT result;
+
+	public CliView(CliClient client) {
+		this.client = client;
+
+		keyReader = new BindingReader(client.getTerminal().reader());
+	}
+
+	public void open() {
+		isRunning = true;
+		inputThread = Thread.currentThread();
+
+		// prepare terminal
+		final Tuple2<Attributes, Map<Signal, SignalHandler>> prev = prepareTerminal();
+		ensureTerminalFullScreen();
+		updateSize();
+
+		init();
+
+		synchronized (this) {
+			display();
+		}
+
+		final KeyMap<OP> keys = getKeys();
+
+		while (isRunning) {
+
+			final OP operation;
+			try {
+				operation = keyReader.readBinding(keys, null, true);
+			} catch (IOError e) {
+				break;
+			}
+
+			// refresh loop
+			if (operation == null) {
+				continue;
+			}
+
+			synchronized (this) {
+				try {
+					evaluate(operation, keyReader.getLastBinding());
+				} catch (SqlExecutionException e) {
+					// in case the evaluate method did not use the close method
+					close(e);
+				}
+
+				if (isRunning) {
+					// ensure full-screen again in case a sub-view has been opened in evaluate
+					ensureTerminalFullScreen();
+
+					display();
+				}
+			}
+		}
+
+		cleanUp();
+
+		// clean terminal
+		restoreTerminal(prev);
+		unsetTerminalFullScreen();
+
+		if (executionException != null) {
+			throw executionException;
+		}
+	}
+
+	public OUT getResult() {
+		return result;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	protected boolean isRunning() {
+		return isRunning;
+	}
+
+	protected void close() {
+		if (isRunning) {
+			isRunning = false;
+			// break the input loop if this method is called from another thread
+			if (Thread.currentThread() != inputThread) {
+				inputThread.interrupt();
+			}
+		}
+	}
+
+	protected void close(SqlExecutionException e) {
+		executionException = e;
+		close();
+	}
+
+	protected void close(OUT result) {
+		this.result = result;
+		isRunning = false;
+	}
+
+	protected void display() {
+		// cache
+		final List<AttributedString> headerLines = getHeaderLines();
+		final List<AttributedString> mainHeaderLines = getMainHeaderLines();
+		final List<AttributedString> mainLines = getMainLines();
+		final List<AttributedString> footerLines = getFooterLines();
+		final int visibleMainHeight = getVisibleMainHeight();
+		final int totalMainWidth = getTotalMainWidth();
+
+		// create output
+		client.clearTerminal();
+
+		final List<String> lines = new ArrayList<>();
+
+		// title part
+		client.getTerminal().writer().println(computeTitleLine().toAnsi());
+
+		// header part
+		headerLines.forEach(l -> client.getTerminal().writer().println(l.toAnsi()));
+
+		// main part
+		// update vertical offset
+		if (visibleMainHeight > mainLines.size()) {
+			offsetY = 0; // enough space
+		} else {
+			offsetY = Math.min(mainLines.size() - visibleMainHeight, offsetY); // bound offset
+		}
+		// update horizontal offset
+		if (width > totalMainWidth) {
+			offsetX = 0; // enough space
+		} else {
+			offsetX = Math.min(totalMainWidth - width, offsetX); // bound offset
+		}
+		// create window
+		final List<AttributedString> windowedMainLines =
+			mainLines.subList(offsetY, Math.min(mainLines.size(), offsetY + visibleMainHeight));
+		// print window
+		Stream.concat(mainHeaderLines.stream(), windowedMainLines.stream()).forEach(l -> {
+			if (offsetX < l.length()) {
+				final AttributedString windowX = l.substring(offsetX, Math.min(l.length(), offsetX + width));
+				client.getTerminal().writer().println(windowX.toAnsi());
+			} else {
+				client.getTerminal().writer().println(); // nothing to show for this line
+			}
+		});
+
+		// footer part
+		final int emptyHeight = height - 1 - headerLines.size() - // -1 = title
+			windowedMainLines.size() - mainHeaderLines.size() - footerLines.size();
+		// padding
+		IntStream.range(0, emptyHeight).forEach(i -> client.getTerminal().writer().println());
+		// footer
+		IntStream.range(0, footerLines.size()).forEach((i) -> {
+			final AttributedString l = footerLines.get(i);
+			if (i == footerLines.size() - 1) {
+				client.getTerminal().writer().print(l.toAnsi());
+			} else {
+				client.getTerminal().writer().println(l.toAnsi());
+			}
+		});
+
+		client.getTerminal().flush();
+	}
+
+	protected void scrollLeft() {
+		if (offsetX > 0) {
+			offsetX -= 1;
+		}
+	}
+
+	protected void scrollRight() {
+		final int maxOffset = Math.max(0, getTotalMainWidth() - width);
+		if (offsetX < maxOffset) {
+			offsetX += 1;
+		}
+	}
+
+	protected void scrollUp() {
+		if (offsetY > 0) {
+			offsetY -= 1;
+		}
+	}
+
+	protected void scrollDown() {
+		scrollDown(1);
+	}
+
+	protected void scrollDown(int n) {
+		final int maxOffset = Math.max(0, getMainLines().size() - getVisibleMainHeight());
+		offsetY = Math.min(maxOffset, offsetY + n);
+	}
+
+	protected int getVisibleMainHeight() {
+		// -1 = title line; clamped to 0 so that display() never builds a window with a negative height
+		return Math.max(0, height - 1 - getHeaderLines().size() - getMainHeaderLines().size() -
+			getFooterLines().size());
+	}
+
+	protected List<AttributedString> getHeaderLines() {
+		if (headerLines == null) {
+			headerLines = computeHeaderLines();
+		}
+		return headerLines;
+	}
+
+	protected List<AttributedString> getMainHeaderLines() {
+		if (mainHeaderLines == null) {
+			mainHeaderLines = computeMainHeaderLines();
+			totalMainWidth = computeTotalMainWidth();
+		}
+		return mainHeaderLines;
+	}
+
+	protected List<AttributedString> getMainLines() {
+		if (mainLines == null) {
+			mainLines = computeMainLines();
+			totalMainWidth = computeTotalMainWidth();
+		}
+		return mainLines;
+	}
+
+	protected List<AttributedString> getFooterLines() {
+		if (footerLines == null) {
+			footerLines = computeFooterLines();
+		}
+		return footerLines;
+	}
+
+	protected int getTotalMainWidth() {
+		if (totalMainWidth <= 0) {
+			totalMainWidth = computeTotalMainWidth();
+		}
+		return totalMainWidth;
+	}
+
+	protected AttributedString getTitleLine() {
+		if (titleLine == null) {
+			titleLine = computeTitleLine();
+		}
+		return titleLine;
+	}
+
+	/**
+	 * Must be called when values in one or more parts have changed.
+	 */
+	protected void resetAllParts() {
+		titleLine = null;
+		headerLines = null;
+		mainHeaderLines = null;
+		mainLines = null;
+		footerLines = null;
+		totalMainWidth = 0;
+	}
+
+	/**
+	 * Must be called when values in the main part (main header or main) have changed.
+	 */
+	protected void resetMainPart() {
+		mainHeaderLines = null;
+		mainLines = null;
+		totalMainWidth = 0;
+	}
+
+	protected int getWidth() {
+		return width;
+	}
+
+	protected int getHeight() {
+		return height;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void updateSize() {
+		width = client.getWidth();
+		height = client.getHeight();
+		totalMainWidth = width;
+		resetAllParts();
+	}
+
+	private void ensureTerminalFullScreen() {
+		final Terminal terminal = client.getTerminal();
+		terminal.puts(Capability.enter_ca_mode);
+		terminal.puts(Capability.keypad_xmit);
+		terminal.puts(Capability.cursor_invisible);
+	}
+
+	private Tuple2<Attributes, Map<Signal, SignalHandler>> prepareTerminal() {
+		final Terminal terminal = client.getTerminal();
+
+		final Attributes prevAttributes = terminal.getAttributes();
+		// adopted from org.jline.builtins.Nano
+		// see also https://en.wikibooks.org/wiki/Serial_Programming/termios#Basic_Configuration_of_a_Serial_Interface
+
+		// no line processing
+		// canonical mode off, echo off, echo newline off, extended input processing off
+		Attributes newAttr = new Attributes(prevAttributes);
+		newAttr.setLocalFlags(EnumSet.of(LocalFlag.ICANON, LocalFlag.ECHO, LocalFlag.IEXTEN), false);
+		// turn off input processing
+		newAttr.setInputFlags(EnumSet.of(Attributes.InputFlag.IXON, Attributes.InputFlag.ICRNL, Attributes.InputFlag.INLCR), false);
+		// one input byte is enough to return from read, inter-character timer off
+		newAttr.setControlChar(Attributes.ControlChar.VMIN, 1);
+		newAttr.setControlChar(Attributes.ControlChar.VTIME, 0);
+		newAttr.setControlChar(Attributes.ControlChar.VINTR, 0);
+		terminal.setAttributes(newAttr);
+
+		final Map<Signal, SignalHandler> prevSignals = new HashMap<>();
+		prevSignals.put(Signal.WINCH, terminal.handle(Signal.WINCH, this::handleSignal));
+		prevSignals.put(Signal.INT, terminal.handle(Signal.INT, this::handleSignal));
+		prevSignals.put(Signal.QUIT, terminal.handle(Signal.QUIT, this::handleSignal));
+
+		return Tuple2.of(prevAttributes, prevSignals);
+	}
+
+	private void restoreTerminal(Tuple2<Attributes, Map<Signal, SignalHandler>> prev) {
+		final Terminal terminal = client.getTerminal();
+
+		terminal.setAttributes(prev.f0);
+		prev.f1.forEach(terminal::handle);
+	}
+
+	private void unsetTerminalFullScreen() {
+		final Terminal terminal = client.getTerminal();
+
+		terminal.puts(Capability.exit_ca_mode);
+		terminal.puts(Capability.keypad_local);
+		terminal.puts(Capability.cursor_visible);
+	}
+
+	private int computeTotalMainWidth() {
+		final List<AttributedString> mainLines = getMainLines();
+		final List<AttributedString> mainHeaderLines = getMainHeaderLines();
+		final int max1 = mainLines.stream().mapToInt(AttributedString::length).max().orElse(0);
+		final int max2 = mainHeaderLines.stream().mapToInt(AttributedString::length).max().orElse(0);
+		return Math.max(max1, max2);
+	}
+
+	private AttributedString computeTitleLine() {
+		final String title = getTitle();
+		final AttributedStringBuilder titleLine = new AttributedStringBuilder();
+		titleLine.style(AttributedStyle.INVERSE);
+		final int totalMargin = width - title.length();
+		final int margin = totalMargin / 2;
+		repeatChar(titleLine, ' ', margin);
+		titleLine.append(title);
+		repeatChar(titleLine, ' ', margin + (totalMargin % 2));
+		return titleLine.toAttributedString();
+	}
+
+	private void handleSignal(Signal signal) {
+		synchronized (this) {
+			switch (signal) {
+				case INT:
+					close(new SqlExecutionException("Forced interrupt."));
+					break;
+				case QUIT:
+					close(new SqlExecutionException("Forced cancellation."));
+					break;
+				case WINCH:
+					updateSize();
+					if (isRunning) {
+						display();
+					}
+					break;
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Starts threads if necessary.
+	 */
+	protected abstract void init();
+
+	protected abstract KeyMap<OP> getKeys();
+
+	protected abstract void evaluate(OP operation, String binding);
+
+	protected abstract String getTitle();
+
+	protected abstract List<AttributedString> computeHeaderLines();
+
+	protected abstract List<AttributedString> computeMainHeaderLines();
+
+	protected abstract List<AttributedString> computeMainLines();
+
+	protected abstract List<AttributedString> computeFooterLines();
+
+	protected abstract void cleanUp();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
new file mode 100644
index 0000000..214a17d
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+/**
+ * Simple parser for determining the type of command and its parameters.
+ */
+public final class SqlCommandParser {
+
+	private SqlCommandParser() {
+		// private
+	}
+
+	public static SqlCommandCall parse(String stmt) {
+		String trimmed = stmt.trim();
+		// remove ';' at the end because many people type it intuitively
+		if (trimmed.endsWith(";")) {
+			trimmed = trimmed.substring(0, trimmed.length() - 1);
+		}
+		for (SqlCommand cmd : SqlCommand.values()) {
+			int pos = 0;
+			int tokenCount = 0;
+			for (String token : trimmed.split("\\s")) {
+				pos += token.length() + 1; // include space character
+				// check for content
+				if (token.length() > 0) {
+					// match
+					if (tokenCount < cmd.tokens.length && token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
+						if (tokenCount == cmd.tokens.length - 1) {
+							return new SqlCommandCall(
+								cmd,
+								splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length())))
+							);
+						}
+					} else {
+						// next sql command
+						break;
+					}
+					tokenCount++; // check next token
+				}
+			}
+		}
+		return null;
+	}
+
+	private static String[] splitOperands(SqlCommand cmd, String originalCall, String operands) {
+		switch (cmd) {
+			case SET:
+				final int delimiter = operands.indexOf('=');
+				if (delimiter < 0) {
+					return new String[] {};
+				} else {
+					return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)};
+				}
+			case SELECT:
+				return new String[] {originalCall};
+			default:
+				return new String[] {operands};
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Supported SQL commands.
+	 */
+	enum SqlCommand {
+		QUIT("quit"),
+		EXIT("exit"),
+		CLEAR("clear"),
+		HELP("help"),
+		SHOW_TABLES("show tables"),
+		DESCRIBE("describe"),
+		EXPLAIN("explain"),
+		SELECT("select"),
+		SET("set"),
+		RESET("reset"),
+		SOURCE("source");
+
+		public final String command;
+		public final String[] tokens;
+
+		SqlCommand(String command) {
+			this.command = command;
+			this.tokens = command.split(" ");
+		}
+
+		@Override
+		public String toString() {
+			return command.toUpperCase();
+		}
+	}
+
+	/**
+	 * Call of SQL command with operands and command type.
+	 */
+	public static class SqlCommandCall {
+		public final SqlCommand command;
+		public final String[] operands;
+
+		public SqlCommandCall(SqlCommand command, String[] operands) {
+			this.command = command;
+			this.operands = operands;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
new file mode 100644
index 0000000..87201a6
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.IOContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Auxiliary functions for configuration file handling.
+ */
+public class ConfigUtil {
+
+	private ConfigUtil() {
+		// private
+	}
+
+	/**
+	 * Normalizes key-value properties from Yaml in the normalized format of the Table API.
+	 */
+	public static Map<String, String> normalizeYaml(Map<String, Object> yamlMap) {
+		final Map<String, String> normalized = new HashMap<>();
+		yamlMap.forEach((k, v) -> normalizeYamlObject(normalized, k, v));
+		return normalized;
+	}
+
+	private static void normalizeYamlObject(Map<String, String> normalized, String key, Object value) {
+		if (value instanceof Map) {
+			final Map<?, ?> map = (Map<?, ?>) value;
+			map.forEach((k, v) -> normalizeYamlObject(normalized, key + "." + k, v));
+		} else if (value instanceof List) {
+			final List<?> list = (List<?>) value;
+			for (int i = 0; i < list.size(); i++) {
+				normalizeYamlObject(normalized, key + "." + i, list.get(i));
+			}
+		} else {
+			normalized.put(key, value.toString());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Modified object mapper whose YAML parser reports every field name in lower case.
+	 */
+	public static class LowerCaseYamlMapper extends ObjectMapper {
+		public LowerCaseYamlMapper() {
+			super(new YAMLFactory() {
+				@Override
+				protected YAMLParser _createParser(InputStream in, IOContext ctxt) throws IOException {
+					final Reader r = _createReader(in, null, ctxt);
+					// normalize all keys to lower case (only FIELD_NAME tokens are affected, values are untouched)
+					return new YAMLParser(ctxt, _getBufferRecycler(), _parserFeatures, _yamlParserFeatures, _objectCodec, r) {
+						@Override
+						public String getCurrentName() throws IOException {
+							if (_currToken == JsonToken.FIELD_NAME) {
+								return _currentFieldName.toLowerCase();
+							}
+							return super.getCurrentName();
+						}
+
+						@Override
+						public String getText() throws IOException {
+							if (_currToken == JsonToken.FIELD_NAME) {
+								return _currentFieldName.toLowerCase();
+							}
+							return super.getText();
+						}
+					};
+				}
+			});
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java
new file mode 100644
index 0000000..a87f2e4
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Configuration of a Flink cluster deployment. This class parses the `deployment` part
+ * in an environment file. In the future, we should keep the number of properties here small and
+ * forward most properties to Flink's CLI frontend properties directly.
+ */
+public class Deployment {
+
+	private final Map<String, String> properties;
+
+	public Deployment() {
+		this.properties = Collections.emptyMap();
+	}
+
+	private Deployment(Map<String, String> properties) {
+		this.properties = properties;
+	}
+
+	public boolean isStandaloneDeployment() {
+		return Objects.equals(
+			properties.getOrDefault(PropertyStrings.DEPLOYMENT_TYPE, PropertyStrings.DEPLOYMENT_TYPE_VALUE_STANDALONE),
+			PropertyStrings.DEPLOYMENT_TYPE_VALUE_STANDALONE);
+	}
+
+	public long getResponseTimeout() {
+		return Long.parseLong(properties.getOrDefault(PropertyStrings.DEPLOYMENT_RESPONSE_TIMEOUT, Long.toString(10000)));
+	}
+
+	public String getGatewayAddress() {
+		return properties.getOrDefault(PropertyStrings.DEPLOYMENT_GATEWAY_ADDRESS, "");
+	}
+
+	public int getGatewayPort() {
+		return Integer.parseInt(properties.getOrDefault(PropertyStrings.DEPLOYMENT_GATEWAY_PORT, Integer.toString(0)));
+	}
+
+	public Map<String, String> toProperties() {
+		final Map<String, String> copy = new HashMap<>();
+		properties.forEach((k, v) -> copy.put(PropertyStrings.DEPLOYMENT + "." + k, v));
+		return copy;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public static Deployment create(Map<String, Object> config) {
+		return new Deployment(ConfigUtil.normalizeYaml(config));
+	}
+
+	/**
+	 * Merges two deployments. The properties of the first deployment might be overwritten by the second one.
+	 */
+	public static Deployment merge(Deployment deploy1, Deployment deploy2) {
+		final Map<String, String> properties = new HashMap<>(deploy1.properties);
+		properties.putAll(deploy2.properties);
+
+		return new Deployment(properties);
+	}
+
+	/**
+	 * Creates a new deployment enriched with those given properties that carry the deployment prefix.
+	 */
+	public static Deployment enrich(Deployment deploy, Map<String, String> properties) {
+		final Map<String, String> newProperties = new HashMap<>(deploy.properties);
+		properties.forEach((k, v) -> {
+			final String normalizedKey = k.toLowerCase();
+			if (normalizedKey.startsWith(PropertyStrings.DEPLOYMENT + ".")) { // assumes the prefix constant is lower case - TODO confirm
+				newProperties.put(normalizedKey.substring(PropertyStrings.DEPLOYMENT.length() + 1), v);
+			}
+		});
+
+		return new Deployment(newProperties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
new file mode 100644
index 0000000..a910c49
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Environment configuration that represents the content of an environment file. Environment files
+ * define sources, execution, and deployment behavior. An environment might be defined by default or
+ * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
+ *
+ * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
+ * allow overwriting of a deployment by a session.
+ */
+public class Environment {
+
+	private Map<String, Source> sources;
+
+	private Execution execution;
+
+	private Deployment deployment;
+
+	public Environment() {
+		this.sources = Collections.emptyMap();
+		this.execution = new Execution();
+		this.deployment = new Deployment();
+	}
+
+	public Map<String, Source> getSources() {
+		return sources;
+	}
+
+	public void setSources(List<Map<String, Object>> sources) {
+		this.sources = new HashMap<>(sources.size());
+		sources.forEach(config -> {
+			final Source s = Source.create(config);
+			if (this.sources.containsKey(s.getName())) { // sources are keyed by name, so names must be unique
+				throw new SqlClientException("Duplicate source name '" + s.getName() + "'.");
+			}
+			this.sources.put(s.getName(), s);
+		});
+	}
+
+	public void setExecution(Map<String, Object> config) {
+		this.execution = Execution.create(config);
+	}
+
+	public Execution getExecution() {
+		return execution;
+	}
+
+	public void setDeployment(Map<String, Object> config) {
+		this.deployment = Deployment.create(config);
+	}
+
+	public Deployment getDeployment() {
+		return deployment;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Parses an environment file from a URL.
+	 */
+	public static Environment parse(URL url) throws IOException {
+		return new ConfigUtil.LowerCaseYamlMapper().readValue(url, Environment.class);
+	}
+
+	/**
+	 * Parses an environment file from a String.
+	 */
+	public static Environment parse(String content) throws IOException {
+		return new ConfigUtil.LowerCaseYamlMapper().readValue(content, Environment.class);
+	}
+
+	/**
+	 * Merges two environments. The properties of the first environment might be overwritten by the second one.
+	 */
+	public static Environment merge(Environment env1, Environment env2) {
+		final Environment mergedEnv = new Environment();
+
+		// merge sources into a fresh mutable map (a new Environment holds an immutable empty map); env2 wins on equal names
+		final Map<String, Source> sources = new HashMap<>(env1.getSources());
+		sources.putAll(env2.getSources());
+		mergedEnv.sources = sources;
+
+		// merge execution properties
+		mergedEnv.execution = Execution.merge(env1.getExecution(), env2.getExecution());
+
+		// merge deployment properties
+		mergedEnv.deployment = Deployment.merge(env1.getDeployment(), env2.getDeployment());
+
+		return mergedEnv;
+	}
+
+	public static Environment enrich(Environment env, Map<String, String> properties) {
+		final Environment enrichedEnv = new Environment();
+
+		// merge sources
+		enrichedEnv.sources = new HashMap<>(env.getSources());
+
+		// enrich execution properties
+		enrichedEnv.execution = Execution.enrich(env.execution, properties);
+
+		// enrich deployment properties
+		enrichedEnv.deployment = Deployment.enrich(env.deployment, properties);
+
+		return enrichedEnv;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
new file mode 100644
index 0000000..37d1a34
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Configuration of a table program execution. This class parses the `execution` part
+ * in an environment file. The execution describes properties that would usually be defined in the
+ * ExecutionEnvironment/StreamExecutionEnvironment/TableEnvironment or as code in a Flink job.
+ */
+public class Execution {
+
+	/** Execution properties; keys are stored without the leading {@code execution.} prefix. */
+	private final Map<String, String> properties;
+
+	/**
+	 * Creates an execution without any explicit properties, i.e. every getter returns its default.
+	 */
+	public Execution() {
+		this.properties = Collections.emptyMap();
+	}
+
+	private Execution(Map<String, String> properties) {
+		this.properties = properties;
+	}
+
+	// TODO add logger warnings if default value is used
+
+	/**
+	 * Returns whether the execution type is streaming. Streaming is the default if no type is set.
+	 */
+	public boolean isStreamingExecution() {
+		return Objects.equals(
+			properties.getOrDefault(PropertyStrings.EXECUTION_TYPE, PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING),
+			PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING);
+	}
+
+	/**
+	 * Returns whether the execution type is batch. If no type is set, streaming is assumed and
+	 * this method returns false.
+	 */
+	public boolean isBatchExecution() {
+		return Objects.equals(
+			properties.getOrDefault(PropertyStrings.EXECUTION_TYPE, PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING),
+			PropertyStrings.EXECUTION_TYPE_VALUE_BATCH);
+	}
+
+	/**
+	 * Returns the configured minimum idle state retention or {@link Long#MIN_VALUE} if not set.
+	 *
+	 * @throws NumberFormatException if the configured value is not a valid long
+	 */
+	public long getMinStateRetention() {
+		return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
+	}
+
+	/**
+	 * Returns the configured maximum idle state retention or {@link Long#MIN_VALUE} if not set.
+	 *
+	 * @throws NumberFormatException if the configured value is not a valid long
+	 */
+	public long getMaxStateRetention() {
+		return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
+	}
+
+	/**
+	 * Returns the configured parallelism or 1 if not set.
+	 *
+	 * @throws NumberFormatException if the configured value is not a valid integer
+	 */
+	public int getParallelism() {
+		return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_PARALLELISM, Integer.toString(1)));
+	}
+
+	/**
+	 * Returns the configured maximum parallelism or 128 if not set.
+	 *
+	 * @throws NumberFormatException if the configured value is not a valid integer
+	 */
+	public int getMaxParallelism() {
+		return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
+	}
+
+	/**
+	 * Returns whether the result mode is changelog. Changelog is the default if no mode is set.
+	 */
+	public boolean isChangelogMode() {
+		return Objects.equals(
+			properties.getOrDefault(PropertyStrings.EXECUTION_RESULT_MODE, PropertyStrings.EXECUTION_RESULT_MODE_VALUE_CHANGELOG),
+			PropertyStrings.EXECUTION_RESULT_MODE_VALUE_CHANGELOG);
+	}
+
+	/**
+	 * Returns a copy of the properties in which every key is prefixed with {@code execution.}.
+	 */
+	public Map<String, String> toProperties() {
+		final Map<String, String> copy = new HashMap<>();
+		properties.forEach((k, v) -> copy.put(PropertyStrings.EXECUTION + "." + k, v));
+		return copy;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates an execution from the given configuration after passing it through
+	 * {@code ConfigUtil.normalizeYaml}.
+	 */
+	public static Execution create(Map<String, Object> config) {
+		return new Execution(ConfigUtil.normalizeYaml(config));
+	}
+
+	/**
+	 * Merges two executions. The properties of the first execution might be overwritten by the second one.
+	 */
+	public static Execution merge(Execution exec1, Execution exec2) {
+		final Map<String, String> newProperties = new HashMap<>(exec1.properties);
+		newProperties.putAll(exec2.properties);
+
+		return new Execution(newProperties);
+	}
+
+	/**
+	 * Creates a new execution enriched with additional properties.
+	 *
+	 * <p>Only properties whose key starts with {@code execution.} are considered; the prefix is
+	 * removed before the property is stored. Keys are lower-cased first, so the prefix and the
+	 * remaining key are matched case-insensitively.
+	 */
+	public static Execution enrich(Execution exec, Map<String, String> properties) {
+		final String prefix = PropertyStrings.EXECUTION + ".";
+		final Map<String, String> newProperties = new HashMap<>(exec.properties);
+		properties.forEach((k, v) -> {
+			final String normalizedKey = k.toLowerCase();
+			// check the prefix on the normalized key as well, otherwise keys such as
+			// "Execution.Type" would be dropped silently although the rest is normalized
+			if (normalizedKey.startsWith(prefix)) {
+				newProperties.put(normalizedKey.substring(prefix.length()), v);
+			}
+		});
+
+		return new Execution(newProperties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
new file mode 100644
index 0000000..ba0759d
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+/**
+ * Strings used for keys and values in an environment file.
+ */
+public final class PropertyStrings {
+
+	// --------------------------------------------------------------------------------------------
+	// execution
+	// --------------------------------------------------------------------------------------------
+
+	public static final String EXECUTION = "execution";
+
+	public static final String EXECUTION_TYPE = "type";
+	public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
+	public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
+
+	public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention";
+	public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention";
+
+	public static final String EXECUTION_PARALLELISM = "parallelism";
+	public static final String EXECUTION_MAX_PARALLELISM = "max-parallelism";
+
+	public static final String EXECUTION_RESULT_MODE = "result-mode";
+	public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = "changelog";
+	public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+
+	// --------------------------------------------------------------------------------------------
+	// deployment
+	// --------------------------------------------------------------------------------------------
+
+	public static final String DEPLOYMENT = "deployment";
+
+	public static final String DEPLOYMENT_TYPE = "type";
+	public static final String DEPLOYMENT_TYPE_VALUE_STANDALONE = "standalone";
+
+	public static final String DEPLOYMENT_RESPONSE_TIMEOUT = "response-timeout";
+
+	public static final String DEPLOYMENT_GATEWAY_ADDRESS = "gateway-address";
+	public static final String DEPLOYMENT_GATEWAY_PORT = "gateway-port";
+
+	// --------------------------------------------------------------------------------------------
+
+	private PropertyStrings() {
+		// constants holder, not meant to be instantiated
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
new file mode 100644
index 0000000..7b2498f
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.TableSourceDescriptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration of a table source. Parses an entry in the `sources` list of an environment
+ * file and translates to table descriptor properties.
+ */
+public class Source extends TableSourceDescriptor {
+
+	/** Key of the mandatory attribute that carries the name of the table source. */
+	private static final String NAME = "name";
+
+	private String name;
+	private Map<String, String> properties;
+
+	private Source(String name, Map<String, String> properties) {
+		this.name = name;
+		this.properties = properties;
+	}
+
+	/**
+	 * Returns the name of the table source.
+	 */
+	public String getName() {
+		return name;
+	}
+
+	/**
+	 * Returns the properties of the table source (everything except the name attribute).
+	 * The internal map is returned as is, not a copy.
+	 */
+	public Map<String, String> getProperties() {
+		return properties;
+	}
+
+	/**
+	 * Creates a table source from a configuration entry.
+	 *
+	 * <p>The entry must contain a non-empty string under the key {@code name}. All remaining
+	 * entries are passed through {@code ConfigUtil.normalizeYaml} and kept as properties.
+	 *
+	 * @throws SqlClientException if the name attribute is missing, not a string, or empty
+	 */
+	public static Source create(Map<String, Object> config) {
+		if (!config.containsKey(NAME)) {
+			throw new SqlClientException("The 'name' attribute of a table source is missing.");
+		}
+		final Object nameValue = config.get(NAME);
+		// instanceof is false for null, so a null name is rejected here as well
+		final boolean validName = nameValue instanceof String && !((String) nameValue).isEmpty();
+		if (!validName) {
+			throw new SqlClientException("Invalid table source name '" + nameValue + "'.");
+		}
+		// work on a copy so that the given configuration is not modified
+		final Map<String, Object> remaining = new HashMap<>(config);
+		remaining.remove(NAME);
+		return new Source((String) nameValue, ConfigUtil.normalizeYaml(remaining));
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Adds every property of this source as a string to the given descriptor properties.
+	 */
+	@Override
+	public void addProperties(DescriptorProperties properties) {
+		this.properties.forEach(properties::putString);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
new file mode 100644
index 0000000..512c194
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A gateway for communicating with Flink and other external systems.
+ *
+ * <p>Most operations declare {@link SqlExecutionException}, which is an unchecked exception.
+ */
+public interface Executor {
+
+	/**
+	 * Starts the executor and ensures that it is ready for commands to be executed.
+	 */
+	void start() throws SqlExecutionException;
+
+	/**
+	 * Lists all session properties that are defined by the executor and the session.
+	 *
+	 * @param context session for which the properties are listed
+	 */
+	Map<String, String> getSessionProperties(SessionContext context) throws SqlExecutionException;
+
+	/**
+	 * Lists all tables known to the executor.
+	 *
+	 * @param context session for which the tables are listed
+	 */
+	List<String> listTables(SessionContext context) throws SqlExecutionException;
+
+	/**
+	 * Returns the schema of a table. Throws an exception if the table could not be found.
+	 *
+	 * @param context session in which the table is looked up
+	 * @param name name of the table
+	 */
+	TableSchema getTableSchema(SessionContext context, String name) throws SqlExecutionException;
+
+	/**
+	 * Returns a string-based explanation about AST and execution plan of the given statement.
+	 *
+	 * @param context session in which the statement is explained
+	 * @param statement statement to be explained
+	 */
+	String explainStatement(SessionContext context, String statement) throws SqlExecutionException;
+
+	/**
+	 * Submits a Flink job (detached) and returns the result descriptor.
+	 *
+	 * @param context session in which the query is executed
+	 * @param query query to be executed
+	 */
+	ResultDescriptor executeQuery(SessionContext context, String query) throws SqlExecutionException;
+
+	/**
+	 * Asks for the next changelog results (non-blocking).
+	 *
+	 * @param context session the result belongs to
+	 * @param resultId identifier of the result; presumably the one exposed by
+	 *                 {@link ResultDescriptor#getResultId()} (to be confirmed against implementations)
+	 */
+	TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext context, String resultId) throws SqlExecutionException;
+
+	/**
+	 * Creates an immutable result snapshot of the running Flink job. Throws an exception if no Flink job can be found.
+	 * Returns the number of pages.
+	 *
+	 * @param context session the result belongs to
+	 * @param resultId identifier of the result
+	 * @param pageSize presumably the number of rows per page (to be confirmed against implementations)
+	 */
+	TypedResult<Integer> snapshotResult(SessionContext context, String resultId, int pageSize) throws SqlExecutionException;
+
+	/**
+	 * Returns the rows that are part of the current page or throws an exception if the snapshot has expired.
+	 *
+	 * <p>NOTE(review): unlike the other result operations, this method does not take a
+	 * {@link SessionContext}; confirm that this is intended.
+	 *
+	 * @param resultId identifier of the result
+	 * @param page page to be returned
+	 */
+	List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException;
+
+	/**
+	 * Cancels a table program and stops the result retrieval.
+	 *
+	 * @param context session the result belongs to
+	 * @param resultId identifier of the result
+	 */
+	void cancelQuery(SessionContext context, String resultId) throws SqlExecutionException;
+
+	/**
+	 * Stops the executor.
+	 */
+	void stop(SessionContext context);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
new file mode 100644
index 0000000..3cfaa6f
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultDescriptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.table.api.TableSchema;
+
+/**
+ * Describes a result to be expected from a table program.
+ *
+ * <p>All fields are final and assigned once in the constructor.
+ */
+public class ResultDescriptor {
+
+	private final String resultId;
+	private final TableSchema resultSchema;
+	private final boolean materialized;
+
+	/**
+	 * Creates a descriptor from the given identifier, schema, and materialization flag.
+	 */
+	public ResultDescriptor(String resultId, TableSchema resultSchema, boolean isMaterialized) {
+		this.resultId = resultId;
+		this.resultSchema = resultSchema;
+		this.materialized = isMaterialized;
+	}
+
+	/**
+	 * Returns the identifier of the result.
+	 */
+	public String getResultId() {
+		return this.resultId;
+	}
+
+	/**
+	 * Returns the schema of the result.
+	 */
+	public TableSchema getResultSchema() {
+		return this.resultSchema;
+	}
+
+	/**
+	 * Returns the materialization flag that was passed to the constructor.
+	 */
+	public boolean isMaterialized() {
+		return this.materialized;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
new file mode 100644
index 0000000..1058eb6
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.table.client.config.Environment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Context describing a session.
+ *
+ * <p>A session consists of a name, a default environment, and a mutable set of session
+ * properties that are applied on top of the default environment.
+ */
+public class SessionContext {
+
+	private final String name;
+	private final Environment defaultEnvironment;
+	private final Map<String, String> sessionProperties = new HashMap<>();
+
+	/**
+	 * Creates a session with the given name and default environment and without session properties.
+	 */
+	public SessionContext(String name, Environment defaultEnvironment) {
+		this.name = name;
+		this.defaultEnvironment = defaultEnvironment;
+	}
+
+	/**
+	 * Returns the name of the session.
+	 */
+	public String getName() {
+		return name;
+	}
+
+	/**
+	 * Sets a session property; an existing value for the same key is replaced.
+	 */
+	public void setSessionProperty(String key, String value) {
+		sessionProperties.put(key, value);
+	}
+
+	/**
+	 * Removes all session properties.
+	 */
+	public void resetSessionProperties() {
+		sessionProperties.clear();
+	}
+
+	/**
+	 * Returns the default environment enriched with the current session properties.
+	 * The environment is derived anew on every call.
+	 */
+	public Environment getEnvironment() {
+		final Environment enriched = Environment.enrich(defaultEnvironment, sessionProperties);
+		return enriched;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SqlExecutionException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SqlExecutionException.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SqlExecutionException.java
new file mode 100644
index 0000000..d14da7f
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SqlExecutionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+/**
+ * Exception thrown during the execution of SQL statements.
+ *
+ * <p>This is an unchecked exception.
+ */
+public class SqlExecutionException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates an exception with a message and the throwable that caused it.
+	 */
+	public SqlExecutionException(String message, Throwable e) {
+		super(message, e);
+	}
+
+	/**
+	 * Creates an exception with a message and without a cause.
+	 */
+	public SqlExecutionException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/035053cf/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
new file mode 100644
index 0000000..ee4e8d3
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+/**
+ * Result with an attached type (actual payload, EOS, etc.).
+ *
+ * <p>Instances are created through the static factory methods {@link #empty()},
+ * {@link #payload(Object)}, and {@link #endOfStream()}. Type and payload can be changed
+ * afterwards through the setters.
+ *
+ * @param <P> type of payload
+ */
+public class TypedResult<P> {
+
+	/**
+	 * Result types.
+	 */
+	public enum ResultType {
+		PAYLOAD,
+		EMPTY,
+		EOS
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a result of type {@link ResultType#EMPTY} without payload.
+	 */
+	public static <T> TypedResult<T> empty() {
+		return new TypedResult<>(ResultType.EMPTY, null);
+	}
+
+	/**
+	 * Creates a result of type {@link ResultType#PAYLOAD} carrying the given payload.
+	 */
+	public static <T> TypedResult<T> payload(T payload) {
+		return new TypedResult<>(ResultType.PAYLOAD, payload);
+	}
+
+	/**
+	 * Creates a result of type {@link ResultType#EOS} without payload.
+	 */
+	public static <T> TypedResult<T> endOfStream() {
+		return new TypedResult<>(ResultType.EOS, null);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private ResultType type;
+
+	private P payload;
+
+	private TypedResult(ResultType type, P payload) {
+		this.type = type;
+		this.payload = payload;
+	}
+
+	public ResultType getType() {
+		return type;
+	}
+
+	public void setType(ResultType type) {
+		this.type = type;
+	}
+
+	public P getPayload() {
+		return payload;
+	}
+
+	public void setPayload(P payload) {
+		this.payload = payload;
+	}
+
+	/**
+	 * Returns a string containing only the result type, not the payload.
+	 */
+	@Override
+	public String toString() {
+		final StringBuilder description = new StringBuilder("TypedResult<");
+		description.append(type);
+		description.append(">");
+		return description.toString();
+	}
+}


Mime
View raw message