hive-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1514554 [17/18] - in /hive/branches/vectorization: ./ beeline/src/java/org/apache/hive/beeline/ cli/src/java/org/apache/hadoop/hive/cli/ cli/src/test/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/sr...
Date Fri, 16 Aug 2013 01:22:02 GMT
Modified: hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Fri Aug 16 01:21:54 2013
@@ -34,6 +34,8 @@ import org.apache.hadoop.mapred.TaskLogS
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+
 
 /**
  * Implemention of shims against Hadoop 0.20 with Security.
@@ -122,6 +124,11 @@ public class Hadoop20SShims extends Hado
     return fs.getDefaultReplication();
   }
 
+  @Override
+  public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){
+    TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
+  }
+
   /**
    * Returns a shim to wrap MiniMrCluster
    */

Modified: hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Fri Aug 16 01:21:54 2013
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.util.HostUtil;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+
 
 /**
  * Implemention of shims against Hadoop 0.23.0.
@@ -137,6 +139,11 @@ public class Hadoop23Shims extends Hadoo
     return Trash.moveToAppropriateTrash(fs, path, conf);
   }
 
+  @Override
+  public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){
+    TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
+  }
+
   /**
    * Returns a shim to wrap MiniMrCluster
    */

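For illustration only (not part of this commit): the two shim implementations above hide the fact that the call used to set the partition file has a different signature between hadoop versions, so version-independent Hive code would go through ShimLoader rather than TotalOrderPartitioner directly. A minimal sketch; the partition-file path below is hypothetical.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;

public class TotalOrderPartitionFileSketch {
  public static void configure() {
    JobConf jobConf = new JobConf();
    // Hypothetical location of the sampled partition keys.
    Path partitionFile = new Path("/tmp/hive-staging/partition-keys");
    // Delegates to TotalOrderPartitioner.setPartitionFile of whichever
    // Hadoop version the loaded shim (0.20S or 0.23) was built against.
    ShimLoader.getHadoopShims().setTotalOrderPartitionFile(jobConf, partitionFile);
  }
}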
Modified: hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original)
+++ hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Fri Aug 16 01:21:54 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.shims;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.URI;
@@ -59,6 +60,7 @@ import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -525,6 +527,26 @@ public abstract class HadoopShimsSecure 
   }
 
   @Override
+  public Path createDelegationTokenFile(Configuration conf) throws IOException {
+
+    //get delegation token for user
+    String uname = UserGroupInformation.getLoginUser().getShortUserName();
+    FileSystem fs = FileSystem.get(conf);
+    Token<?> fsToken = fs.getDelegationToken(uname);
+
+    File t = File.createTempFile("hive_hadoop_delegation_token", null);
+    Path tokenPath = new Path(t.toURI());
+
+    //write credential with token to file
+    Credentials cred = new Credentials();
+    cred.addToken(fsToken.getService(), fsToken);
+    cred.writeTokenStorageFile(tokenPath, conf);
+
+    return tokenPath;
+  }
+
+
+  @Override
   public UserGroupInformation createProxyUser(String userName) throws IOException {
     return UserGroupInformation.createProxyUser(
         userName, UserGroupInformation.getLoginUser());
@@ -556,6 +578,21 @@ public abstract class HadoopShimsSecure 
   }
 
   @Override
+  public String getTokenFileLocEnvName() {
+    return UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+  }
+
+  @Override
+  public void reLoginUserFromKeytab() throws IOException{
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    //checkTGT calls ugi.relogin only after checking if it is close to tgt expiry
+    //hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x)
+    if(ugi.isFromKeytab()){
+      ugi.checkTGTAndReloginFromKeytab();
+    }
+  }
+
+  @Override
   abstract public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception;
 
   @Override

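A hedged sketch (not part of this commit) of how createDelegationTokenFile and getTokenFileLocEnvName could be combined by a parent process launching a child that runs hadoop operations: write the login user's filesystem delegation token to a temp file, then point the child at it through the environment variable whose name the shim exposes. The child command below is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

public class DelegationTokenHandoffSketch {
  public static void launchChild() throws Exception {
    HadoopShims shims = ShimLoader.getHadoopShims();
    Configuration conf = new Configuration();
    // Writes the login user's filesystem delegation token into a temp file
    // and returns its path (see HadoopShimsSecure.createDelegationTokenFile above).
    Path tokenFile = shims.createDelegationTokenFile(conf);
    // Hypothetical child command; any process that runs hadoop operations works.
    ProcessBuilder pb = new ProcessBuilder("hadoop", "fs", "-ls", "/");
    // On the secure shims the variable name is HADOOP_TOKEN_FILE_LOCATION.
    pb.environment().put(shims.getTokenFileLocEnvName(), tokenFile.toUri().getPath());
    pb.inheritIO().start().waitFor();
  }
}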
Modified: hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DBTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DBTokenStore.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DBTokenStore.java (original)
+++ hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DBTokenStore.java Fri Aug 16 01:21:54 2013
@@ -32,7 +32,7 @@ public class DBTokenStore implements Del
 
   @Override
   public String[] getMasterKeys() throws TokenStoreException {
-    return (String[])invokeOnRawStore("getMasterKeys", null, null);
+    return (String[])invokeOnRawStore("getMasterKeys", new Object[0]);
   }
 
   @Override
@@ -75,7 +75,7 @@ public class DBTokenStore implements Del
   @Override
   public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException{
 
-    List<String> tokenIdents = (List<String>)invokeOnRawStore("getAllTokenIdentifiers", null, null);
+    List<String> tokenIdents = (List<String>)invokeOnRawStore("getAllTokenIdentifiers", new Object[0]);
     List<DelegationTokenIdentifier> delTokenIdents = new ArrayList<DelegationTokenIdentifier>(tokenIdents.size());
 
     for (String tokenIdent : tokenIdents) {

Modified: hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Fri Aug 16 01:21:54 2013
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hive.thrift;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -40,8 +43,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Client;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -64,8 +65,6 @@ import org.apache.thrift.transport.TTran
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
-
  /**
   * Functions that bridge Thrift's SASL transports to Hadoop's
   * SASL callback handlers and authentication classes.
@@ -91,6 +90,19 @@ import static org.apache.hadoop.fs.Commo
      return new Server(keytabFile, principalConf);
    }
 
+   /**
+    * Read and return the Hadoop SASL properties, which can be configured using
+    * "hadoop.rpc.protection"
+    * @param conf
+    * @return Hadoop SASL configuration
+    */
+   @Override
+   public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+     // Initialize the SaslRpcServer to ensure QOP parameters are read from conf
+     SaslRpcServer.init(conf);
+     return SaslRpcServer.SASL_PROPS;
+   }
+
    public static class Client extends HadoopThriftAuthBridge.Client {
      /**
       * Create a client-side SASL transport that wraps an underlying transport.
@@ -99,13 +111,14 @@ import static org.apache.hadoop.fs.Commo
       *               supported.
       * @param serverPrincipal The Kerberos principal of the target server.
       * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+      * @param saslProps the sasl properties to create the client with
       */
 
      @Override
      public TTransport createClientTransport(
        String principalConfig, String host,
-        String methodStr, String tokenStrForm, TTransport underlyingTransport)
-       throws IOException {
+       String methodStr, String tokenStrForm, TTransport underlyingTransport,
+       Map<String, String> saslProps) throws IOException {
        AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
 
        TTransport saslTransport = null;
@@ -117,7 +130,7 @@ import static org.apache.hadoop.fs.Commo
             method.getMechanismName(),
             null,
             null, SaslRpcServer.SASL_DEFAULT_REALM,
-            SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(t),
+            saslProps, new SaslClientCallbackHandler(t),
             underlyingTransport);
            return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
 
@@ -134,7 +147,7 @@ import static org.apache.hadoop.fs.Commo
                method.getMechanismName(),
                null,
                names[0], names[1],
-               SaslRpcServer.SASL_PROPS, null,
+               saslProps, null,
                underlyingTransport);
              return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
            } catch (SaslException se) {
@@ -142,7 +155,7 @@ import static org.apache.hadoop.fs.Commo
            }
 
          default:
-        throw new IOException("Unsupported authentication method: " + method);
+           throw new IOException("Unsupported authentication method: " + method);
        }
      }
     private static class SaslClientCallbackHandler implements CallbackHandler {
@@ -273,10 +286,11 @@ import static org.apache.hadoop.fs.Commo
       * can be passed as both the input and output transport factory when
       * instantiating a TThreadPoolServer, for example.
       *
+      * @param saslProps Map of SASL properties
       */
      @Override
-     public TTransportFactory createTransportFactory() throws TTransportException
-     {
+     public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+             throws TTransportException {
        // Parse out the kerberos principal, host, realm.
        String kerberosName = realUgi.getUserName();
        final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
@@ -288,11 +302,11 @@ import static org.apache.hadoop.fs.Commo
        transFactory.addServerDefinition(
          AuthMethod.KERBEROS.getMechanismName(),
          names[0], names[1],  // two parts of kerberos principal
-         SaslRpcServer.SASL_PROPS,
+         saslProps,
          new SaslRpcServer.SaslGssCallbackHandler());
        transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
           null, SaslRpcServer.SASL_DEFAULT_REALM,
-          SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(secretManager));
+          saslProps, new SaslDigestCallbackHandler(secretManager));
 
        return new TUGIAssumingTransportFactory(transFactory, realUgi);
      }
@@ -359,7 +373,9 @@ import static org.apache.hadoop.fs.Commo
      throws IOException, InterruptedException {
        if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
          throw new AuthorizationException(
-         "Delegation Token can be issued only with kerberos authentication");
+         "Delegation Token can be issued only with kerberos authentication. " +
+         "Current AuthenticationMethod: " + authenticationMethod.get()
+             );
        }
        //if the user asking the token is same as the 'owner' then don't do
        //any proxy authorization checks. For cases like oozie, where it gets
@@ -388,7 +404,9 @@ import static org.apache.hadoop.fs.Commo
      public long renewDelegationToken(String tokenStrForm) throws IOException {
        if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
          throw new AuthorizationException(
-         "Delegation Token can be issued only with kerberos authentication");
+         "Delegation Token can be issued only with kerberos authentication. " +
+         "Current AuthenticationMethod: " + authenticationMethod.get()
+             );
        }
        return secretManager.renewDelegationToken(tokenStrForm);
      }
@@ -430,7 +448,7 @@ import static org.apache.hadoop.fs.Commo
      public String getRemoteUser() {
        return remoteUser.get();
      }
-     
+
     /** CallbackHandler for SASL DIGEST-MD5 mechanism */
     // This code is pretty much completely based on Hadoop's
     // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not

Modified: hive/branches/vectorization/shims/src/common-secure/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common-secure/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/common-secure/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/branches/vectorization/shims/src/common-secure/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Fri Aug 16 01:21:54 2013
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.TestCase;
 
@@ -72,20 +73,18 @@ public class TestHadoop20SAuthBridge ext
       return new Server();
     }
 
-
-
     static class Server extends HadoopThriftAuthBridge20S.Server {
       public Server() throws TTransportException {
         super();
       }
       @Override
-      public TTransportFactory createTransportFactory()
+      public TTransportFactory createTransportFactory(Map<String, String> saslProps)
       throws TTransportException {
         TSaslServerTransport.Factory transFactory =
           new TSaslServerTransport.Factory();
         transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
             null, SaslRpcServer.SASL_DEFAULT_REALM,
-            SaslRpcServer.SASL_PROPS,
+            saslProps,
             new SaslDigestCallbackHandler(secretManager));
 
         return new TUGIAssumingTransportFactory(transFactory, realUgi);
@@ -98,9 +97,9 @@ public class TestHadoop20SAuthBridge ext
       }
 
       @Override
-      public void startDelegationTokenSecretManager(Configuration conf)
+      public void startDelegationTokenSecretManager(Configuration conf, Object hms)
       throws IOException{
-        super.startDelegationTokenSecretManager(conf);
+        super.startDelegationTokenSecretManager(conf, hms);
         isMetastoreTokenManagerInited = true;
       }
 

Modified: hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Fri Aug 16 01:21:54 2013
@@ -197,16 +197,18 @@ public interface HadoopShims {
    */
   public String unquoteHtmlChars(String item);
 
+
+
+  public void closeAllForUGI(UserGroupInformation ugi);
+
   /**
    * Get the UGI that the given job configuration will run as.
    *
    * In secure versions of Hadoop, this simply returns the current
    * access control context's user, ignoring the configuration.
    */
-
-  public void closeAllForUGI(UserGroupInformation ugi);
-
   public UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException;
+
   /**
    * Used by metastore server to perform requested rpc in client context.
    * @param <T>
@@ -219,6 +221,26 @@ public interface HadoopShims {
     IOException, InterruptedException;
 
   /**
+   * Once a delegation token is stored in a file, its location is passed to a
+   * child process that runs hadoop operations through an
+   * environment variable.
+   * @return the name of the environment variable used by hadoop to find the
+   *  location of the token file
+   */
+  public String getTokenFileLocEnvName();
+
+
+  /**
+   * Get a delegation token from the filesystem and write the token along with
+   * metastore tokens into a file
+   * @param conf
+   * @return Path of the file with token credential
+   * @throws IOException
+   */
+  public Path createDelegationTokenFile(final Configuration conf) throws IOException;
+
+
+  /**
    * Used by metastore server to creates UGI object for a remote user.
    * @param userName remote User Name
    * @param groupNames group names associated with remote user name
@@ -321,13 +343,20 @@ public interface HadoopShims {
   public String getJobLauncherHttpAddress(Configuration conf);
 
 
- /**
-  *  Perform kerberos login using the given principal and keytab
- * @throws IOException
-  */
+  /**
+   *  Perform kerberos login using the given principal and keytab
+   * @throws IOException
+   */
   public void loginUserFromKeytab(String principal, String keytabFile) throws IOException;
 
   /**
+   * Perform a kerberos re-login using the login user's keytab, to renew
+   * the credentials
+   * @throws IOException
+   */
+  public void reLoginUserFromKeytab() throws IOException;
+
+  /**
    * Move the directory/file to trash. In case of the symlinks or mount points, the file is
    * moved to the trashbin in the actual volume of the path p being deleted
    * @param fs
@@ -365,6 +394,13 @@ public interface HadoopShims {
   UserGroupInformation createProxyUser(String userName) throws IOException;
 
   /**
+   * The method used to set the total order partition file has a different
+   * signature between hadoop versions.
+   * @param jobConf
+   * @param partition
+   */
+  void setTotalOrderPartitionFile(JobConf jobConf, Path partition);
+  /**
    * InputSplitShim.
    *
    */

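The re-login hook added above takes no arguments; a long-running service that logged in from a keytab can simply call it periodically before issuing RPCs. A minimal sketch, not from this commit; the scheduling interval is hypothetical, and the shim itself only re-logins when UserGroupInformation decides the TGT is close to expiry.

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.shims.ShimLoader;

public class KeytabReloginSketch {
  public static void schedule() {
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          // No-op unless the login user came from a keytab and the TGT is near expiry.
          ShimLoader.getHadoopShims().reLoginUserFromKeytab();
        } catch (IOException e) {
          System.err.println("keytab re-login failed: " + e.getMessage());
        }
      }
    }, 1, 1, TimeUnit.HOURS);
  }
}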
Modified: hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Fri Aug 16 01:21:54 2013
@@ -20,6 +20,7 @@
 
  import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TProcessor;
@@ -50,6 +51,18 @@ import org.apache.thrift.transport.TTran
    }
 
 
+  /**
+   * Read and return the Hadoop SASL properties, which can be configured using
+   * "hadoop.rpc.protection"
+   *
+   * @param conf
+   * @return Hadoop SASL configuration
+   */
+   public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+     throw new UnsupportedOperationException(
+       "The current version of Hadoop does not support Authentication");
+   }
+
    public static abstract class Client {
    /**
     *
@@ -65,13 +78,14 @@ import org.apache.thrift.transport.TTran
     * @throws IOException
     */
      public abstract TTransport createClientTransport(
-       String principalConfig, String host,
-       String methodStr,String tokenStrForm, TTransport underlyingTransport)
-       throws IOException;
+             String principalConfig, String host,
+             String methodStr, String tokenStrForm, TTransport underlyingTransport,
+             Map<String, String> saslProps)
+             throws IOException;
    }
 
    public static abstract class Server {
-     public abstract TTransportFactory createTransportFactory() throws TTransportException;
+     public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
      public abstract TProcessor wrapProcessor(TProcessor processor);
      public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
      public abstract InetAddress getRemoteAddress();

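Taken together, the bridge changes thread a Map of SASL properties (derived from "hadoop.rpc.protection") into both the client and server transport factories in place of the static SaslRpcServer.SASL_PROPS. A rough sketch of the call pattern, not taken from this commit; the bridge, client, and server instances are assumed to be obtained elsewhere.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;

public class SaslQopSketch {
  // Server side: build the transport factory with the QOP-aware properties.
  static TTransportFactory serverFactory(HadoopThriftAuthBridge bridge,
      HadoopThriftAuthBridge.Server server, Configuration conf) throws Exception {
    Map<String, String> saslProps = bridge.getHadoopSaslProperties(conf);
    return server.createTransportFactory(saslProps);
  }

  // Client side: the same properties flow into the new createClientTransport overload.
  static TTransport clientTransport(HadoopThriftAuthBridge bridge,
      HadoopThriftAuthBridge.Client client, Configuration conf,
      String principal, String host, TTransport underlying) throws Exception {
    Map<String, String> saslProps = bridge.getHadoopSaslProperties(conf);
    return client.createClientTransport(principal, host, "KERBEROS",
        null /* no delegation token */, underlying, saslProps);
  }
}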
Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/client/PTestClient.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/client/PTestClient.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/client/PTestClient.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/client/PTestClient.java Fri Aug 16 01:21:54 2013
@@ -104,7 +104,7 @@ public class PTestClient {
   }
   public boolean testStart(String profile, String testHandle,
       String jira, String patch, String testOutputDir, boolean clearLibraryCache)
-  throws Exception {
+          throws Exception {
     patch = Strings.nullToEmpty(patch).trim();
     if(!patch.isEmpty()) {
       byte[] bytes = Resources.toByteArray(new URL(patch));
@@ -126,7 +126,7 @@ public class PTestClient {
     return result;
   }
   public boolean testList()
-  throws Exception {
+      throws Exception {
     TestListRequest testListRequest = new TestListRequest();
     TestListResponse testListResponse = post(testListRequest);
     for(TestStatus testStatus : testListResponse.getEntries()) {
@@ -135,7 +135,7 @@ public class PTestClient {
     return true;
   }
   public boolean testTailLog(String testHandle)
-  throws Exception {
+      throws Exception {
     testHandle = Strings.nullToEmpty(testHandle).trim();
     if(testHandle.isEmpty()) {
       throw new IllegalArgumentException("TestHandle is required");
@@ -163,7 +163,7 @@ public class PTestClient {
     return Status.isOK(statusResponse.getTestStatus().getStatus());
   }
   private void downloadTestResults(String testHandle, String testOutputDir)
-  throws Exception {
+      throws Exception {
     HttpGet request = new HttpGet(mLogsEndpoint + testHandle + "/test-results.tar.gz");
     FileOutputStream output = null;
     try {
@@ -183,14 +183,14 @@ public class PTestClient {
     }
   }
   private long printLogs(String testHandle, long offset)
-  throws Exception {
+      throws Exception {
     TestLogRequest logsRequest = new TestLogRequest(testHandle, offset, 64 * 1024);
     TestLogResponse logsResponse = post(logsRequest);
     System.out.print(logsResponse.getBody());
     return logsResponse.getOffset();
   }
   private <S extends GenericResponse> S post(Object payload)
-  throws Exception {
+      throws Exception {
     EndPointResponsePair endPointResponse = Preconditions.
         checkNotNull(REQUEST_TO_ENDPOINT.get(payload.getClass()), payload.getClass().getName());
     HttpPost request = new HttpPost(mApiEndPoint + endPointResponse.getEndpoint());
@@ -207,7 +207,7 @@ public class PTestClient {
       String response = EntityUtils.toString(httpResponse.getEntity(), "UTF-8");
       @SuppressWarnings("unchecked")
       S result =  (S)endPointResponse.
-          getResponseClass().cast(mMapper.readValue(response, endPointResponse.getResponseClass()));
+      getResponseClass().cast(mMapper.readValue(response, endPointResponse.getResponseClass()));
       Status.assertOK(result.getStatus());
       if(System.getProperty("DEBUG_PTEST_CLIENT") != null) {
         System.err.println("payload " + payloadString);
@@ -242,7 +242,7 @@ public class PTestClient {
     for(String requiredOption : requiredOptions) {
       if(!commandLine.hasOption(requiredOption)) {
         throw new IllegalArgumentException(requiredOption + " is required");
-      } 
+      }
     }
   }
   public static void main(String[] args) throws Exception {
@@ -258,7 +258,7 @@ public class PTestClient {
     options.addOption(null, TEST_HANDLE, true, "Server supplied test handle. (Required for testStop and testTailLog)");
     options.addOption(null, OUTPUT_DIR, true, "Directory to download and save test-results.tar.gz to. (Optional for testStart)");
     options.addOption(null, CLEAR_LIBRARY_CACHE, false, "Before starting the test, delete the ivy and maven directories (Optional for testStart)");
-    
+
     CommandLine commandLine = parser.parse(options, args);
 
     if(commandLine.hasOption(HELP_SHORT)) {
@@ -266,9 +266,9 @@ public class PTestClient {
       System.exit(0);
     }
     assertRequired(commandLine, new String[] {
-      COMMAND,
-      PASSWORD,
-      ENDPOINT
+        COMMAND,
+        PASSWORD,
+        ENDPOINT
     });
     PTestClient client = new PTestClient(commandLine.getOptionValue(ENDPOINT),
         commandLine.getOptionValue(PASSWORD));
@@ -278,7 +278,7 @@ public class PTestClient {
       assertRequired(commandLine, new String[] {
           PROFILE,
           TEST_HANDLE
-        });
+      });
       result = client.testStart(commandLine.getOptionValue(PROFILE), commandLine.getOptionValue(TEST_HANDLE),
           commandLine.getOptionValue(JIRA), commandLine.getOptionValue(PATCH), commandLine.getOptionValue(OUTPUT_DIR),
           commandLine.hasOption(CLEAR_LIBRARY_CACHE));

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/request/TestStartRequest.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/request/TestStartRequest.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/request/TestStartRequest.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/request/TestStartRequest.java Fri Aug 16 01:21:54 2013
@@ -28,7 +28,7 @@ public class TestStartRequest {
   public TestStartRequest() {
 
   }
-  public TestStartRequest(String profile, String testHandle, 
+  public TestStartRequest(String profile, String testHandle,
       String jiraName, String patchURL, boolean clearLibraryCache) {
     this.profile = profile;
     this.testHandle = testHandle;
@@ -47,7 +47,7 @@ public class TestStartRequest {
   }
   public void setPatchURL(String patchURL) {
     this.patchURL = patchURL;
-  }  
+  }
   public boolean isClearLibraryCache() {
     return clearLibraryCache;
   }
@@ -60,7 +60,7 @@ public class TestStartRequest {
   public void setJiraName(String jiraName) {
     this.jiraName = jiraName;
   }
-  
+
   public String getTestHandle() {
     return testHandle;
   }

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/ExecutionController.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/ExecutionController.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/ExecutionController.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/ExecutionController.java Fri Aug 16 01:21:54 2013
@@ -99,7 +99,8 @@ public class ExecutionController {
     mTestExecutor.start();
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
-    public void run() {
+      public void run() {
+        LOG.info("Shutdown hook called");
         try {
           mTestExecutor.shutdown();
         } catch (Exception e) {
@@ -131,7 +132,7 @@ public class ExecutionController {
       return new TestStartResponse(Status.illegalArgument());
     }
     if(!assertTestHandleIsAvailable(startRequest.getTestHandle())) {
-      return new TestStartResponse(Status.illegalArgument("Test handle " + startRequest.getTestHandle() + " already used")); 
+      return new TestStartResponse(Status.illegalArgument("Test handle " + startRequest.getTestHandle() + " already used"));
     }
     Test test = new Test(startRequest,
         Status.pending(), System.currentTimeMillis());
@@ -230,4 +231,4 @@ public class ExecutionController {
     Preconditions.checkState(!testOutputDir.isFile(), "Output directory " + testOutputDir + " is file");
     return testOutputDir.mkdir();
   }
-}
\ No newline at end of file
+}

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java Fri Aug 16 01:21:54 2013
@@ -66,7 +66,7 @@ public class TestExecutor extends Thread
   }
 
   @Override
-public void run() {
+  public void run() {
     while(execute) {
       Test test = null;
       PrintStream logStream = null;

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/CleanupPhase.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/CleanupPhase.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/CleanupPhase.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/CleanupPhase.java Fri Aug 16 01:21:54 2013
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.ptest.execution;
-
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class CleanupPhase extends Phase {
-
-  public CleanupPhase(ImmutableList<HostExecutor> hostExecutors,
-      LocalCommandFactory localCommandFactory,
-      ImmutableMap<String, String> templateDefaults, Logger logger) {
-    super(hostExecutors, localCommandFactory, templateDefaults, logger);
-  }
-  @Override
-public void execute() throws Exception {
-    execHosts("killall -q -9 -f java || true");
-    TimeUnit.SECONDS.sleep(1);
-    execLocally("rm -rf $workingDir/scratch");
-    execInstances("rm -rf $localDir/$instanceName/scratch $localDir/$instanceName/logs");
-  }
-}
\ No newline at end of file

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Constants.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Constants.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Constants.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Constants.java Fri Aug 16 01:21:54 2013
@@ -21,6 +21,7 @@ package org.apache.hive.ptest.execution;
 
 public class Constants {
 
+  public static final int EXIT_CODE_EXCEPTION = -1;
   public static final int EXIT_CODE_SUCCESS = 0;
   public static final int EXIT_CODE_UNKNOWN = 255;
 }

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Drone.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Drone.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Drone.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Drone.java Fri Aug 16 01:21:54 2013
@@ -65,5 +65,4 @@ public class Drone {
   public int getInstance() {
     return instance;
   }
-
 }

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java Fri Aug 16 01:21:54 2013
@@ -28,19 +28,23 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hive.ptest.execution.conf.Host;
 import org.apache.hive.ptest.execution.conf.TestBatch;
+import org.apache.hive.ptest.execution.context.ExecutionContext;
 import org.slf4j.Logger;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
 public class ExecutionPhase extends Phase {
-
+  private static final long FOUR_HOURS = 4L* 60L * 60L * 1000L;
+  private final ExecutionContext executionContext;
+  private final HostExecutorBuilder hostExecutorBuilder;
   private final File succeededLogDir;
   private final File failedLogDir;
   private final BlockingQueue<TestBatch> parallelWorkQueue;
@@ -50,13 +54,16 @@ public class ExecutionPhase extends Phas
   private final Supplier<List<TestBatch>> testBatchSupplier;
   private final Set<TestBatch> failedTestResults;
 
-  public ExecutionPhase(ImmutableList<HostExecutor> hostExecutors,
+  public ExecutionPhase(List<HostExecutor> hostExecutors, ExecutionContext executionContext,
+      HostExecutorBuilder hostExecutorBuilder,
       LocalCommandFactory localCommandFactory,
       ImmutableMap<String, String> templateDefaults,
       File succeededLogDir, File failedLogDir, Supplier<List<TestBatch>> testBatchSupplier,
       Set<String> executedTests, Set<String> failedTests, Logger logger)
           throws IOException {
     super(hostExecutors, localCommandFactory, templateDefaults, logger);
+    this.executionContext = executionContext;
+    this.hostExecutorBuilder = hostExecutorBuilder;
     this.succeededLogDir = succeededLogDir;
     this.failedLogDir = failedLogDir;
     this.testBatchSupplier = testBatchSupplier;
@@ -68,7 +75,7 @@ public class ExecutionPhase extends Phas
         synchronizedSet(new HashSet<TestBatch>());
   }
   @Override
-public void execute() throws Throwable {
+  public void execute() throws Throwable {
     long start = System.currentTimeMillis();
     List<TestBatch> testBatches = Lists.newArrayList();
     for(TestBatch batch : testBatchSupplier.get()) {
@@ -80,37 +87,26 @@ public void execute() throws Throwable {
       }
     }
     try {
+      int expectedNumHosts = hostExecutors.size();
+      initalizeHosts();
       do {
-        float numberBadHosts = 0f;
-        for(HostExecutor hostExecutor : hostExecutors) {
-          if(hostExecutor.remainingDrones() == 0) {
-            numberBadHosts++;
-          }
-        }
-        Preconditions.checkState(hostExecutors.size() > 0, "Host executors cannot be empty");
-        float percentBadHosts = numberBadHosts / (float)hostExecutors.size();
-        if(percentBadHosts > 0.50f) {
-          throw new IllegalStateException("Too many bad hosts: " + percentBadHosts + "% (" + (int)numberBadHosts + 
-              " / " + hostExecutors.size() + ") is greater than threshold of 50%");
-        }
+        replaceBadHosts(expectedNumHosts);
         List<ListenableFuture<Void>> results = Lists.newArrayList();
-        for(HostExecutor hostExecutor : getHostExecutors()) {
+        for(HostExecutor hostExecutor : ImmutableList.copyOf(hostExecutors)) {
           results.add(hostExecutor.submitTests(parallelWorkQueue, isolatedWorkQueue, failedTestResults));
         }
         Futures.allAsList(results).get();
       } while(!(parallelWorkQueue.isEmpty() && isolatedWorkQueue.isEmpty()));
-      Preconditions.checkState(parallelWorkQueue.isEmpty(), "Parallel work queue is not empty. All drones must have aborted.");
-      Preconditions.checkState(isolatedWorkQueue.isEmpty(), "Isolated work queue is not empty. All drones must have aborted.");
       for(TestBatch batch : testBatches) {
-       File batchLogDir;
-       if(failedTestResults.contains(batch)) {
-         batchLogDir = new File(failedLogDir, batch.getName());
-       } else {
-         batchLogDir = new File(succeededLogDir, batch.getName());
-       }
-       JUnitReportParser parser = new JUnitReportParser(logger, batchLogDir);
-       executedTests.addAll(parser.getExecutedTests());
-       failedTests.addAll(parser.getFailedTests());
+        File batchLogDir;
+        if(failedTestResults.contains(batch)) {
+          batchLogDir = new File(failedLogDir, batch.getName());
+        } else {
+          batchLogDir = new File(succeededLogDir, batch.getName());
+        }
+        JUnitReportParser parser = new JUnitReportParser(logger, batchLogDir);
+        executedTests.addAll(parser.getExecutedTests());
+        failedTests.addAll(parser.getFailedTests());
       }
     } finally {
       long elapsed = System.currentTimeMillis() - start;
@@ -118,5 +114,40 @@ public void execute() throws Throwable {
           TimeUnit.MINUTES.convert(elapsed, TimeUnit.MILLISECONDS) + " minutes");
     }
   }
-
+  private void replaceBadHosts(int expectedNumHosts)
+      throws Exception {
+    Set<Host> goodHosts = Sets.newHashSet();
+    for(HostExecutor hostExecutor : ImmutableList.copyOf(hostExecutors)) {
+      if(hostExecutor.isBad()) {
+        logger.info("Removing host during execution phase: " + hostExecutor.getHost());
+        executionContext.addBadHost(hostExecutor.getHost());
+        hostExecutors.remove(hostExecutor);
+      } else {
+        goodHosts.add(hostExecutor.getHost());
+      }
+    }
+    long start = System.currentTimeMillis();
+    while(hostExecutors.size() < expectedNumHosts) {
+      if(System.currentTimeMillis() - start > FOUR_HOURS) {
+        throw new RuntimeException("Waited over four hours for hosts, still have only " +
+            hostExecutors.size() + " hosts out of an expected " + expectedNumHosts);
+      }
+      logger.warn("Only " + hostExecutors.size() + " hosts out of an expected " + expectedNumHosts 
+          + ", attempting to replace bad hosts");
+      TimeUnit.MINUTES.sleep(1);
+      executionContext.replaceBadHosts();
+      for(Host host : executionContext.getHosts()) {
+        if(!goodHosts.contains(host)) {
+          HostExecutor hostExecutor = hostExecutorBuilder.build(host);
+          initalizeHost(hostExecutor);
+          if(hostExecutor.isBad()) {
+            executionContext.addBadHost(hostExecutor.getHost());
+          } else {
+            logger.info("Adding new host during execution phase: " + host);
+            hostExecutors.add(hostExecutor);
+          }
+        }
+      }
+    }
+  }
 }

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java Fri Aug 16 01:21:54 2013
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hive.ptest.execution.conf.Host;
 import org.apache.hive.ptest.execution.conf.TestBatch;
 import org.apache.hive.ptest.execution.ssh.RSyncCommand;
@@ -41,16 +42,19 @@ import org.apache.hive.ptest.execution.s
 import org.apache.hive.ptest.execution.ssh.SSHResult;
 import org.slf4j.Logger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
 class HostExecutor {
+  private static final int MAX_SOURCE_DIRS = 5;
   private final Host mHost;
   private final List<Drone> mDrones;
   private final ListeningExecutorService mExecutor;
@@ -102,10 +106,13 @@ class HostExecutor {
 
     });
   }
-
+  @VisibleForTesting
   int remainingDrones() {
     return mDrones.size();
   }
+  boolean isBad() {
+    return mDrones.isEmpty();
+  }
   Host getHost() {
     return mHost;
   }
@@ -130,7 +137,7 @@ class HostExecutor {
             do {
               batch = parallelWorkQueue.poll(mNumPollSeconds, TimeUnit.SECONDS);
               if(batch != null) {
-                if(!executeTestBatch(drone, batch, failedTestResults.size())) {
+                if(!executeTestBatch(drone, batch, failedTestResults)) {
                   failedTestResults.add(batch);
                 }
               }
@@ -155,7 +162,7 @@ class HostExecutor {
         do {
           batch = isolatedWorkQueue.poll(mNumPollSeconds, TimeUnit.SECONDS);
           if(batch != null) {
-            if(!executeTestBatch(drone, batch, failedTestResults.size())) {
+            if(!executeTestBatch(drone, batch, failedTestResults)) {
               failedTestResults.add(batch);
             }
           }
@@ -174,7 +181,7 @@ class HostExecutor {
    * Executes the test batch on the drone in question. If the command
    * exits with a status code of 255 throw an AbortDroneException.
    */
-  private boolean executeTestBatch(Drone drone, TestBatch batch, int numOfFailedTests)
+  private boolean executeTestBatch(Drone drone, TestBatch batch, Set<TestBatch> failedTestResults)
       throws IOException, SSHExecutionException, AbortDroneException {
     String scriptName = "hiveptest-" + batch.getName() + ".sh";
     File script = new File(mLocalScratchDirectory, scriptName);
@@ -184,7 +191,8 @@ class HostExecutor {
     templateVariables.put("testArguments", batch.getTestArguments());
     templateVariables.put("localDir", drone.getLocalDirectory());
     templateVariables.put("logDir", drone.getLocalLogDirectory());
-    templateVariables.put("numOfFailedTests", String.valueOf(numOfFailedTests));
+    templateVariables.put("maxSourceDirs", String.valueOf(MAX_SOURCE_DIRS));
+    templateVariables.put("numOfFailedTests", String.valueOf(failedTestResults.size()));
     String command = Templates.getTemplateResult("bash $localDir/$instanceName/scratch/" + script.getName(),
         templateVariables);
     Templates.writeTemplateResult("batch-exec.vm", script, templateVariables);
@@ -193,7 +201,7 @@ class HostExecutor {
     mLogger.info(drone + " executing " + batch + " with " + command);
     RemoteCommandResult sshResult = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(),
         drone.getHost(), drone.getInstance(), command).
-    call();
+        call();
     File batchLogDir = null;
     if(sshResult.getExitCode() == Constants.EXIT_CODE_UNKNOWN) {
       throw new AbortDroneException("Drone " + drone.toString() + " exited with " +
@@ -209,6 +217,13 @@ class HostExecutor {
     }
     copyFromDroneToLocal(drone, batchLogDir.getAbsolutePath(),
         drone.getLocalLogDirectory() + "/");
+    if(failedTestResults.size() > MAX_SOURCE_DIRS) {
+      File sourceDir = new File(batchLogDir, "source");
+      if(sourceDir.isDirectory()) {
+        mLogger.info("Max source directories exceeded, deleting " + sourceDir.getAbsolutePath()
+            + ":" + FileUtils.deleteQuietly(sourceDir));
+      }
+    }
     File logFile = new File(batchLogDir, String.format("%s.txt", batch.getName()));
     PrintWriter writer = new PrintWriter(logFile);
     writer.write(String.format("result = '%s'\n", sshResult.toString()));
@@ -247,34 +262,54 @@ class HostExecutor {
    * they will be removed from use possibly leaving this host with zero
    * functioning drones.
    */
-  List<ListenableFuture<RSyncResult>> rsyncFromLocalToRemoteInstances(final String localFile, final String remoteFile)
+  ListenableFuture<List<ListenableFuture<RemoteCommandResult>>> rsyncFromLocalToRemoteInstances(final String localFile, final String remoteFile)
       throws InterruptedException, IOException {
-    List<ListenableFuture<RSyncResult>> result = Lists.newArrayList();
-    for(final Drone drone : ImmutableList.copyOf(mDrones)) {
-      final Map<String, String> templateVariables = Maps.newHashMap(mTemplateDefaults);
-      templateVariables.put("instanceName", drone.getInstanceName());
-      templateVariables.put("localDir", drone.getLocalDirectory());
-      result.add(mExecutor.submit(new Callable<RSyncResult>() {
-        @Override
-        public RSyncResult call() throws Exception {
+    // the basic premise here is that we will rsync the directory to the first working drone
+    // and then execute a local rsync on that node to the other drones. This keeps
+    // us from executing tons of rsyncs on the master node, conserving CPU
+    return mExecutor.submit(new Callable<List<ListenableFuture<RemoteCommandResult>>>() {
+      @Override
+      public List<ListenableFuture<RemoteCommandResult>> call()
+          throws Exception {
+        List<Drone> drones = Lists.newArrayList(mDrones);
+        List<ListenableFuture<RemoteCommandResult>> results = Lists.newArrayList();
+        // local path doesn't depend on drone variables
+        String resolvedLocalLocation = Files.simplifyPath(Templates.getTemplateResult(localFile, mTemplateDefaults));
+        String remoteStagingLocation = null;
+        for(final Drone drone : ImmutableList.copyOf(mDrones)) {
+          Preconditions.checkState(remoteStagingLocation == null, "Remote staging location must be null at the start of the loop");
+          final Map<String, String> templateVariables = Maps.newHashMap(mTemplateDefaults);
+          templateVariables.put("instanceName", drone.getInstanceName());
+          templateVariables.put("localDir", drone.getLocalDirectory());
+          String resolvedRemoteLocation = Files.simplifyPath(Templates.getTemplateResult(remoteFile, templateVariables));
           RSyncResult result = new RSyncCommand(mRSyncCommandExecutor, drone.getPrivateKey(), drone.getUser(),
               drone.getHost(), drone.getInstance(),
-              Templates.getTemplateResult(localFile, templateVariables),
-              Templates.getTemplateResult(remoteFile, templateVariables),
+              resolvedLocalLocation,
+              resolvedRemoteLocation,
               RSyncCommand.Type.FROM_LOCAL).call();
-          if(result.getExitCode() != Constants.EXIT_CODE_SUCCESS) {
+          if(result.getExitCode() == Constants.EXIT_CODE_SUCCESS) {
+            remoteStagingLocation = resolvedRemoteLocation;
+            drones.remove(drone);
+            mLogger.info("Successfully staged " + resolvedLocalLocation + " on " + remoteStagingLocation);
+            break;
+          } else {
             mDrones.remove(drone);
             mLogger.error("Aborting drone during rsync",
                 new AbortDroneException("Drone " + drone + " exited with "
                     + result.getExitCode() + ": " + result));
-            return null;
-          } else {
-            return result;
           }
         }
-      }));
-    }
-    return result;
+        if(remoteStagingLocation == null) {
+          Preconditions.checkState(mDrones.isEmpty(), "If remote staging location is not set all drones should be bad");
+          mLogger.warn("Unable to stage directory on remote host, all drones must be bad");
+        } else {
+          String name = (new File(resolvedLocalLocation)).getName();
+          remoteStagingLocation = Files.simplifyPath(remoteStagingLocation + "/" + name);
+          results.addAll(execInstances(drones, "rsync -qaPe --delete --delete-during --force " + remoteStagingLocation + " " + remoteFile));
+        }
+        return results;
+      }
+    });
   }
   RSyncResult copyFromDroneToLocal(Drone drone, String localFile, String remoteFile)
       throws SSHExecutionException, IOException {
@@ -299,47 +334,51 @@ class HostExecutor {
   ListenableFuture<SSHResult> exec(final String cmd)
       throws Exception {
     return mExecutor.submit(new Callable<SSHResult>() {
-        @Override
-        public SSHResult call() throws Exception {
-          for(final Drone drone : ImmutableList.copyOf(mDrones)) {
-            Map<String, String> templateVariables = Maps.newHashMap(mTemplateDefaults);
-            templateVariables.put("instanceName", drone.getInstanceName());
-            templateVariables.put("localDir", drone.getLocalDirectory());
-            String command = Templates.getTemplateResult(cmd, templateVariables);
-            SSHResult result = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(),
-                drone.getHost(), drone.getInstance(), command).call();
-            if(result.getExitCode() == Constants.EXIT_CODE_UNKNOWN) {
-              mDrones.remove(drone); // return value not checked due to concurrent access
-              mLogger.error("Aborting drone during exec " + command,
-                  new AbortDroneException("Drone " + drone + " exited with "
-                      + Constants.EXIT_CODE_UNKNOWN + ": " + result));
-            } else {
-              return result;
-            }
+      @Override
+      public SSHResult call() throws Exception {
+        for(final Drone drone : ImmutableList.copyOf(mDrones)) {
+          Map<String, String> templateVariables = Maps.newHashMap(mTemplateDefaults);
+          templateVariables.put("instanceName", drone.getInstanceName());
+          templateVariables.put("localDir", drone.getLocalDirectory());
+          String command = Templates.getTemplateResult(cmd, templateVariables);
+          SSHResult result = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(),
+              drone.getHost(), drone.getInstance(), command).call();
+          if(result.getExitCode() == Constants.EXIT_CODE_UNKNOWN) {
+            mDrones.remove(drone); // return value not checked due to concurrent access
+            mLogger.error("Aborting drone during exec " + command,
+                new AbortDroneException("Drone " + drone + " exited with "
+                    + Constants.EXIT_CODE_UNKNOWN + ": " + result));
+          } else {
+            return result;
           }
-          return null;
         }
+        return null;
+      }
     });
 
   }
-  List<ListenableFuture<SSHResult>> execInstances(final String cmd)
-      throws SSHExecutionException, InterruptedException, IOException {
-    List<ListenableFuture<SSHResult>> result = Lists.newArrayList();
-    for(final Drone drone : ImmutableList.copyOf(mDrones)) {
-      result.add(mExecutor.submit(new Callable<SSHResult>() {
+  List<ListenableFuture<RemoteCommandResult>> execInstances(final String cmd)
+      throws InterruptedException, IOException {
+    return execInstances(mDrones, cmd);
+  }
+  private List<ListenableFuture<RemoteCommandResult>> execInstances(List<Drone> drones, final String cmd)
+      throws InterruptedException, IOException {
+    List<ListenableFuture<RemoteCommandResult>> result = Lists.newArrayList();
+    for(final Drone drone : ImmutableList.copyOf(drones)) {
+      result.add(mExecutor.submit(new Callable<RemoteCommandResult>() {
         @Override
-        public SSHResult call() throws Exception {
+        public RemoteCommandResult call() throws Exception {
           Map<String, String> templateVariables = Maps.newHashMap(mTemplateDefaults);
           templateVariables.put("instanceName", drone.getInstanceName());
           templateVariables.put("localDir", drone.getLocalDirectory());
           String command = Templates.getTemplateResult(cmd, templateVariables);
           SSHResult result = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(),
               drone.getHost(), drone.getInstance(), command).call();
-          if(result.getExitCode() == Constants.EXIT_CODE_UNKNOWN) {
+          if(result.getExitCode() != Constants.EXIT_CODE_SUCCESS) {
             mDrones.remove(drone); // return value not checked due to concurrent access
             mLogger.error("Aborting drone during exec " + command,
                 new AbortDroneException("Drone " + drone + " exited with "
-                    + Constants.EXIT_CODE_UNKNOWN + ": " + result));
+                    + result.getExitCode() + ": " + result));
             return null;
           } else {
             return result;
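
The hunks above widen exec/execInstances to return RemoteCommandResult futures and to evict a drone whenever its command exits with anything other than EXIT_CODE_SUCCESS. As a rough, self-contained sketch of that fan-out-and-evict pattern (Drone and runCommand below are illustrative stand-ins, not the ptest2 classes):

  import java.util.List;
  import java.util.concurrent.Callable;
  import java.util.concurrent.CopyOnWriteArrayList;
  import java.util.concurrent.Executors;

  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.Lists;
  import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.ListeningExecutorService;
  import com.google.common.util.concurrent.MoreExecutors;

  class DroneFanOutSketch {
    static class Drone {
      final String name;
      Drone(String name) { this.name = name; }
    }

    // Concurrent list so a worker thread can evict a drone while others iterate a copy.
    private final List<Drone> drones = new CopyOnWriteArrayList<Drone>();
    private final ListeningExecutorService executor =
        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    // Illustrative stand-in for SSHCommand.call(); returns the remote exit code.
    int runCommand(Drone drone, String command) {
      return 0;
    }

    List<ListenableFuture<Integer>> execOnAllDrones(final String command) {
      List<ListenableFuture<Integer>> futures = Lists.newArrayList();
      // Iterate a snapshot so removing a bad drone does not disturb the loop.
      for (final Drone drone : ImmutableList.copyOf(drones)) {
        futures.add(executor.submit(new Callable<Integer>() {
          @Override
          public Integer call() {
            int exitCode = runCommand(drone, command);
            if (exitCode != 0) {
              drones.remove(drone); // evict so later work skips this drone
            }
            return exitCode;
          }
        }));
      }
      return futures;
    }
  }

Submitting one Callable per drone keeps callers non-blocking; they decide later whether and how to wait on the returned futures.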

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java Fri Aug 16 01:21:54 2013
@@ -20,9 +20,10 @@ package org.apache.hive.ptest.execution;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.SortedSet;
 
 import org.apache.hive.ptest.api.server.TestLogger;
 import org.apache.hive.ptest.execution.conf.Context;
@@ -77,9 +78,9 @@ class JIRAService {
     mJenkinsURL = configuration.getJenkinsURL();
   }
 
-  void postComment(boolean error, int numExecutesTests, Set<String> failedTests,
-      List<String> messages) { 
-    DefaultHttpClient httpClient = new DefaultHttpClient();    
+  void postComment(boolean error, int numExecutesTests, SortedSet<String> failedTests,
+      List<String> messages) {
+    DefaultHttpClient httpClient = new DefaultHttpClient();
     try {
       String buildTag = formatBuildTag(mBuildTag);
       List<String> comments = Lists.newArrayList();
@@ -111,7 +112,7 @@ class JIRAService {
           comments.addAll(failedTests);
           comments.add("{noformat}");
         }
-        comments.add("");        
+        comments.add("");
       }
       comments.add("Test results: " + mJenkinsURL + "/" + buildTag + "/testReport");
       comments.add("Console output: " + mJenkinsURL + "/" + buildTag + "/console");
@@ -121,21 +122,21 @@ class JIRAService {
         comments.add("{noformat}");
         comments.addAll(messages);
         comments.add("{noformat}");
-        comments.add("");        
+        comments.add("");
       }
       comments.add("This message is automatically generated.");
-      mLogger.info("Comment: " + Joiner.on("\n").join(comments));      
+      mLogger.info("Comment: " + Joiner.on("\n").join(comments));
       String body = Joiner.on("\n").join(comments);
       String url = String.format("%s/rest/api/2/issue/%s/comment", mUrl, mName);
       URL apiURL = new URL(mUrl);
       httpClient.getCredentialsProvider()
-          .setCredentials(
-              new AuthScope(apiURL.getHost(), apiURL.getPort(),
-                  AuthScope.ANY_REALM),
+      .setCredentials(
+          new AuthScope(apiURL.getHost(), apiURL.getPort(),
+              AuthScope.ANY_REALM),
               new UsernamePasswordCredentials(mUser, mPassword));
       BasicHttpContext localcontext = new BasicHttpContext();
       localcontext.setAttribute("preemptive-auth", new BasicScheme());
-      httpClient.addRequestInterceptor(new PreemptiveAuth(), 0);      
+      httpClient.addRequestInterceptor(new PreemptiveAuth(), 0);
       HttpPost request = new HttpPost(url);
       ObjectMapper mapper = new ObjectMapper();
       StringEntity params = new StringEntity(mapper.writeValueAsString(new Body(body)));
@@ -155,12 +156,12 @@ class JIRAService {
       httpClient.getConnectionManager().shutdown();
     }
   }
-  
-  @SuppressWarnings("unused")  
+
+  @SuppressWarnings("unused")
   private static class Body {
     private String body;
     public Body() {
-      
+
     }
     public Body(String body) {
       this.body = body;
@@ -170,9 +171,9 @@ class JIRAService {
     }
     public void setBody(String body) {
       this.body = body;
-    }    
+    }
   }
-  
+
   /**
    * Hive-Build-123 to Hive-Build/123
    */
@@ -198,7 +199,7 @@ class JIRAService {
 
     public void process(final HttpRequest request, final HttpContext context)
         throws HttpException, IOException {
-      AuthState authState = (AuthState) context.getAttribute(ClientContext.TARGET_AUTH_STATE);      
+      AuthState authState = (AuthState) context.getAttribute(ClientContext.TARGET_AUTH_STATE);
       if (authState.getAuthScheme() == null) {
         AuthScheme authScheme = (AuthScheme) context.getAttribute("preemptive-auth");
         CredentialsProvider credsProvider = (CredentialsProvider) context.getAttribute(ClientContext.CREDS_PROVIDER);
@@ -215,7 +216,7 @@ class JIRAService {
       }
     }
   }
-  
+
   public static void main(String[] args) throws Exception {
     TestLogger logger = new TestLogger(System.err, TestLogger.LEVEL.TRACE);
     Map<String, String> context = Maps.newHashMap();
@@ -230,7 +231,7 @@ class JIRAService {
     configuration.setJiraName("HIVE-4892");
     JIRAService service = new JIRAService(logger, configuration, "test-123");
     List<String> messages = Lists.newArrayList("msg1", "msg2");
-    Set<String> failedTests = Sets.newHashSet("failed");
+    SortedSet<String> failedTests = Sets.newTreeSet(Collections.singleton("failed"));
     service.postComment(false, 5, failedTests, messages);
   }
-}
+}
\ No newline at end of file
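
For reference, the comment assembled above is posted as JSON to JIRA's REST API with preemptive basic auth. A minimal sketch of the same request using plain HttpURLConnection instead of the DefaultHttpClient/PreemptiveAuth stack shown in the diff (the base URL, issue key, and credentials are placeholders):

  import java.io.OutputStream;
  import java.net.HttpURLConnection;
  import java.net.URL;
  import java.nio.charset.StandardCharsets;
  import java.util.Base64;

  public class JiraCommentSketch {
    public static void main(String[] args) throws Exception {
      String jiraUrl = "https://issues.apache.org/jira"; // placeholder server URL
      String issue = "HIVE-4892";                        // placeholder issue key
      String user = "user";
      String password = "password";

      // Same endpoint the service above targets: /rest/api/2/issue/{key}/comment
      URL url = new URL(jiraUrl + "/rest/api/2/issue/" + issue + "/comment");
      // JIRA expects {"body": "<comment text>"}; JSON escaping is omitted in this sketch.
      String json = "{\"body\": \"This message is automatically generated.\"}";

      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setRequestMethod("POST");
      conn.setDoOutput(true);
      conn.setRequestProperty("Content-Type", "application/json");
      // Sending credentials up front mirrors the preemptive-auth interceptor above.
      String auth = Base64.getEncoder()
          .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
      conn.setRequestProperty("Authorization", "Basic " + auth);

      OutputStream out = conn.getOutputStream();
      out.write(json.getBytes(StandardCharsets.UTF_8));
      out.close();
      System.out.println("JIRA responded with HTTP " + conn.getResponseCode());
    }
  }

In the committed code the body is built with Joiner and serialized via Jackson's ObjectMapper; the endpoint and payload shape are the same.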

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JUnitReportParser.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JUnitReportParser.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JUnitReportParser.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JUnitReportParser.java Fri Aug 16 01:21:54 2013
@@ -87,21 +87,25 @@ public class JUnitReportParser {
           private String name;
           private boolean failedOrErrored;
           @Override
-        public void startElement(String uri, String localName, String qName, Attributes attributes) {
+          public void startElement(String uri, String localName, String qName, Attributes attributes) {
             if ("testcase".equals(qName)) {
               name = attributes.getValue("classname");
               failedOrErrored = false;
-              if(name == null) {
+              if(name == null || "junit.framework.TestSuite".equals(name)) {
                 name = attributes.getValue("name");
               } else {
                 name = name + "." + attributes.getValue("name");
               }
-            } else if (name != null && ("failure".equals(qName) || "error".equals(qName))) {
-              failedOrErrored = true;
+            } else if (name != null) {
+              if ("failure".equals(qName) || "error".equals(qName)) {
+                failedOrErrored = true;
+              } else if("skipped".equals(qName)) {
+                name = null;
+              }
             }
           }
           @Override
-        public void endElement(String uri, String localName, String qName)  {
+          public void endElement(String uri, String localName, String qName)  {
             if ("testcase".equals(qName)) {
               if(name != null) {
                 executedTests.add(name);
@@ -125,4 +129,4 @@ public class JUnitReportParser {
       }
     }
   }
-}
+}
\ No newline at end of file
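
The parser change above does two things: testcase names reported under junit.framework.TestSuite fall back to the name attribute, and a nested <skipped/> element now clears the pending name so skipped tests are not counted as executed. A compact sketch of the skip handling with a SAX DefaultHandler (the XML snippet is made up for illustration):

  import java.io.ByteArrayInputStream;
  import java.util.HashSet;
  import java.util.Set;

  import javax.xml.parsers.SAXParserFactory;

  import org.xml.sax.Attributes;
  import org.xml.sax.helpers.DefaultHandler;

  public class JUnitXmlSketch {
    public static void main(String[] args) throws Exception {
      // Tiny report: one executed test, one skipped test.
      String xml = "<testsuite>"
          + "<testcase classname='org.example.FooTest' name='testOk'/>"
          + "<testcase classname='org.example.FooTest' name='testSkipped'><skipped/></testcase>"
          + "</testsuite>";
      final Set<String> executed = new HashSet<String>();
      SAXParserFactory.newInstance().newSAXParser().parse(
          new ByteArrayInputStream(xml.getBytes("UTF-8")), new DefaultHandler() {
            private String name;
            @Override
            public void startElement(String uri, String localName, String qName, Attributes attributes) {
              if ("testcase".equals(qName)) {
                name = attributes.getValue("classname") + "." + attributes.getValue("name");
              } else if (name != null && "skipped".equals(qName)) {
                name = null; // drop skipped tests, as the hunk above now does
              }
            }
            @Override
            public void endElement(String uri, String localName, String qName) {
              if ("testcase".equals(qName) && name != null) {
                executed.add(name);
                name = null;
              }
            }
          });
      System.out.println(executed); // prints [org.example.FooTest.testOk]
    }
  }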

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LogDirectoryCleaner.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LogDirectoryCleaner.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LogDirectoryCleaner.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LogDirectoryCleaner.java Fri Aug 16 01:21:54 2013
@@ -41,7 +41,7 @@ public class LogDirectoryCleaner extends
   }
 
   @Override
-public void run() {
+  public void run() {
     try {
       File[] logDirs = mLogDir.listFiles();
       if(logDirs != null &&  logDirs.length > 0) {
@@ -81,10 +81,10 @@ public void run() {
     File getOldest() {
       Preconditions.checkState(!dirs.isEmpty(), "Cannot be called unless dirs.size() >= 1");
       File eldestDir = null;
-      int eldestId = Integer.MAX_VALUE;
+      long eldestId = Long.MAX_VALUE;
       for(File dir : dirs) {
         try {
-          int id = Integer.parseInt(dir.getName().substring(name.length() + 1));
+          long id = Long.parseLong(dir.getName().substring(name.length() + 1));
           if(id < eldestId) {
             eldestId = id;
             eldestDir = dir;

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java Fri Aug 16 01:21:54 2013
@@ -20,12 +20,16 @@ package org.apache.hive.ptest.execution;
 
 import java.io.File;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
@@ -47,7 +51,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -72,13 +75,13 @@ public class PTest {
   private final List<Phase> mPhases;
   private final ExecutionContext mExecutionContext;
   private final Logger mLogger;
-  private final ImmutableList<HostExecutor> mHostExecutors;
+  private final List<HostExecutor> mHostExecutors;
   private final String mBuildTag;
 
-  public PTest(final TestConfiguration configuration, ExecutionContext executionContext,
-      String buildTag, File logDir, LocalCommandFactory localCommandFactory, SSHCommandExecutor sshCommandExecutor,
-      RSyncCommandExecutor rsyncCommandExecutor, Logger logger)
-    throws Exception {
+  public PTest(final TestConfiguration configuration, final ExecutionContext executionContext,
+      final String buildTag, final File logDir, final LocalCommandFactory localCommandFactory,
+      final SSHCommandExecutor sshCommandExecutor, final  RSyncCommandExecutor rsyncCommandExecutor,
+      final Logger logger) throws Exception {
     mConfiguration = configuration;
     mLogger = logger;
     mBuildTag = buildTag;
@@ -86,9 +89,9 @@ public class PTest {
     mFailedTests = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
     mExecutionContext = executionContext;
     mExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
-    File failedLogDir = Dirs.create(new File(logDir, "failed"));
-    File succeededLogDir = Dirs.create(new File(logDir, "succeeded"));
-    File scratchDir = Dirs.createEmpty(new File(mExecutionContext.getLocalWorkingDirectory(), "scratch"));
+    final File failedLogDir = Dirs.create(new File(logDir, "failed"));
+    final File succeededLogDir = Dirs.create(new File(logDir, "succeeded"));
+    final File scratchDir = Dirs.createEmpty(new File(mExecutionContext.getLocalWorkingDirectory(), "scratch"));
     File patchDir = Dirs.createEmpty(new File(logDir, "patches"));
     File patchFile = null;
     if(!configuration.getPatch().isEmpty()) {
@@ -97,32 +100,38 @@ public class PTest {
     }
     ImmutableMap.Builder<String, String> templateDefaultsBuilder = ImmutableMap.builder();
     templateDefaultsBuilder.
-        put("repository", configuration.getRepository()).
-        put("repositoryName", configuration.getRepositoryName()).
-        put("repositoryType", configuration.getRepositoryType()).
-        put("branch", configuration.getBranch()).
-        put("clearLibraryCache", String.valueOf(configuration.isClearLibraryCache())).
-        put("workingDir", mExecutionContext.getLocalWorkingDirectory()).
-        put("antArgs", configuration.getAntArgs()).
-        put("buildTag", buildTag).
-        put("logDir", logDir.getAbsolutePath()).
-        put("javaHome", configuration.getJavaHome()).
-        put("antEnvOpts", configuration.getAntEnvOpts());
-    ImmutableMap<String, String> templateDefaults = templateDefaultsBuilder.build();
+    put("repository", configuration.getRepository()).
+    put("repositoryName", configuration.getRepositoryName()).
+    put("repositoryType", configuration.getRepositoryType()).
+    put("branch", configuration.getBranch()).
+    put("clearLibraryCache", String.valueOf(configuration.isClearLibraryCache())).
+    put("workingDir", mExecutionContext.getLocalWorkingDirectory()).
+    put("antArgs", configuration.getAntArgs()).
+    put("buildTag", buildTag).
+    put("logDir", logDir.getAbsolutePath()).
+    put("javaHome", configuration.getJavaHome()).
+    put("antEnvOpts", configuration.getAntEnvOpts());
+    final ImmutableMap<String, String> templateDefaults = templateDefaultsBuilder.build();
     TestParser testParser = new TestParser(configuration.getContext(),
         new File(mExecutionContext.getLocalWorkingDirectory(), configuration.getRepositoryName() + "-source"),
         logger);
 
-    ImmutableList.Builder<HostExecutor> hostExecutorsBuilder = ImmutableList.builder();
+    HostExecutorBuilder hostExecutorBuilder = new HostExecutorBuilder() {
+      @Override
+      public HostExecutor build(Host host) {
+        return new HostExecutor(host, executionContext.getPrivateKey(), mExecutor, sshCommandExecutor,
+            rsyncCommandExecutor, templateDefaults, scratchDir, succeededLogDir, failedLogDir, 10, logger);
+      }
+
+    };
+    List<HostExecutor> hostExecutors = new ArrayList<HostExecutor>();
     for(Host host : mExecutionContext.getHosts()) {
-      hostExecutorsBuilder.add(new HostExecutor(host, executionContext.getPrivateKey(), mExecutor, sshCommandExecutor,
-          rsyncCommandExecutor, templateDefaults, scratchDir, succeededLogDir, failedLogDir, 10, logger));
+      hostExecutors.add(hostExecutorBuilder.build(host));
     }
-    mHostExecutors = hostExecutorsBuilder.build();
+    mHostExecutors = new CopyOnWriteArrayList<HostExecutor>(hostExecutors);
     mPhases = Lists.newArrayList();
-    mPhases.add(new CleanupPhase(mHostExecutors, localCommandFactory, templateDefaults, logger));
     mPhases.add(new PrepPhase(mHostExecutors, localCommandFactory, templateDefaults, scratchDir, patchFile, logger));
-    mPhases.add(new ExecutionPhase(mHostExecutors, localCommandFactory, templateDefaults,
+    mPhases.add(new ExecutionPhase(mHostExecutors, mExecutionContext, hostExecutorBuilder, localCommandFactory, templateDefaults,
         succeededLogDir, failedLogDir, testParser.parse(), mExecutedTests, mFailedTests, logger));
     mPhases.add(new ReportingPhase(mHostExecutors, localCommandFactory, templateDefaults, logger));
   }
@@ -155,24 +164,25 @@ public class PTest {
       error = true;
     } finally {
       for(HostExecutor hostExecutor : mHostExecutors) {
-        if(hostExecutor.remainingDrones() == 0) {
+        if(hostExecutor.isBad()) {
           mExecutionContext.addBadHost(hostExecutor.getHost());
         }
       }
       mExecutor.shutdownNow();
-      if(mFailedTests.isEmpty()) {
-        mLogger.info(String.format("%d failed tests", mFailedTests.size()));
+      SortedSet<String> failedTests = new TreeSet<String>(mFailedTests);
+      if(failedTests.isEmpty()) {
+        mLogger.info(String.format("%d failed tests", failedTests.size()));
       } else {
-        mLogger.warn(String.format("%d failed tests", mFailedTests.size()));
+        mLogger.warn(String.format("%d failed tests", failedTests.size()));
       }
-      for(String failingTestName : mFailedTests) {
+      for(String failingTestName : failedTests) {
         mLogger.warn(failingTestName);
       }
       mLogger.info("Executed " + mExecutedTests.size() + " tests");
       for(Map.Entry<String, Long> entry : elapsedTimes.entrySet()) {
         mLogger.info(String.format("PERF: Phase %s took %d minutes", entry.getKey(), entry.getValue()));
       }
-      publishJiraComment(error, messages);
+      publishJiraComment(error, messages, failedTests);
       if(error || !mFailedTests.isEmpty()) {
         result = 1;
       }
@@ -180,7 +190,7 @@ public class PTest {
     return result;
   }
 
-  private void publishJiraComment(boolean error, List<String> messages) {
+  private void publishJiraComment(boolean error, List<String> messages, SortedSet<String> failedTests) {
     if(mConfiguration.getJiraName().isEmpty()) {
       mLogger.info("Skipping JIRA comment as name is empty.");
       return;
@@ -198,7 +208,7 @@ public class PTest {
       return;
     }
     JIRAService jira = new JIRAService(mLogger, mConfiguration, mBuildTag);
-    jira.postComment(error, mExecutedTests.size(), mFailedTests, messages);
+    jira.postComment(error, mExecutedTests.size(), failedTests, messages);
   }
 
   public static class Builder {
@@ -245,65 +255,65 @@ public class PTest {
         fromFile(testConfigurationFile);
     String buildTag = System.getenv("BUILD_TAG") == null ? "undefined-"
         + System.currentTimeMillis() : System.getenv("BUILD_TAG");
-    File logDir = Dirs.create(new File(executionContextConfiguration.getGlobalLogDirectory(), buildTag));
-    LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(executionContextConfiguration.
-        getGlobalLogDirectory()), 5);
-    cleaner.setName("LogCleaner-" + executionContextConfiguration.getGlobalLogDirectory());
-    cleaner.setDaemon(true);
-    cleaner.start();
-    TestConfiguration conf = TestConfiguration.fromFile(testConfigurationFile, LOG);
-    String repository = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY)).trim();
-    if(!repository.isEmpty()) {
-      conf.setRepository(repository);
-    }
-    String repositoryName = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY_NAME)).trim();
-    if(!repositoryName.isEmpty()) {
-      conf.setRepositoryName(repositoryName);
-    }
-    String branch = Strings.nullToEmpty(commandLine.getOptionValue(BRANCH)).trim();
-    if(!branch.isEmpty()) {
-      conf.setBranch(branch);
-    }
-    String patch = Strings.nullToEmpty(commandLine.getOptionValue(PATCH)).trim();
-    if(!patch.isEmpty()) {
-      conf.setPatch(patch);
-    }
-    String javaHome = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME)).trim();
-    if(!javaHome.isEmpty()) {
-      conf.setJavaHome(javaHome);
-    }
-    String antEnvOpts = Strings.nullToEmpty(commandLine.getOptionValue(ANT_ENV_OPTS)).trim();
-    if(!antEnvOpts.isEmpty()) {
-      conf.setAntEnvOpts(antEnvOpts);
-    }
-    String[] supplementalAntArgs = commandLine.getOptionValues(ANT_ARG);
-    if(supplementalAntArgs != null && supplementalAntArgs.length > 0) {
-      String antArgs = Strings.nullToEmpty(conf.getAntArgs());
-      if(!(antArgs.isEmpty() || antArgs.endsWith(" "))) {
-        antArgs += " ";
-      }
-      antArgs += "-" + ANT_ARG + Joiner.on(" -" + ANT_ARG).join(supplementalAntArgs);
-      conf.setAntArgs(antArgs);
-    }
-    ExecutionContextProvider executionContextProvider = null;
-    ExecutionContext executionContext = null;
-    int exitCode = 0;
-    try {
-      executionContextProvider = executionContextConfiguration
-          .getExecutionContextProvider();
-      executionContext = executionContextProvider.createExecutionContext();
-      PTest ptest = new PTest(conf, executionContext, buildTag, logDir,
-          new LocalCommandFactory(LOG), new SSHCommandExecutor(LOG),
-          new RSyncCommandExecutor(LOG), LOG);
-      exitCode = ptest.run();
-    } finally {
-      if(executionContext != null) {
-        executionContext.terminate();
-      }
-      if(executionContextProvider != null) {
-        executionContextProvider.close();
-      }
-    }
-    System.exit(exitCode);
+        File logDir = Dirs.create(new File(executionContextConfiguration.getGlobalLogDirectory(), buildTag));
+        LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(executionContextConfiguration.
+            getGlobalLogDirectory()), 5);
+        cleaner.setName("LogCleaner-" + executionContextConfiguration.getGlobalLogDirectory());
+        cleaner.setDaemon(true);
+        cleaner.start();
+        TestConfiguration conf = TestConfiguration.fromFile(testConfigurationFile, LOG);
+        String repository = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY)).trim();
+        if(!repository.isEmpty()) {
+          conf.setRepository(repository);
+        }
+        String repositoryName = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY_NAME)).trim();
+        if(!repositoryName.isEmpty()) {
+          conf.setRepositoryName(repositoryName);
+        }
+        String branch = Strings.nullToEmpty(commandLine.getOptionValue(BRANCH)).trim();
+        if(!branch.isEmpty()) {
+          conf.setBranch(branch);
+        }
+        String patch = Strings.nullToEmpty(commandLine.getOptionValue(PATCH)).trim();
+        if(!patch.isEmpty()) {
+          conf.setPatch(patch);
+        }
+        String javaHome = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME)).trim();
+        if(!javaHome.isEmpty()) {
+          conf.setJavaHome(javaHome);
+        }
+        String antEnvOpts = Strings.nullToEmpty(commandLine.getOptionValue(ANT_ENV_OPTS)).trim();
+        if(!antEnvOpts.isEmpty()) {
+          conf.setAntEnvOpts(antEnvOpts);
+        }
+        String[] supplementalAntArgs = commandLine.getOptionValues(ANT_ARG);
+        if(supplementalAntArgs != null && supplementalAntArgs.length > 0) {
+          String antArgs = Strings.nullToEmpty(conf.getAntArgs());
+          if(!(antArgs.isEmpty() || antArgs.endsWith(" "))) {
+            antArgs += " ";
+          }
+          antArgs += "-" + ANT_ARG + Joiner.on(" -" + ANT_ARG).join(supplementalAntArgs);
+          conf.setAntArgs(antArgs);
+        }
+        ExecutionContextProvider executionContextProvider = null;
+        ExecutionContext executionContext = null;
+        int exitCode = 0;
+        try {
+          executionContextProvider = executionContextConfiguration
+              .getExecutionContextProvider();
+          executionContext = executionContextProvider.createExecutionContext();
+          PTest ptest = new PTest(conf, executionContext, buildTag, logDir,
+              new LocalCommandFactory(LOG), new SSHCommandExecutor(LOG),
+              new RSyncCommandExecutor(LOG), LOG);
+          exitCode = ptest.run();
+        } finally {
+          if(executionContext != null) {
+            executionContext.terminate();
+          }
+          if(executionContextProvider != null) {
+            executionContextProvider.close();
+          }
+        }
+        System.exit(exitCode);
   }
 }
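
The constructor now creates host executors through a HostExecutorBuilder callback and keeps them in a CopyOnWriteArrayList rather than an ImmutableList, so the ExecutionPhase can mint a replacement executor when a host goes bad mid-run. A small sketch of that factory-plus-concurrent-list shape (Host and HostWorker here are stand-ins for the real types):

  import java.util.List;
  import java.util.concurrent.CopyOnWriteArrayList;

  public class HostExecutorFactorySketch {
    static class Host {
      final String name;
      Host(String name) { this.name = name; }
    }
    static class HostWorker {
      final Host host;
      HostWorker(Host host) { this.host = host; }
    }

    // Factory interface mirroring HostExecutorBuilder: phases can build executors
    // for replacement hosts after startup instead of receiving a fixed list.
    interface WorkerBuilder {
      HostWorker build(Host host);
    }

    public static void main(String[] args) {
      WorkerBuilder builder = new WorkerBuilder() {
        @Override
        public HostWorker build(Host host) { return new HostWorker(host); }
      };
      // CopyOnWriteArrayList tolerates additions while other threads iterate.
      List<HostWorker> workers = new CopyOnWriteArrayList<HostWorker>();
      workers.add(builder.build(new Host("host-1")));
      // Later, e.g. after a host is marked bad, a phase can swap in a fresh one:
      workers.add(builder.build(new Host("replacement-host")));
      System.out.println(workers.size() + " workers"); // 2 workers
    }
  }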

Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Phase.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Phase.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Phase.java (original)
+++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Phase.java Fri Aug 16 01:21:54 2013
@@ -20,29 +20,32 @@ package org.apache.hive.ptest.execution;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hive.ptest.execution.LocalCommand.CollectLogPolicy;
 import org.apache.hive.ptest.execution.ssh.NonZeroExitCodeException;
-import org.apache.hive.ptest.execution.ssh.RSyncResult;
 import org.apache.hive.ptest.execution.ssh.RemoteCommandResult;
 import org.apache.hive.ptest.execution.ssh.SSHExecutionException;
 import org.apache.hive.ptest.execution.ssh.SSHResult;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public abstract class Phase {
 
-  protected final ImmutableList<HostExecutor> hostExecutors;
+  protected final List<HostExecutor> hostExecutors;
   private final LocalCommandFactory localCommandFactory;
   private final ImmutableMap<String, String> templateDefaults;
   protected final Logger logger;
 
-  public Phase(ImmutableList<HostExecutor> hostExecutors,
+  public Phase(List<HostExecutor> hostExecutors,
       LocalCommandFactory localCommandFactory,
       ImmutableMap<String, String> templateDefaults, Logger logger) {
     super();
@@ -67,13 +70,13 @@ public abstract class Phase {
     }
   }
   // prep
-  protected List<RSyncResult> rsyncFromLocalToRemoteInstances(String localFile, String remoteFile)
+  protected List<RemoteCommandResult> rsyncFromLocalToRemoteInstances(String localFile, String remoteFile)
       throws Exception {
-    List<ListenableFuture<RSyncResult>> futures = Lists.newArrayList();
+    List<ListenableFuture<List<ListenableFuture<RemoteCommandResult>>>> futures = Lists.newArrayList();
     for(HostExecutor hostExecutor : hostExecutors) {
-      futures.addAll(hostExecutor.rsyncFromLocalToRemoteInstances(localFile, remoteFile));
+      futures.add(hostExecutor.rsyncFromLocalToRemoteInstances(localFile, remoteFile));
     }
-    return toListOfResults(futures);
+    return flatten(futures);
   }
 
   // clean
@@ -86,16 +89,76 @@ public abstract class Phase {
     return toListOfResults(futures);
   }
   // clean prep
-  protected List<SSHResult> execInstances(String command)
+  protected List<RemoteCommandResult> execInstances(String command)
       throws Exception {
-    List<ListenableFuture<SSHResult>> futures = Lists.newArrayList();
+    List<ListenableFuture<RemoteCommandResult>> futures = Lists.newArrayList();
     for(HostExecutor hostExecutor : hostExecutors) {
       futures.addAll(hostExecutor.execInstances(command));
     }
     return toListOfResults(futures);
   }
+  protected List<RemoteCommandResult> initalizeHosts()
+      throws Exception {
+    List<ListenableFuture<List<RemoteCommandResult>>> futures = Lists.newArrayList();
+    ListeningExecutorService executor = MoreExecutors.
+        listeningDecorator(Executors.newFixedThreadPool(hostExecutors.size()));
+    try {
+      for(final HostExecutor hostExecutor : hostExecutors) {
+        futures.add(executor.submit(new Callable<List<RemoteCommandResult>>() {
+          @Override
+          public List<RemoteCommandResult> call() throws Exception {
+            return initalizeHost(hostExecutor);
+          }
+        }));
+      }
+      List<RemoteCommandResult> results = Lists.newArrayList();
+      for(ListenableFuture<List<RemoteCommandResult>> future : futures) {
+        List<RemoteCommandResult> result = future.get();
+        if(result != null) {
+          results.addAll(result);
+        }
+      }
+      executor.shutdown();
+      return results;
+    } finally {
+      if(executor.isShutdown()) {
+        executor.shutdownNow();
+      }
+    }
+  }
+  protected List<RemoteCommandResult> initalizeHost(HostExecutor hostExecutor)
+      throws Exception {
+    List<RemoteCommandResult> results = Lists.newArrayList();
+    results.add(hostExecutor.exec("killall -q -9 -f java || true").get());
+    TimeUnit.SECONDS.sleep(1);
+    // order matters in all of these so block
+    results.addAll(toListOfResults(hostExecutor.execInstances("rm -rf $localDir/$instanceName/scratch $localDir/$instanceName/logs")));
+    results.addAll(toListOfResults(hostExecutor.execInstances("mkdir -p $localDir/$instanceName/logs " +
+        "$localDir/$instanceName/maven " +
+        "$localDir/$instanceName/scratch " +
+        "$localDir/$instanceName/ivy " +
+        "$localDir/$instanceName/${repositoryName}-source")));
+    // order does not matter below, so go wide
+    List<ListenableFuture<List<ListenableFuture<RemoteCommandResult>>>> futures = Lists.newArrayList();
+    futures.add(hostExecutor.rsyncFromLocalToRemoteInstances("$workingDir/${repositoryName}-source", "$localDir/$instanceName/"));
+    futures.add(hostExecutor.rsyncFromLocalToRemoteInstances("$workingDir/maven", "$localDir/$instanceName/"));
+    futures.add(hostExecutor.rsyncFromLocalToRemoteInstances("$workingDir/ivy", "$localDir/$instanceName/"));
+    results.addAll(flatten(futures));
+    return results;
+  }
+  private <T extends RemoteCommandResult> List<T> flatten(List<ListenableFuture<List<ListenableFuture<T>>>> futures)
+      throws Exception {
+    List<T> results = Lists.newArrayList();
+    for(ListenableFuture<List<ListenableFuture<T>>> future : futures) {
+      List<ListenableFuture<T>> result = future.get();
+      if(result != null) {
+        results.addAll(toListOfResults(result));
+      }
+    }
+    return results;
+  }
   private <T extends RemoteCommandResult> List<T> toListOfResults(List<ListenableFuture<T>> futures)
-  throws Exception {
+      throws Exception {
     List<T> results = Lists.newArrayList();
     for(T result : Futures.allAsList(futures).get()) {
       if(result != null) {
@@ -110,7 +173,4 @@ public abstract class Phase {
   protected ImmutableMap<String, String> getTemplateDefaults() {
     return templateDefaults;
   }
-  protected ImmutableList<HostExecutor> getHostExecutors() {
-    return hostExecutors;
-  }
 }
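
rsyncFromLocalToRemoteInstances now returns one future per host, each of which resolves to a batch of per-instance futures, hence the new flatten helper that unwraps both layers before collecting results. A self-contained sketch of that pattern (the host/drone labels are illustrative):

  import java.util.List;
  import java.util.concurrent.Callable;
  import java.util.concurrent.Executors;

  import com.google.common.collect.Lists;
  import com.google.common.util.concurrent.Futures;
  import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.ListeningExecutorService;
  import com.google.common.util.concurrent.MoreExecutors;

  public class FlattenFuturesSketch {

    // Mirrors toListOfResults: wait on all futures and drop null results.
    static <T> List<T> toListOfResults(List<ListenableFuture<T>> futures) throws Exception {
      List<T> results = Lists.newArrayList();
      for (T result : Futures.allAsList(futures).get()) {
        if (result != null) {
          results.add(result);
        }
      }
      return results;
    }

    // Mirrors flatten: each outer future yields another batch of futures
    // (one task per host fans out to one task per instance), so unwrap both layers.
    static <T> List<T> flatten(List<ListenableFuture<List<ListenableFuture<T>>>> futures)
        throws Exception {
      List<T> results = Lists.newArrayList();
      for (ListenableFuture<List<ListenableFuture<T>>> future : futures) {
        List<ListenableFuture<T>> inner = future.get();
        if (inner != null) {
          results.addAll(toListOfResults(inner));
        }
      }
      return results;
    }

    public static void main(String[] args) throws Exception {
      final ListeningExecutorService executor =
          MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
      List<ListenableFuture<List<ListenableFuture<String>>>> outer = Lists.newArrayList();
      for (int host = 0; host < 2; host++) {
        final int h = host;
        outer.add(executor.submit(new Callable<List<ListenableFuture<String>>>() {
          @Override
          public List<ListenableFuture<String>> call() {
            List<ListenableFuture<String>> inner = Lists.newArrayList();
            for (int drone = 0; drone < 3; drone++) {
              final int d = drone;
              inner.add(executor.submit(new Callable<String>() {
                @Override
                public String call() { return "host-" + h + "/drone-" + d; }
              }));
            }
            return inner;
          }
        }));
      }
      System.out.println(flatten(outer)); // six results, one per host/drone pair
      executor.shutdown();
    }
  }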


