tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [07/48] git commit: TAJO-734: Arrange TajoCli output message. (hyoungjunkim via jihoon)
Date Tue, 20 May 2014 18:46:05 GMT
TAJO-734: Arrange TajoCli output message. (hyoungjunkim via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bc1a3235
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bc1a3235
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bc1a3235

Branch: refs/heads/window_function
Commit: bc1a3235176fd5d09b0093bc5b1ff03cdca63dab
Parents: fe81035
Author: Jihoon Son <jihoonson@apache.org>
Authored: Wed Apr 23 14:15:43 2014 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Wed Apr 23 14:15:43 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +-
 .../tajo/cli/DefaultTajoCliOutputFormatter.java | 183 +++++++++++++++++++
 .../main/java/org/apache/tajo/cli/TajoCli.java  | 173 +++++++-----------
 .../apache/tajo/cli/TajoCliOutputFormatter.java |  98 ++++++++++
 .../org/apache/tajo/client/QueryStatus.java     |   9 +
 .../java/org/apache/tajo/client/TajoClient.java |   4 +-
 tajo-client/src/main/proto/ClientProtos.proto   |   6 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   6 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   5 +-
 .../tajo/master/TajoMasterClientService.java    |   1 +
 .../apache/tajo/master/querymaster/Query.java   |   5 +-
 .../tajo/master/querymaster/QueryInfo.java      |   2 +-
 .../querymaster/QueryMasterManagerService.java  |   6 +-
 .../master/querymaster/QueryMasterTask.java     |  23 ++-
 .../tajo/master/querymaster/QueryUnit.java      |  11 +-
 .../master/querymaster/QueryUnitAttempt.java    |  10 +-
 .../tajo/master/querymaster/SubQuery.java       |   2 +-
 .../tajo/worker/TajoWorkerClientService.java    |  11 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  15 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   3 +-
 .../tajo/cli/TestDefaultCliOutputFormatter.java | 139 ++++++++++++++
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  32 ++--
 .../org/apache/tajo/rpc/BlockingRpcServer.java  |   2 -
 .../org/apache/tajo/rpc/NettyClientBase.java    |  16 +-
 .../apache/tajo/rpc/RemoteCallException.java    |   9 +-
 .../org/apache/tajo/rpc/RpcChannelFactory.java  |   4 +-
 .../apache/tajo/rpc/TajoServiceException.java   |  58 ++++++
 tajo-rpc/src/main/proto/RpcProtos.proto         |   4 +-
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  41 ++++-
 29 files changed, 714 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ce3e4b3..d7a17d3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -5,7 +5,7 @@ Release 0.9.0 - unreleased
   IMPROVEMENT
 
     TAJO-769: A minor improvements for HCatalogStore (Fengdong Yu via hyunsik)
-
+    TAJO-734: Arrange TajoCli output message. (hyoungjunkim via jihoon)
 
   SUB TASKS
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java b/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
new file mode 100644
index 0000000..d291414
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+
+public class DefaultTajoCliOutputFormatter implements TajoCliOutputFormatter { // TajoCli's default formatter
+  private TajoConf tajoConf; // kept from init(); not read anywhere else in this class
+  private int printPauseRecords; // rows printed between pause prompts (CLI_PRINT_PAUSE_NUM_RECORDS)
+  private boolean printPause; // pause while printing rows (CLI_PRINT_PAUSE); cleared by setScirptMode()
+  private boolean printErrorTrace; // also print stack traces with errors (CLI_PRINT_ERROR_TRACE)
+  /** Stores the configuration and reads the pause and error-trace settings from it. */
+  @Override
+  public void init(TajoConf tajoConf) {
+    this.tajoConf = tajoConf;
+
+    this.printPause = tajoConf.getBoolVar(TajoConf.ConfVars.CLI_PRINT_PAUSE);
+    this.printPauseRecords = tajoConf.getIntVar(TajoConf.ConfVars.CLI_PRINT_PAUSE_NUM_RECORDS);
+    this.printErrorTrace = tajoConf.getBoolVar(TajoConf.ConfVars.CLI_PRINT_ERROR_TRACE);
+  }
+  /** Turns off the pause prompt; TajoCli calls this before running -c / -f input. */
+  @Override
+  public void setScirptMode() {
+    this.printPause = false;
+  }
+  /** Builds the "(N rows, T sec, SIZE postfix)" summary; N is totalPrintedRows when the stats report 0 rows. */
+  private String getQuerySuccessMessage(TableDesc tableDesc, float responseTime, int totalPrintedRows, String postfix) {
+    TableStats stat = tableDesc.getStats();
+    String volume = FileUtil.humanReadableByteCount(stat.getNumBytes(), false);
+    long resultRows = stat.getNumRows();
+
+    long realNumRows = resultRows != 0 ? resultRows : totalPrintedRows;
+    return "(" + realNumRows + " rows, " + responseTime + " sec, " + volume + " " + postfix + ")";
+  }
+  /** Prints header, rows and a summary line; a null res yields only the "inserted" summary. */
+  @Override
+  public void printResult(PrintWriter sout, InputStream sin, TableDesc tableDesc,
+                          float responseTime, ResultSet res) throws Exception {
+    long resultRows = tableDesc.getStats().getNumRows();
+    if (resultRows == 0) { // a 0 row count is handled below as "total unknown"
+      resultRows = Integer.MAX_VALUE; // sentinel checked before printing the total in the prompt
+    }
+
+    if (res == null) {
+      sout.println(getQuerySuccessMessage(tableDesc, responseTime, 0, "inserted"));
+      return;
+    }
+    ResultSetMetaData rsmd = res.getMetaData();
+    int numOfColumns = rsmd.getColumnCount();
+    for (int i = 1; i <= numOfColumns; i++) {
+      if (i > 1) sout.print(",  ");
+      String columnName = rsmd.getColumnName(i);
+      sout.print(columnName);
+    }
+    sout.println("\n-------------------------------");
+
+    int numOfPrintedRows = 0; // rows printed since the last pause prompt
+    int totalPrintedRows = 0; // rows printed overall
+    while (res.next()) {
+      for (int i = 1; i <= numOfColumns; i++) {
+        if (i > 1) sout.print(",  ");
+        String columnValue = res.getObject(i).toString(); // NOTE(review): NPE if getObject(i) is null
+        if(res.wasNull()){
+          sout.print("null");
+        } else {
+          sout.print(columnValue);
+        }
+      }
+      sout.println();
+      sout.flush();
+      numOfPrintedRows++;
+      totalPrintedRows++;
+      if (printPause && printPauseRecords > 0 && totalPrintedRows < resultRows && numOfPrintedRows >= printPauseRecords) {
+        if (resultRows < Integer.MAX_VALUE) {
+          sout.print("(" + totalPrintedRows + "/" + resultRows + " rows, continue... 'q' is quit)");
+        } else {
+          sout.print("(" + totalPrintedRows + " rows, continue... 'q' is quit)");
+        }
+        sout.flush();
+        if (sin != null) { // with no input stream the prompt is printed but no answer is read
+          if (sin.read() == 'q') { // reads one byte; 'q' stops printing rows
+            sout.println();
+            break;
+          }
+        }
+        numOfPrintedRows = 0;
+        sout.println();
+      }
+    }
+    sout.println(getQuerySuccessMessage(tableDesc, responseTime, totalPrintedRows, "selected"));
+  }
+  /** Prints the fixed "(0 rows)" message. */
+  @Override
+  public void printNoResult(PrintWriter sout) {
+    sout.println("(0 rows)");
+  }
+  /** Prints progress as a percentage plus (finishTime - submitTime) in seconds, then flushes. */
+  @Override
+  public void printProgress(PrintWriter sout, QueryStatus status) {
+    sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
+        + "%, response time: " + ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0) + " sec");
+    sout.flush();
+  }
+  /** Prints message on its own line. */
+  @Override
+  public void printMessage(PrintWriter sout, String message) {
+    sout.println(message);
+  }
+  /** Prints the ERROR-prefixed first line of t's message, plus the stack trace when printErrorTrace is set. */
+  @Override
+  public void printErrorMessage(PrintWriter sout, Throwable t) {
+    sout.println(parseErrorMessage(t.getMessage()));
+    if (printErrorTrace) {
+      sout.println(ExceptionUtils.getStackTrace(t));
+    }
+  }
+  /** Prints message after parseErrorMessage has reduced it to one ERROR-prefixed line. */
+  @Override
+  public void printErrorMessage(PrintWriter sout, String message) {
+    sout.println(parseErrorMessage(message));
+  }
+  /** Prints TajoCli.KILL_PREFIX followed by queryId. */
+  @Override
+  public void printKilledMessage(PrintWriter sout, QueryId queryId) {
+    sout.println(TajoCli.KILL_PREFIX + queryId);
+  }
+  /** Prints the status error message (or "No error message") and, if enabled and present, its trace. */
+  @Override
+  public void printErrorMessage(PrintWriter sout, QueryStatus status) {
+    if (status.getErrorMessage() != null && !status.getErrorMessage().isEmpty()) {
+      printErrorMessage(sout, parseErrorMessage(status.getErrorMessage()));
+    } else {
+      printErrorMessage(sout, "No error message");
+    }
+    if (printErrorTrace && status.getErrorTrace() != null && !status.getErrorTrace().isEmpty()) {
+      sout.println(status.getErrorTrace());
+    }
+  }
+  /** Keeps the first line of message from its last ERROR_PREFIX on, or prepends the prefix if it has none. */
+  public static String parseErrorMessage(String message) {
+    if (message == null) {
+      return TajoCli.ERROR_PREFIX + "No error message";
+    }
+    String[] lines = message.split("\n");
+    message = lines[0];
+
+    int index = message.lastIndexOf(TajoCli.ERROR_PREFIX);
+    if (index < 0) {
+      message = TajoCli.ERROR_PREFIX + message;
+    } else {
+      message = message.substring(index);
+    }
+
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 508b8bb..606ca88 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -35,7 +35,6 @@ import org.apache.tajo.util.FileUtil;
 import java.io.*;
 import java.lang.reflect.Constructor;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.List;
@@ -47,6 +46,8 @@ import static org.apache.tajo.cli.ParsedResult.StatementType.STATEMENT;
 import static org.apache.tajo.cli.SimpleParser.ParsingState;
 
 public class TajoCli {
+  public static final String ERROR_PREFIX = "ERROR: ";
+  public static final String KILL_PREFIX = "KILL: ";
 
   private final TajoConf conf;
   private TajoClient client;
@@ -61,6 +62,8 @@ public class TajoCli {
   // Current States
   private String currentDatabase;
 
+  private TajoCliOutputFormatter outputFormatter;
+
   private static final Class [] registeredCommands = {
       DescTableCommand.class,
       DescFunctionCommand.class,
@@ -78,7 +81,6 @@ public class TajoCli {
   };
   private final Map<String, TajoShellCommand> commands = new TreeMap<String, TajoShellCommand>();
 
-  public static final int PRINT_LIMIT = 24;
   private static final Options options;
   private static final String HOME_DIR = System.getProperty("user.home");
   private static final String HISTORY_FILE = ".tajo_history";
@@ -121,6 +123,11 @@ public class TajoCli {
     this.reader = new ConsoleReader(sin, out);
     this.reader.setExpandEvents(false);
     this.sout = new PrintWriter(reader.getOutput());
+    Class formatterClass = conf.getClass(conf.getVar(ConfVars.CLI__OUTPUT_FORMATTER_CLASS),
+            DefaultTajoCliOutputFormatter.class);
+
+    this.outputFormatter = (TajoCliOutputFormatter)formatterClass.newInstance();
+    this.outputFormatter.init(conf);
 
     CommandLineParser parser = new PosixParser();
     CommandLine cmd = parser.parse(options, args);
@@ -160,7 +167,7 @@ public class TajoCli {
     }
 
     if ((hostName == null) ^ (port == null)) {
-      System.err.println("ERROR: cannot find valid Tajo server address");
+      System.err.println(ERROR_PREFIX + "cannot find valid Tajo server address");
       throw new RuntimeException("cannot find valid Tajo server address");
     } else if (hostName != null && port != null) {
       conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
@@ -175,11 +182,13 @@ public class TajoCli {
     initCommands();
 
     if (cmd.hasOption("c")) {
+      outputFormatter.setScirptMode();
       executeScript(cmd.getOptionValue("c"));
       sout.flush();
       System.exit(0);
     }
     if (cmd.hasOption("f")) {
+      outputFormatter.setScirptMode();
       File sqlFile = new File(cmd.getOptionValue("f"));
       if (sqlFile.exists()) {
         String script = FileUtil.readTextFile(new File(cmd.getOptionValue("f")));
@@ -187,7 +196,7 @@ public class TajoCli {
         sout.flush();
         System.exit(0);
       } else {
-        System.err.println("No such a file \"" + cmd.getOptionValue("f") + "\"");
+        System.err.println(ERROR_PREFIX + "No such a file \"" + cmd.getOptionValue("f") + "\"");
         System.exit(-1);
       }
     }
@@ -202,10 +211,10 @@ public class TajoCli {
         history = new TajoFileHistory(new File(historyPath));
         reader.setHistory(history);
       } else {
-        System.err.println("ERROR: home directory : '" + HOME_DIR +"' does not exist.");
+        System.err.println(ERROR_PREFIX + "home directory : '" + HOME_DIR +"' does not exist.");
       }
     } catch (Exception e) {
-      System.err.println(e.getMessage());
+      System.err.println(ERROR_PREFIX + e.getMessage());
     }
   }
 
@@ -296,9 +305,9 @@ public class TajoCli {
       try {
         invoked.invoke(arguments);
       } catch (IllegalArgumentException ige) {
-        sout.println(ige.getMessage());
+        outputFormatter.printErrorMessage(sout, ige);
       } catch (Exception e) {
-        sout.println(e.getMessage());
+        outputFormatter.printErrorMessage(sout, e);
       }
     }
 
@@ -306,44 +315,43 @@ public class TajoCli {
   }
 
   private void executeQuery(String statement) throws ServiceException {
+    long startTime = System.currentTimeMillis();
     ClientProtos.SubmitQueryResponse response = client.executeQuery(statement);
     if (response == null) {
-      sout.println("response is null");
+      outputFormatter.printErrorMessage(sout, "response is null");
     } else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
       if (response.getIsForwarded()) {
         QueryId queryId = new QueryId(response.getQueryId());
-        try {
-          waitForQueryCompleted(queryId);
-        } finally {
-          client.closeQuery(queryId);
-        }
+        waitForQueryCompleted(queryId);
       } else {
         if (!response.hasTableDesc() && !response.hasResultSet()) {
-          sout.println("Ok");
+          outputFormatter.printMessage(sout, "OK");
         } else {
-
-          ResultSet resultSet;
-          int numBytes;
-          long maxRowNum;
-          try {
-            resultSet = TajoClient.createResultSet(client, response);
-            if (response.hasTableDesc()) {
-              numBytes = 0;
-            } else {
-              numBytes = response.getResultSet().getBytesNum();
-            }
-            maxRowNum = response.getMaxRowNum();
-            printResult(resultSet, maxRowNum, numBytes);
-          } catch (IOException ioe) {
-            sout.println(ioe.getMessage());
-          } catch (SQLException sqe) {
-            sout.println(sqe.getMessage());
-          }
+          localQueryCompleted(response, startTime);
         }
       }
     } else {
       if (response.hasErrorMessage()) {
-        sout.println(response.getErrorMessage());
+        outputFormatter.printErrorMessage(sout, response.getErrorMessage());
+      }
+    }
+  }
+
+  private void localQueryCompleted(ClientProtos.SubmitQueryResponse response, long startTime) {
+    ResultSet res = null; // stays null if createResultSet throws, so the finally block skips close()
+    try { // build the result set from the submit response and hand it to the formatter
+      res = TajoClient.createResultSet(client, response);
+      float responseTime = ((float)(System.currentTimeMillis() - startTime) / 1000.0f); // seconds since startTime
+      TableDesc desc = new TableDesc(response.getTableDesc());
+      outputFormatter.printResult(sout, sin, desc, responseTime, res);
+    } catch (Throwable t) { // any failure is reported through the formatter, not rethrown
+      outputFormatter.printErrorMessage(sout, t);
+    } finally {
+      if (res != null) {
+        try {
+          res.close();
+        } catch (SQLException e) { // a close failure is ignored (empty handler)
+        }
+      }
+    }
+  }
@@ -355,9 +363,10 @@ public class TajoCli {
     }
 
     // query execute
+    ResultSet res = null;
+    QueryStatus status = null;
     try {
 
-      QueryStatus status;
       int initRetries = 0;
       int progressRetries = 0;
       while (true) {
@@ -370,9 +379,7 @@ public class TajoCli {
         }
 
         if (status.getState() == QueryState.QUERY_RUNNING || status.getState() == QueryState.QUERY_SUCCEEDED) {
-          sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
-              + "%, response time: " + ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0) + " sec");
-          sout.flush();
+          outputFormatter.printProgress(sout, status);
         }
 
         if (status.getState() != QueryState.QUERY_RUNNING &&
@@ -385,91 +392,37 @@ public class TajoCli {
         }
       }
 
-      if (status.getState() == QueryState.QUERY_ERROR) {
-        sout.println("Internal error!");
-        if(status.getErrorMessage() != null && !status.getErrorMessage().isEmpty()) {
-          sout.println(status.getErrorMessage());
-        }
-      } else if (status.getState() == QueryState.QUERY_FAILED) {
-        sout.println("Query failed!");
+      if (status.getState() == QueryState.QUERY_ERROR || status.getState() == QueryState.QUERY_FAILED) {
+        outputFormatter.printErrorMessage(sout, status);
       } else if (status.getState() == QueryState.QUERY_KILLED) {
-        sout.println(queryId + " is killed.");
+        outputFormatter.printKilledMessage(sout, queryId);
       } else {
         if (status.getState() == QueryState.QUERY_SUCCEEDED) {
-          sout.println("final state: " + status.getState()
-              + ", response time: " + (((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0)
-              + " sec"));
+          float responseTime = ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0f);
+          ClientProtos.GetQueryResultResponse response = client.getResultResponse(queryId);
           if (status.hasResult()) {
-            ClientProtos.GetQueryResultResponse response = client.getResultResponse(queryId);
-            ResultSet res = TajoClient.createResultSet(client, queryId, response);
+            res = TajoClient.createResultSet(client, queryId, response);
             TableDesc desc = new TableDesc(response.getTableDesc());
-            long totalRowNum = desc.getStats().getNumRows();
-            long totalBytes = desc.getStats().getNumBytes();
-            printResult(res, totalRowNum, totalBytes);
+            outputFormatter.printResult(sout, sin, desc, responseTime, res);
           } else {
-            sout.println("OK");
+            TableDesc desc = new TableDesc(response.getTableDesc());
+            outputFormatter.printResult(sout, sin, desc, responseTime, res);
           }
         }
       }
     } catch (Throwable t) {
-      t.printStackTrace();
-      System.err.println(t.getMessage());
-    }
-  }
-
-  private void printResult(ResultSet res, long rowNum, long numBytes) throws IOException, SQLException  {
-    try {
-      if (res == null) {
-        sout.println("OK");
-        return;
-      }
-
-      ResultSetMetaData rsmd = res.getMetaData();
-
-      String volume = FileUtil.humanReadableByteCount(numBytes, false);
-      String rowNumStr = rowNum == Integer.MAX_VALUE ? "unknown" : rowNum + "";
-      sout.println("result: " + rowNumStr + " rows (" + volume + ")");
-
-      int numOfColumns = rsmd.getColumnCount();
-      for (int i = 1; i <= numOfColumns; i++) {
-        if (i > 1) sout.print(",  ");
-        String columnName = rsmd.getColumnName(i);
-        sout.print(columnName);
-      }
-      sout.println("\n-------------------------------");
-
-      int numOfPrintedRows = 0;
-      while (res.next()) {
-        // TODO - to be improved to print more formatted text
-        for (int i = 1; i <= numOfColumns; i++) {
-          if (i > 1) sout.print(",  ");
-          String columnValue = res.getObject(i).toString();
-          if(res.wasNull()){
-            sout.print("null");
-          } else {
-            sout.print(columnValue);
-          }
+      outputFormatter.printErrorMessage(sout, t);
+    } finally {
+      if (res != null) {
+        try {
+          res.close();
+        } catch (SQLException e) {
         }
-        sout.println();
-        sout.flush();
-        numOfPrintedRows++;
-        if (numOfPrintedRows >= PRINT_LIMIT) {
-          sout.print("continue... ('q' is quit)");
-          sout.flush();
-          if (sin.read() == 'q') {
-            sout.println();
-            break;
-          }
-          numOfPrintedRows = 0;
-          sout.println();
+      } else {
+        if (status != null && status.getQueryId() != null) {
+          client.closeQuery(status.getQueryId());
         }
       }
-    } catch (SQLException e) {
-      e.printStackTrace();
-    } finally {
-      if(res != null) {
-        res.close();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/cli/TajoCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCliOutputFormatter.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCliOutputFormatter.java
new file mode 100644
index 0000000..0e91669
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCliOutputFormatter.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.sql.ResultSet;
+
+public interface TajoCliOutputFormatter {
+  /**
+   * Initializes the formatter; TajoCli calls this right after instantiating it.
+   * @param tajoConf configuration the formatter can read its settings from
+   */
+  public void init(TajoConf tajoConf);
+
+  /**
+   * Prints a query result to the console.
+   * @param sout writer the result is printed to
+   * @param sin console input stream (the default formatter reads its pause-prompt answer from it)
+   * @param tableDesc descriptor of the result table (the default formatter reads its stats)
+   * @param responseTime response time of the query, in seconds
+   * @param res result rows; TajoCli passes null when a succeeded query has no result
+   * @throws Exception on any failure while reading the result or printing it
+   */
+  public void printResult(PrintWriter sout, InputStream sin, TableDesc tableDesc,
+                          float responseTime, ResultSet res) throws Exception;
+
+  /**
+   * Prints the message for an empty result.
+   * @param sout writer the message is printed to
+   */
+  public void printNoResult(PrintWriter sout);
+
+  /**
+   * Prints a plain message.
+   * @param sout writer the message is printed to
+   * @param message text to print
+   */
+  public void printMessage(PrintWriter sout, String message);
+
+  /**
+   * Prints the progress of a query.
+   * @param sout writer the progress is printed to
+   * @param status query status holding the progress and timing values
+   */
+  public void printProgress(PrintWriter sout, QueryStatus status);
+
+  /**
+   * Prints an error message for a throwable.
+   * @param sout writer the error is printed to
+   * @param t the error to report
+   */
+  public void printErrorMessage(PrintWriter sout, Throwable t);
+
+  /**
+   * Prints an error message given as text.
+   * @param sout writer the error is printed to
+   * @param message error text to print
+   */
+  public void printErrorMessage(PrintWriter sout, String message);
+
+  /**
+   * Prints the message for a killed query.
+   * @param sout writer the message is printed to
+   * @param queryId id of the killed query
+   */
+  public void printKilledMessage(PrintWriter sout, QueryId queryId);
+
+  /**
+   * Prints the error carried by a query status; TajoCli uses it for the QUERY_ERROR and QUERY_FAILED states.
+   * @param sout writer the error is printed to
+   * @param status query status holding the error message and trace
+   */
+  void printErrorMessage(PrintWriter sout, QueryStatus status);
+  /** Called by TajoCli before it runs -c / -f input; the default formatter then stops pausing. */
+  void setScirptMode();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
index 203f9aa..4a38934 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java
@@ -30,6 +30,7 @@ public class QueryStatus {
   private long finishTime;
   private boolean hasResult;
   private String errorText;
+  private String errorTrace;
   private String queryMasterHost;
   private int queryMasterPort;
 
@@ -43,6 +44,9 @@ public class QueryStatus {
     if (proto.hasErrorMessage()) {
       errorText = proto.getErrorMessage();
     }
+    if (proto.hasErrorTrace()) {
+      errorTrace = proto.getErrorTrace();
+    }
 
     queryMasterHost = proto.getQueryMasterHost();
     queryMasterPort = proto.getQueryMasterPort();
@@ -83,4 +87,9 @@ public class QueryStatus {
   public String getErrorMessage() {
     return errorText;
   }
+
+  public String getErrorTrace() { // error trace the constructor copied from the status proto, when it has one
+    return errorTrace;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 81fc227..3c85662 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -180,7 +180,9 @@ public class TajoClient implements Closeable {
       CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
       if (response.getState() == CreateSessionResponse.ResultState.SUCCESS) {
         sessionId = response.getSessionId();
-        LOG.info(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+        }
       } else {
         throw new InvalidClientSessionException(response.getMessage());
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 3baeee9..bdc271e 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -134,6 +134,7 @@ message SubmitQueryResponse {
   optional int32 maxRowNum = 9;
 
   optional string errorMessage = 10;
+  optional string errorTrace = 11;
 }
 
 message GetQueryStatusResponse {
@@ -145,8 +146,9 @@ message GetQueryStatusResponse {
   optional int64 finishTime = 7;
   optional bool hasResult = 8;
   optional string errorMessage = 9;
-  optional string queryMasterHost = 10;
-  optional int32 queryMasterPort = 11;
+  optional string errorTrace = 10;
+  optional string queryMasterHost = 11;
+  optional int32 queryMasterPort = 12;
 }
 
 message GetClusterInfoRequest {

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 5b3d4b3..552f1e4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -245,7 +245,11 @@ public class TajoConf extends Configuration {
     METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties"),
 
     //CLI
-    CLI_MAX_COLUMN("tajo.cli.max_columns", 120)
+    CLI_MAX_COLUMN("tajo.cli.max_columns", 120),
+    CLI_PRINT_PAUSE_NUM_RECORDS("tajo.cli.print.pause.num.records", 100),
+    CLI_PRINT_PAUSE("tajo.cli.print.pause", true),
+    CLI_PRINT_ERROR_TRACE("tajo.cli.print.error.trace", true),
+    CLI__OUTPUT_FORMATTER_CLASS("tajo.cli.otuptu.formatter", "org.apache.tajo.cli.DefaultTajoCliOutputFormatter");
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8acf2b2..35b8ab8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -34,7 +34,6 @@ import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
@@ -60,7 +59,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
 import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
@@ -156,9 +154,10 @@ public class GlobalEngine extends AbstractService {
       responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
       String errorMessage = t.getMessage();
       if (t.getMessage() == null) {
-        errorMessage = StringUtils.stringifyException(t);
+        errorMessage = t.getClass().getName();
       }
       responseBuilder.setErrorMessage(errorMessage);
+      responseBuilder.setErrorTrace(StringUtils.stringifyException(t));
       return responseBuilder.build();
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index c968a73..c6facb1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -481,6 +481,7 @@ public class TajoMasterClientService extends AbstractService {
             new QueryInfo(queryId)));
         return BOOL_TRUE;
       } catch (Throwable t) {
+        t.printStackTrace();
         throw new ServiceException(t);
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index a8f5b31..6a5248d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -164,8 +164,9 @@ public class Query implements EventHandler<QueryEvent> {
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able transitions
-          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
-              EnumSet.of(QueryEventType.KILL))
+          .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED),
+              QueryEventType.KILL,
+              QUERY_COMPLETED_TRANSITION)
 
           // Transitions from FAILED state
           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index b077b36..9e455ae 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -119,7 +119,7 @@ public class QueryInfo {
 
   @Override
   public String toString() {
-    return queryId.toString() + "state=" + queryState +",progress=" + progress + ", queryMaster="
+    return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster="
         + getQueryMasterHost();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index bf59e9f..43c85d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -186,7 +186,11 @@ public class QueryMasterManagerService extends CompositeService
     try {
       QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
           new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
-      queryMasterTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
+      if (queryMasterTask != null) {
+        queryMasterTask.handleTaskFailed(report);
+      } else {
+        LOG.warn("No QueryMasterTask: " + new QueryUnitAttemptId(report.getId()));
+      }
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 39ea430..4a14359 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -48,6 +48,7 @@ import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoContainerProxy;
@@ -64,8 +65,7 @@ import org.apache.tajo.worker.AbstractResourceAllocator;
 import org.apache.tajo.worker.TajoResourceAllocator;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -115,6 +115,9 @@ public class QueryMasterTask extends CompositeService {
 
   private Throwable initError;
 
+  private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
+      new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
+
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
                          QueryId queryId, Session session, QueryContext queryContext, String sql,
                          String logicalPlanJson) {
@@ -218,6 +221,22 @@ public class QueryMasterTask extends CompositeService {
     query.getSubQuery(id).handleTaskRequestEvent(event);
   }
 
+  public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
+    synchronized(diagnostics) {
+      if (diagnostics.size() < 10) {
+        diagnostics.add(report);
+      }
+    }
+
+    getEventHandler().handle(new TaskFatalErrorEvent(report));
+  }
+
+  public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
+    synchronized(diagnostics) {
+      return Collections.unmodifiableCollection(diagnostics);
+    }
+  }
+
   private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
     public void handle(SubQueryEvent event) {
       ExecutionBlockId id = event.getSubQueryId();

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 42fbf8a..34686da 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -549,13 +549,16 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     @Override
     public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
       TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
-      LOG.info("=============================================================");
-      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
-      LOG.info("=============================================================");
       task.failedAttempts++;
       task.finishedAttempts++;
+      boolean retry = task.failedAttempts < task.maxAttempts;
+
+      LOG.info("====================================================================================");
+      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " +
+          "retry:" + retry + ", attempts:" +  task.failedAttempts + " <<<");
+      LOG.info("====================================================================================");
 
-      if (task.failedAttempts < task.maxAttempts) {
+      if (retry) {
         if (task.successfulAttempt == null) {
           task.addAndScheduleAttempt();
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 7993ce9..b69742c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.querymaster;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -97,6 +98,9 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
           TaskAttemptEventType.TA_KILL,
           new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
       .addTransition(TaskAttemptState.TA_ASSIGNED,
           EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
           TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
@@ -155,6 +159,9 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
           EnumSet.of(
               TaskAttemptEventType.TA_UPDATE))
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_LOCAL_KILLED,
+          new TaskKilledCompleteTransition())
 
       .installTopology();
 
@@ -383,6 +390,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
         taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
       } catch (Throwable t) {
         taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
+        taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
       }
     }
   }
@@ -402,7 +410,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
       taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
-      LOG.error("FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage());
+      LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 8929e8d..31c0efa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -1057,7 +1057,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
         }
 
-        LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d",
+        LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
             subQuery.getId(),
             subQuery.getTotalScheduledObjectsCount(),
             subQuery.succeededObjectCount,

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 937d886..2b947fe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -33,6 +33,7 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.Query;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.BlockingRpcServer;
@@ -41,6 +42,7 @@ import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 
 public class TajoWorkerClientService extends AbstractService {
   private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class);
@@ -201,9 +203,12 @@ public class TajoWorkerClientService extends AbstractService {
           } else {
             builder.setFinishTime(System.currentTimeMillis());
           }
-        } else {
-          builder.setState(queryMasterTask.getState());
-          builder.setErrorMessage(queryMasterTask.getErrorMessage());
+        } 
+        Collection<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = queryMasterTask.getDiagnostics();
+        if(!diagnostics.isEmpty()) {
+          TajoWorkerProtocol.TaskFatalErrorReport firstError = diagnostics.iterator().next();
+          builder.setErrorMessage(firstError.getErrorMessage());
+          builder.setErrorTrace(firstError.getErrorTrace());
         }
       }
       return builder.build();

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 30f56ee..ef52fd0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -365,7 +365,7 @@ public class Task {
 
   public void run() {
     startTime = System.currentTimeMillis();
-    String errorMessage = null;
+    Exception error = null;
     try {
       context.setState(TaskAttemptState.TA_RUNNING);
 
@@ -381,6 +381,7 @@ public class Task {
         this.executor = taskRunnerContext.getTQueryEngine().
             createPlan(context, plan);
         this.executor.init();
+
         while(!killed && executor.next() != null) {
         }
         this.executor.close();
@@ -388,9 +389,8 @@ public class Task {
         this.executor = null;
       }
     } catch (Exception e) {
-      // errorMessage will be sent to master.
-      errorMessage = ExceptionUtils.getStackTrace(e);
-      LOG.error(errorMessage);
+      error = e ;
+      LOG.error(e.getMessage(), e);
       aborted = true;
     } finally {
       context.setProgress(1.0f);
@@ -409,8 +409,9 @@ public class Task {
           TaskFatalErrorReport.Builder errorBuilder =
               TaskFatalErrorReport.newBuilder()
                   .setId(getId().getProto());
-          if (errorMessage != null) {
-            errorBuilder.setErrorMessage(errorMessage);
+          if (error != null) {
+            errorBuilder.setErrorMessage(error.getMessage());
+            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
           }
 
           masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
@@ -444,7 +445,7 @@ public class Task {
       finishTime = System.currentTimeMillis();
 
       cleanupTask();
-      LOG.info("Task Counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
+      LOG.info("Worker's task counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
           + ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 7c94e33..78da10f 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -57,7 +57,8 @@ message TaskCompletionReport {
 
 message TaskFatalErrorReport {
   required QueryUnitAttemptIdProto id = 1;
-  optional string error_message = 2;
+  optional string errorMessage = 2;
+  optional string errorTrace = 3;
 }
 
 message QueryUnitRequestProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-core/src/test/java/org/apache/tajo/cli/TestDefaultCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/TestDefaultCliOutputFormatter.java b/tajo-core/src/test/java/org/apache/tajo/cli/TestDefaultCliOutputFormatter.java
new file mode 100644
index 0000000..e13eeef
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/TestDefaultCliOutputFormatter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Float8Datum;
+import org.apache.tajo.datum.Int4Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.jdbc.MetaDataTuple;
+import org.apache.tajo.jdbc.TajoMetaDataResultSet;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestDefaultCliOutputFormatter {
+  @Test
+  public void testParseErrorMessage() {
+    String message = "java.sql.SQLException: ERROR: no such a table: table1";
+    assertEquals("ERROR: no such a table: table1", DefaultTajoCliOutputFormatter.parseErrorMessage(message));
+
+    String multiLineMessage =
+        "ERROR: java.sql.SQLException: ERROR: no such a table: table1\n" +
+        "com.google.protobuf.ServiceException: java.sql.SQLException: ERROR: no such a table: table1\n" +
+        "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:107)\n" +
+        "\tat org.apache.tajo.client.TajoClient.getTableDesc(TajoClient.java:777)\n" +
+        "\tat org.apache.tajo.cli.DescTableCommand.invoke(DescTableCommand.java:43)\n" +
+        "\tat org.apache.tajo.cli.TajoCli.executeMetaCommand(TajoCli.java:300)\n" +
+        "\tat org.apache.tajo.cli.TajoCli.executeParsedResults(TajoCli.java:280)\n" +
+        "\tat org.apache.tajo.cli.TajoCli.runShell(TajoCli.java:271)\n" +
+        "\tat org.apache.tajo.cli.TajoCli.main(TajoCli.java:420)\n" +
+        "Caused by: java.sql.SQLException: ERROR: no such a table: table1\n" +
+        "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:791)\n" +
+        "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:778)\n" +
+        "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:97)\n" +
+        "\t... 6 more";
+
+    assertEquals("ERROR: no such a table: table1", DefaultTajoCliOutputFormatter.parseErrorMessage(multiLineMessage));
+  }
+
+  @Test
+  public void testPrintResultInsertStatement() throws Exception {
+    TajoConf tajoConf = new TajoConf();
+    DefaultTajoCliOutputFormatter outputFormatter = new DefaultTajoCliOutputFormatter();
+    outputFormatter.init(tajoConf);
+
+    float responseTime = 10.1f;
+    long numBytes = 102;
+    long numRows = 30;
+
+    TableDesc tableDesc = new TableDesc();
+    TableStats stats = new TableStats();
+    stats.setNumBytes(102);
+    stats.setNumRows(numRows);
+    tableDesc.setStats(stats);
+
+    StringWriter stringWriter = new StringWriter();
+    PrintWriter writer = new PrintWriter(stringWriter);
+    outputFormatter.printResult(writer, null, tableDesc, responseTime, null);
+
+    String expectedOutput = "(" +  numRows + " rows, " + responseTime + " sec, " + numBytes + " B inserted)\n";
+    assertEquals(expectedOutput, stringWriter.toString());
+  }
+
+  @Test
+  public void testPrintResultSelectStatement() throws Exception {
+    TajoConf tajoConf = new TajoConf();
+    DefaultTajoCliOutputFormatter outputFormatter = new DefaultTajoCliOutputFormatter();
+    outputFormatter.init(tajoConf);
+
+    float responseTime = 10.1f;
+    long numBytes = 102;
+    long numRows = 30;
+
+    TableDesc tableDesc = new TableDesc();
+    TableStats stats = new TableStats();
+    stats.setNumBytes(102);
+    stats.setNumRows(numRows);
+    tableDesc.setStats(stats);
+
+    final List<MetaDataTuple> resultTables = new ArrayList<MetaDataTuple>();
+
+    String expectedOutput = "col1,  col2,  col3\n";
+    expectedOutput += "-------------------------------\n";
+
+    String prefix = "";
+    for (int i = 0; i < numRows; i++) {
+      MetaDataTuple tuple = new MetaDataTuple(3);
+
+      int index = 0;
+
+      tuple.put(index++, new TextDatum("row_" + i));
+      tuple.put(index++, new Int4Datum(i));
+      tuple.put(index++, new Float8Datum(i));
+
+      expectedOutput += prefix + "row_" + i + ",  " + (new Int4Datum(i)) + ",  " + (new Float8Datum(i));
+      prefix = "\n";
+      resultTables.add(tuple);
+    }
+    expectedOutput += "\n(" +  numRows + " rows, " + responseTime + " sec, " + numBytes + " B selected)\n";
+
+    ResultSet resultSet = new TajoMetaDataResultSet(
+        Arrays.asList("col1", "col2", "col3"),
+        Arrays.asList(TajoDataTypes.Type.TEXT, TajoDataTypes.Type.INT4, TajoDataTypes.Type.FLOAT8),
+        resultTables);
+
+    StringWriter stringWriter = new StringWriter();
+    PrintWriter writer = new PrintWriter(stringWriter);
+    outputFormatter.printResult(writer, null, tableDesc, responseTime, resultSet);
+
+    assertEquals(expectedOutput, stringWriter.toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 604ed52..3d6989a 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -117,11 +117,12 @@ public class BlockingRpcClient extends NettyClientBase {
       }
     }
 
+    @Override
     public Message callBlockingMethod(final MethodDescriptor method,
                                       final RpcController controller,
                                       final Message param,
                                       final Message responsePrototype)
-        throws ServiceException {
+        throws TajoServiceException {
 
       int nextSeqId = sequence.getAndIncrement();
 
@@ -135,12 +136,13 @@ public class BlockingRpcClient extends NettyClientBase {
       try {
         return callFuture.get();
       } catch (Throwable t) {
-        if(t instanceof ExecutionException) {
-          ExecutionException ee = (ExecutionException)t;
-          throw new ServiceException(ee.getCause());
-        } else {
-          throw new RemoteException(t);
+        if (t instanceof ExecutionException) {
+          Throwable cause = t.getCause();
+          if (cause != null && cause instanceof TajoServiceException) {
+            throw (TajoServiceException)cause;
+          }
         }
+        throw new TajoServiceException(t.getMessage());
       }
     }
 
@@ -161,14 +163,23 @@ public class BlockingRpcClient extends NettyClientBase {
 
   private String getErrorMessage(String message) {
     if(protocol != null && getChannel() != null) {
-      return "Exception [" + protocol.getCanonicalName() +
+      return protocol.getName() +
           "(" + NetUtils.normalizeInetSocketAddress((InetSocketAddress)
-          getChannel().getRemoteAddress()) + ")]: " + message;
+          getChannel().getRemoteAddress()) + "): " + message;
     } else {
       return "Exception " + message;
     }
   }
 
+  private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
+    if(protocol != null && getChannel() != null) {
+      return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
+          NetUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress()));
+    } else {
+      return new TajoServiceException(response.getErrorMessage());
+    }
+  }
+
   private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
 
     @Override
@@ -183,9 +194,8 @@ public class BlockingRpcClient extends NettyClientBase {
       } else {
         if (rpcResponse.hasErrorMessage()) {
           callback.setFailed(rpcResponse.getErrorMessage(),
-              new ServiceException(getErrorMessage(rpcResponse.getErrorMessage())));
-          throw new RemoteException(
-              getErrorMessage(rpcResponse.getErrorMessage()));
+              makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
+          throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
         } else {
           Message responseMessage;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 067d824..9e0d57c 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -125,8 +125,6 @@ public class BlockingRpcServer extends NettyServerBase {
         RemoteCallException callException = (RemoteCallException) e.getCause();
         e.getChannel().write(callException.getResponse());
       }
-
-      throw new RemoteException(e.getCause());
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 8f49e17..fa4b941 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -57,20 +57,21 @@ public abstract class NettyClientBase implements Closeable {
       this.bootstrap.setOption("keepAlive", true);
 
       connect(addr);
-    } catch (Throwable t) {
+    } catch (IOException e) {
       close();
+      throw e;
+    } catch (Throwable t) {
       throw new IOException("Connect error to " + addr + " cause " + t.getMessage(), t.getCause());
     }
   }
 
-  public void connect(InetSocketAddress addr) {
+  public void connect(InetSocketAddress addr) throws Exception {
     if(addr.isUnresolved()){
        addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort());
     }
     this.channelFuture = bootstrap.connect(addr);
     this.channelFuture.awaitUninterruptibly();
     if (!channelFuture.isSuccess()) {
-      channelFuture.getCause().printStackTrace();
       throw new RuntimeException(channelFuture.getCause());
     }
   }
@@ -80,6 +81,9 @@ public abstract class NettyClientBase implements Closeable {
   }
 
   public InetSocketAddress getRemoteAddress() {
+    if (channelFuture == null || channelFuture.getChannel() == null) {
+      return null;
+    }
     return (InetSocketAddress) channelFuture.getChannel().getRemoteAddress();
   }
 
@@ -100,9 +104,9 @@ public abstract class NettyClientBase implements Closeable {
     if(this.bootstrap != null) {
       // This line will shutdown the factory
       // this.bootstrap.releaseExternalResources();
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Proxy is disconnected from " +
-            getRemoteAddress().getHostName() + ":" + getRemoteAddress().getPort());
+      InetSocketAddress address = getRemoteAddress();
+      if (address != null) {
+        LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
index 6504e69..949aa58 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
@@ -27,11 +27,15 @@ import java.io.Writer;
 
 public class RemoteCallException extends RemoteException {
   private int seqId;
+  private String originExceptionClass;
 
   public RemoteCallException(int seqId, MethodDescriptor methodDesc,
                              Throwable t) {
     super("Remote call error occurs when " + methodDesc.getFullName() + "is called:", t);
     this.seqId = seqId;
+    if (t != null) {
+      originExceptionClass = t.getClass().getCanonicalName();
+    }
   }
 
   public RemoteCallException(int seqId, Throwable t) {
@@ -42,7 +46,10 @@ public class RemoteCallException extends RemoteException {
   public RpcResponse getResponse() {
     RpcResponse.Builder builder = RpcResponse.newBuilder();
     builder.setId(seqId);
-    builder.setErrorMessage(getStackTraceString(getCause()));
+    builder.setErrorMessage(getCause().getMessage());
+    builder.setErrorTrace(getStackTraceString(getCause()));
+    builder.setErrorClass(originExceptionClass);
+
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
index adafd5c..6274eff 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -96,7 +96,9 @@ public final class RpcChannelFactory {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Shutdown Shared RPC Pool");
     }
-    factory.releaseExternalResources();
+    if (factory != null) {
+      factory.releaseExternalResources();
+    }
     factory = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
new file mode 100644
index 0000000..113d181
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.exception.ExceptionUtils;
+
+public class TajoServiceException extends ServiceException {
+  private String traceMessage;
+  private String protocol;
+  private String remoteAddress;
+
+  public TajoServiceException(String message) {
+    super(message);
+  }
+  public TajoServiceException(String message, String traceMessage) {
+    super(message);
+    this.traceMessage = traceMessage;
+  }
+
+  public TajoServiceException(String message, Throwable cause, String protocol, String remoteAddress) {
+    super(message, cause);
+
+    this.protocol = protocol;
+    this.remoteAddress = remoteAddress;
+  }
+
+  public String getTraceMessage() {
+    if(traceMessage == null && getCause() != null){
+      this.traceMessage = ExceptionUtils.getStackTrace(getCause());
+    }
+    return traceMessage;
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public String getRemoteAddress() {
+    return remoteAddress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/main/proto/RpcProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/proto/RpcProtos.proto b/tajo-rpc/src/main/proto/RpcProtos.proto
index 2dac58d..69f43ed 100644
--- a/tajo-rpc/src/main/proto/RpcProtos.proto
+++ b/tajo-rpc/src/main/proto/RpcProtos.proto
@@ -26,5 +26,7 @@ message RpcRequest {
 message RpcResponse {
   required int32 id = 1;
   optional bytes response_message = 2;
-  optional string error_message = 3;
+  optional string error_class = 3;
+  optional string error_message = 4;
+  optional string error_trace = 5;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/bc1a3235/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index fe8685a..7acede6 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -30,6 +30,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +39,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.*;
 
 public class TestBlockingRpc {
-  public static String MESSAGE = "TestBlockingRpc";
+  public static final String MESSAGE = "TestBlockingRpc";
 
   private BlockingRpcServer server;
   private BlockingRpcClient client;
@@ -73,7 +75,7 @@ public class TestBlockingRpc {
         .setX3(3.15d)
         .setX4(2.0f).build();
     SumResponse response1 = stub.sum(null, request);
-    assertTrue(8.15d == response1.getResult());
+    assertEquals(8.15d, response1.getResult(), 1e-15);
 
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
@@ -100,7 +102,7 @@ public class TestBlockingRpc {
       }
     }.withRetries();
 
-    assertTrue(8.15d == response.getResult());
+    assertEquals(8.15d, response.getResult(), 1e-15);
 
     response =
         new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2),
@@ -117,6 +119,39 @@ public class TestBlockingRpc {
   }
 
   @Test
+  public void testThrowException() throws Exception {
+    EchoMessage message = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+
+    try {
+      stub.throwException(null, message);
+      fail("RpcCall should throw exception");
+    } catch (Throwable t) {
+      assertTrue(t instanceof TajoServiceException);
+      assertEquals("Exception Test", t.getMessage());
+      TajoServiceException te = (TajoServiceException)t;
+      assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol());
+      assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(),
+          te.getRemoteAddress());
+    }
+  }
+
+  @Test
+  public void testConnectionFailed() throws Exception {
+    try {
+      int port = server.getListenAddress().getPort() + 1;
+      new BlockingRpcClient(DummyProtocol.class,
+          NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)));
+      fail("Connection should be failed.");
+    } catch (Throwable t) {
+      assertTrue(t instanceof IOException);
+      assertNotNull(t.getCause());
+      assertTrue(t.getCause() instanceof ConnectException);
+      assertTrue(t.getCause().getMessage().indexOf("Connection refused") >= 0);
+    }
+  }
+
+  @Test
   public void testGetNull() throws Exception {
     assertNull(stub.getNull(null, null));
     assertTrue(service.getNullCalled);


Mime
View raw message