flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twalthr <...@git.apache.org>
Subject [GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...
Date Thu, 17 May 2018 12:35:17 GMT
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5660#discussion_r188939606
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
---
    @@ -293,59 +339,75 @@ public ResultDescriptor executeQuery(SessionContext session, String
query) throw
     	}
     
     	@Override
    -	public TypedResult<Integer> snapshotResult(SessionContext session, String resultId,
int pageSize) throws SqlExecutionException {
    -		final DynamicResult result = resultStore.getResult(resultId);
    -		if (result == null) {
    -			throw new SqlExecutionException("Could not find a result with result identifier '"
+ resultId + "'.");
    -		}
    -		if (!result.isMaterialized()) {
    -			throw new SqlExecutionException("Invalid result retrieval mode.");
    +	public TypedResult<Integer> snapshotResult(SessionContext session, String resultId,
int
    +			pageSize) throws SqlExecutionException {
    +		if (!resultStore.isStatic(resultId)) {
    +			final DynamicResult result = resultStore.getDynamicResult(resultId);
    +			if (result == null) {
    +				throw new SqlExecutionException("Could not find a result with result identifier '"
+ resultId + "'.");
    +			}
    +			if (!result.isMaterialized()) {
    +				throw new SqlExecutionException("Invalid result retrieval mode.");
    +			}
    +			return ((MaterializedResult) result).snapshot(pageSize);
    +		} else {
    +			StaticResult staticResult = resultStore.getStaticResult(resultId);
    +			return staticResult.snapshot(pageSize);
     		}
    -		return ((MaterializedResult) result).snapshot(pageSize);
     	}
     
     	@Override
    -	public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException
{
    -		final DynamicResult result = resultStore.getResult(resultId);
    -		if (result == null) {
    -			throw new SqlExecutionException("Could not find a result with result identifier '"
+ resultId + "'.");
    -		}
    -		if (!result.isMaterialized()) {
    -			throw new SqlExecutionException("Invalid result retrieval mode.");
    +	public List<Row> retrieveResultPage(String resultId, int page) throws
    +			SqlExecutionException {
    +		if (!resultStore.isStatic(resultId)) {
    +			final DynamicResult result = resultStore.getDynamicResult(resultId);
    +			if (result == null) {
    +				throw new SqlExecutionException("Could not find a result with result identifier '"
+ resultId + "'.");
    +			}
    +			if (!result.isMaterialized()) {
    +				throw new SqlExecutionException("Invalid result retrieval mode.");
    +			}
    +			return ((MaterializedResult) result).retrievePage(page);
    +		} else {
    +			return resultStore.getStaticResult(resultId).retrievePage(page);
     		}
    -		return ((MaterializedResult) result).retrievePage(page);
     	}
     
     	@Override
     	public void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException
{
    -		final DynamicResult result = resultStore.getResult(resultId);
    -		if (result == null) {
    -			throw new SqlExecutionException("Could not find a result with result identifier '"
+ resultId + "'.");
    -		}
    +		if (!resultStore.isStatic(resultId)) {
    --- End diff --
    
    Do we really need the distinction between dynamic and static results here? To the executor
it should not actually matter. It should just kill whatever Flink job is running.


---

Mime
View raw message