Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 01E4A1074E for ; Fri, 16 Aug 2013 01:33:16 +0000 (UTC) Received: (qmail 83125 invoked by uid 500); 16 Aug 2013 01:28:18 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 81370 invoked by uid 500); 16 Aug 2013 01:27:07 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 80147 invoked by uid 99); 16 Aug 2013 01:23:42 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Aug 2013 01:23:42 +0000 X-ASF-Spam-Status: No, hits=-1999.6 required=5.0 tests=ALL_TRUSTED,FILL_THIS_FORM_FRAUD_PHISH,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Aug 2013 01:23:31 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 68DED2388C36; Fri, 16 Aug 2013 01:22:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1514554 [17/18] - in /hive/branches/vectorization: ./ beeline/src/java/org/apache/hive/beeline/ cli/src/java/org/apache/hadoop/hive/cli/ cli/src/test/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/sr... Date: Fri, 16 Aug 2013 01:22:02 -0000 To: commits@hive.apache.org From: hashutosh@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130816012220.68DED2388C36@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original) +++ hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Fri Aug 16 01:21:54 2013 @@ -34,6 +34,8 @@ import org.apache.hadoop.mapred.TaskLogS import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; + /** * Implemention of shims against Hadoop 0.20 with Security. @@ -122,6 +124,11 @@ public class Hadoop20SShims extends Hado return fs.getDefaultReplication(); } + @Override + public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){ + TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile); + } + /** * Returns a shim to wrap MiniMrCluster */ Modified: hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original) +++ hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Fri Aug 16 01:21:54 2013 @@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.task. import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.util.HostUtil; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; + /** * Implemention of shims against Hadoop 0.23.0. @@ -137,6 +139,11 @@ public class Hadoop23Shims extends Hadoo return Trash.moveToAppropriateTrash(fs, path, conf); } + @Override + public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){ + TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile); + } + /** * Returns a shim to wrap MiniMrCluster */ Modified: hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original) +++ hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Fri Aug 16 01:21:54 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.shims; import java.io.DataInput; import java.io.DataOutput; +import java.io.File; import java.io.IOException; import java.lang.reflect.Constructor; import java.net.URI; @@ -59,6 +60,7 @@ import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapred.lib.CombineFileInputFormat; import org.apache.hadoop.mapred.lib.CombineFileSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -525,6 +527,26 @@ public abstract class HadoopShimsSecure } @Override + public Path createDelegationTokenFile(Configuration conf) throws IOException { + + //get delegation token for user + String uname = UserGroupInformation.getLoginUser().getShortUserName(); + FileSystem fs = FileSystem.get(conf); + Token fsToken = fs.getDelegationToken(uname); + + File t = File.createTempFile("hive_hadoop_delegation_token", null); + Path tokenPath = new Path(t.toURI()); + + //write credential with token to file + Credentials cred = new Credentials(); + cred.addToken(fsToken.getService(), fsToken); + cred.writeTokenStorageFile(tokenPath, conf); + + return tokenPath; + } + + + @Override public UserGroupInformation createProxyUser(String userName) throws IOException { return UserGroupInformation.createProxyUser( userName, UserGroupInformation.getLoginUser()); @@ -556,6 +578,21 @@ public abstract class HadoopShimsSecure } @Override + public String getTokenFileLocEnvName() { + return UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; + } + + @Override + public void reLoginUserFromKeytab() throws IOException{ + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + //checkTGT calls ugi.relogin only after checking if it is close to tgt expiry + //hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x) + if(ugi.isFromKeytab()){ + ugi.checkTGTAndReloginFromKeytab(); + } + } + + @Override abstract public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception; @Override Modified: hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DBTokenStore.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DBTokenStore.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DBTokenStore.java (original) +++ hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DBTokenStore.java Fri Aug 16 01:21:54 2013 @@ -32,7 +32,7 @@ public class DBTokenStore implements Del @Override public String[] getMasterKeys() throws TokenStoreException { - return (String[])invokeOnRawStore("getMasterKeys", null, null); + return (String[])invokeOnRawStore("getMasterKeys", new Object[0]); } @Override @@ -75,7 +75,7 @@ public class DBTokenStore implements Del @Override public List getAllDelegationTokenIdentifiers() throws TokenStoreException{ - List tokenIdents = (List)invokeOnRawStore("getAllTokenIdentifiers", null, null); + List tokenIdents = (List)invokeOnRawStore("getAllTokenIdentifiers", new Object[0]); List delTokenIdents = new ArrayList(tokenIdents.size()); for (String tokenIdent : tokenIdents) { Modified: hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original) +++ hive/branches/vectorization/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Fri Aug 16 01:21:54 2013 @@ -17,11 +17,14 @@ */ package org.apache.hadoop.hive.thrift; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION; + import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.Map; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -40,8 +43,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Client; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; @@ -64,8 +65,6 @@ import org.apache.thrift.transport.TTran import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION; - /** * Functions that bridge Thrift's SASL transports to Hadoop's * SASL callback handlers and authentication classes. @@ -91,6 +90,19 @@ import static org.apache.hadoop.fs.Commo return new Server(keytabFile, principalConf); } + /** + * Read and return Hadoop SASL configuration which can be configured using + * "hadoop.rpc.protection" + * @param conf + * @return Hadoop SASL configuration + */ + @Override + public Map getHadoopSaslProperties(Configuration conf) { + // Initialize the SaslRpcServer to ensure QOP parameters are read from conf + SaslRpcServer.init(conf); + return SaslRpcServer.SASL_PROPS; + } + public static class Client extends HadoopThriftAuthBridge.Client { /** * Create a client-side SASL transport that wraps an underlying transport. @@ -99,13 +111,14 @@ import static org.apache.hadoop.fs.Commo * supported. * @param serverPrincipal The Kerberos principal of the target server. * @param underlyingTransport The underlying transport mechanism, usually a TSocket. + * @param saslProps the sasl properties to create the client with */ @Override public TTransport createClientTransport( String principalConfig, String host, - String methodStr, String tokenStrForm, TTransport underlyingTransport) - throws IOException { + String methodStr, String tokenStrForm, TTransport underlyingTransport, + Map saslProps) throws IOException { AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr); TTransport saslTransport = null; @@ -117,7 +130,7 @@ import static org.apache.hadoop.fs.Commo method.getMechanismName(), null, null, SaslRpcServer.SASL_DEFAULT_REALM, - SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(t), + saslProps, new SaslClientCallbackHandler(t), underlyingTransport); return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); @@ -134,7 +147,7 @@ import static org.apache.hadoop.fs.Commo method.getMechanismName(), null, names[0], names[1], - SaslRpcServer.SASL_PROPS, null, + saslProps, null, underlyingTransport); return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); } catch (SaslException se) { @@ -142,7 +155,7 @@ import static org.apache.hadoop.fs.Commo } default: - throw new IOException("Unsupported authentication method: " + method); + throw new IOException("Unsupported authentication method: " + method); } } private static class SaslClientCallbackHandler implements CallbackHandler { @@ -273,10 +286,11 @@ import static org.apache.hadoop.fs.Commo * can be passed as both the input and output transport factory when * instantiating a TThreadPoolServer, for example. * + * @param saslProps Map of SASL properties */ @Override - public TTransportFactory createTransportFactory() throws TTransportException - { + public TTransportFactory createTransportFactory(Map saslProps) + throws TTransportException { // Parse out the kerberos principal, host, realm. String kerberosName = realUgi.getUserName(); final String names[] = SaslRpcServer.splitKerberosName(kerberosName); @@ -288,11 +302,11 @@ import static org.apache.hadoop.fs.Commo transFactory.addServerDefinition( AuthMethod.KERBEROS.getMechanismName(), names[0], names[1], // two parts of kerberos principal - SaslRpcServer.SASL_PROPS, + saslProps, new SaslRpcServer.SaslGssCallbackHandler()); transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM, - SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(secretManager)); + saslProps, new SaslDigestCallbackHandler(secretManager)); return new TUGIAssumingTransportFactory(transFactory, realUgi); } @@ -359,7 +373,9 @@ import static org.apache.hadoop.fs.Commo throws IOException, InterruptedException { if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication"); + "Delegation Token can be issued only with kerberos authentication. " + + "Current AuthenticationMethod: " + authenticationMethod.get() + ); } //if the user asking the token is same as the 'owner' then don't do //any proxy authorization checks. For cases like oozie, where it gets @@ -388,7 +404,9 @@ import static org.apache.hadoop.fs.Commo public long renewDelegationToken(String tokenStrForm) throws IOException { if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication"); + "Delegation Token can be issued only with kerberos authentication. " + + "Current AuthenticationMethod: " + authenticationMethod.get() + ); } return secretManager.renewDelegationToken(tokenStrForm); } @@ -430,7 +448,7 @@ import static org.apache.hadoop.fs.Commo public String getRemoteUser() { return remoteUser.get(); } - + /** CallbackHandler for SASL DIGEST-MD5 mechanism */ // This code is pretty much completely based on Hadoop's // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not Modified: hive/branches/vectorization/shims/src/common-secure/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common-secure/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/shims/src/common-secure/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original) +++ hive/branches/vectorization/shims/src/common-secure/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Fri Aug 16 01:21:54 2013 @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Enumeration; import java.util.List; +import java.util.Map; import junit.framework.TestCase; @@ -72,20 +73,18 @@ public class TestHadoop20SAuthBridge ext return new Server(); } - - static class Server extends HadoopThriftAuthBridge20S.Server { public Server() throws TTransportException { super(); } @Override - public TTransportFactory createTransportFactory() + public TTransportFactory createTransportFactory(Map saslProps) throws TTransportException { TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory(); transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM, - SaslRpcServer.SASL_PROPS, + saslProps, new SaslDigestCallbackHandler(secretManager)); return new TUGIAssumingTransportFactory(transFactory, realUgi); @@ -98,9 +97,9 @@ public class TestHadoop20SAuthBridge ext } @Override - public void startDelegationTokenSecretManager(Configuration conf) + public void startDelegationTokenSecretManager(Configuration conf, Object hms) throws IOException{ - super.startDelegationTokenSecretManager(conf); + super.startDelegationTokenSecretManager(conf, hms); isMetastoreTokenManagerInited = true; } Modified: hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original) +++ hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Fri Aug 16 01:21:54 2013 @@ -197,16 +197,18 @@ public interface HadoopShims { */ public String unquoteHtmlChars(String item); + + + public void closeAllForUGI(UserGroupInformation ugi); + /** * Get the UGI that the given job configuration will run as. * * In secure versions of Hadoop, this simply returns the current * access control context's user, ignoring the configuration. */ - - public void closeAllForUGI(UserGroupInformation ugi); - public UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException; + /** * Used by metastore server to perform requested rpc in client context. * @param @@ -219,6 +221,26 @@ public interface HadoopShims { IOException, InterruptedException; /** + * Once a delegation token is stored in a file, the location is specified + * for a child process that runs hadoop operations, using an environment + * variable . + * @return Return the name of environment variable used by hadoop to find + * location of token file + */ + public String getTokenFileLocEnvName(); + + + /** + * Get delegation token from filesystem and write the token along with + * metastore tokens into a file + * @param conf + * @return Path of the file with token credential + * @throws IOException + */ + public Path createDelegationTokenFile(final Configuration conf) throws IOException; + + + /** * Used by metastore server to creates UGI object for a remote user. * @param userName remote User Name * @param groupNames group names associated with remote user name @@ -321,13 +343,20 @@ public interface HadoopShims { public String getJobLauncherHttpAddress(Configuration conf); - /** - * Perform kerberos login using the given principal and keytab - * @throws IOException - */ + /** + * Perform kerberos login using the given principal and keytab + * @throws IOException + */ public void loginUserFromKeytab(String principal, String keytabFile) throws IOException; /** + * Perform kerberos re-login using the given principal and keytab, to renew + * the credentials + * @throws IOException + */ + public void reLoginUserFromKeytab() throws IOException; + + /** * Move the directory/file to trash. In case of the symlinks or mount points, the file is * moved to the trashbin in the actual volume of the path p being deleted * @param fs @@ -365,6 +394,13 @@ public interface HadoopShims { UserGroupInformation createProxyUser(String userName) throws IOException; /** + * The method sets to set the partition file has a different signature between + * hadoop versions. + * @param jobConf + * @param partition + */ + void setTotalOrderPartitionFile(JobConf jobConf, Path partition); + /** * InputSplitShim. * */ Modified: hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original) +++ hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Fri Aug 16 01:21:54 2013 @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetAddress; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TProcessor; @@ -50,6 +51,18 @@ import org.apache.thrift.transport.TTran } + /** + * Read and return Hadoop SASL configuration which can be configured using + * "hadoop.rpc.protection" + * + * @param conf + * @return Hadoop SASL configuration + */ + public Map getHadoopSaslProperties(Configuration conf) { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + public static abstract class Client { /** * @@ -65,13 +78,14 @@ import org.apache.thrift.transport.TTran * @throws IOException */ public abstract TTransport createClientTransport( - String principalConfig, String host, - String methodStr,String tokenStrForm, TTransport underlyingTransport) - throws IOException; + String principalConfig, String host, + String methodStr, String tokenStrForm, TTransport underlyingTransport, + Map saslProps) + throws IOException; } public static abstract class Server { - public abstract TTransportFactory createTransportFactory() throws TTransportException; + public abstract TTransportFactory createTransportFactory(Map saslProps) throws TTransportException; public abstract TProcessor wrapProcessor(TProcessor processor); public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor); public abstract InetAddress getRemoteAddress(); Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/client/PTestClient.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/client/PTestClient.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/client/PTestClient.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/client/PTestClient.java Fri Aug 16 01:21:54 2013 @@ -104,7 +104,7 @@ public class PTestClient { } public boolean testStart(String profile, String testHandle, String jira, String patch, String testOutputDir, boolean clearLibraryCache) - throws Exception { + throws Exception { patch = Strings.nullToEmpty(patch).trim(); if(!patch.isEmpty()) { byte[] bytes = Resources.toByteArray(new URL(patch)); @@ -126,7 +126,7 @@ public class PTestClient { return result; } public boolean testList() - throws Exception { + throws Exception { TestListRequest testListRequest = new TestListRequest(); TestListResponse testListResponse = post(testListRequest); for(TestStatus testStatus : testListResponse.getEntries()) { @@ -135,7 +135,7 @@ public class PTestClient { return true; } public boolean testTailLog(String testHandle) - throws Exception { + throws Exception { testHandle = Strings.nullToEmpty(testHandle).trim(); if(testHandle.isEmpty()) { throw new IllegalArgumentException("TestHandle is required"); @@ -163,7 +163,7 @@ public class PTestClient { return Status.isOK(statusResponse.getTestStatus().getStatus()); } private void downloadTestResults(String testHandle, String testOutputDir) - throws Exception { + throws Exception { HttpGet request = new HttpGet(mLogsEndpoint + testHandle + "/test-results.tar.gz"); FileOutputStream output = null; try { @@ -183,14 +183,14 @@ public class PTestClient { } } private long printLogs(String testHandle, long offset) - throws Exception { + throws Exception { TestLogRequest logsRequest = new TestLogRequest(testHandle, offset, 64 * 1024); TestLogResponse logsResponse = post(logsRequest); System.out.print(logsResponse.getBody()); return logsResponse.getOffset(); } private S post(Object payload) - throws Exception { + throws Exception { EndPointResponsePair endPointResponse = Preconditions. checkNotNull(REQUEST_TO_ENDPOINT.get(payload.getClass()), payload.getClass().getName()); HttpPost request = new HttpPost(mApiEndPoint + endPointResponse.getEndpoint()); @@ -207,7 +207,7 @@ public class PTestClient { String response = EntityUtils.toString(httpResponse.getEntity(), "UTF-8"); @SuppressWarnings("unchecked") S result = (S)endPointResponse. - getResponseClass().cast(mMapper.readValue(response, endPointResponse.getResponseClass())); + getResponseClass().cast(mMapper.readValue(response, endPointResponse.getResponseClass())); Status.assertOK(result.getStatus()); if(System.getProperty("DEBUG_PTEST_CLIENT") != null) { System.err.println("payload " + payloadString); @@ -242,7 +242,7 @@ public class PTestClient { for(String requiredOption : requiredOptions) { if(!commandLine.hasOption(requiredOption)) { throw new IllegalArgumentException(requiredOption + " is required"); - } + } } } public static void main(String[] args) throws Exception { @@ -258,7 +258,7 @@ public class PTestClient { options.addOption(null, TEST_HANDLE, true, "Server supplied test handle. (Required for testStop and testTailLog)"); options.addOption(null, OUTPUT_DIR, true, "Directory to download and save test-results.tar.gz to. (Optional for testStart)"); options.addOption(null, CLEAR_LIBRARY_CACHE, false, "Before starting the test, delete the ivy and maven directories (Optional for testStart)"); - + CommandLine commandLine = parser.parse(options, args); if(commandLine.hasOption(HELP_SHORT)) { @@ -266,9 +266,9 @@ public class PTestClient { System.exit(0); } assertRequired(commandLine, new String[] { - COMMAND, - PASSWORD, - ENDPOINT + COMMAND, + PASSWORD, + ENDPOINT }); PTestClient client = new PTestClient(commandLine.getOptionValue(ENDPOINT), commandLine.getOptionValue(PASSWORD)); @@ -278,7 +278,7 @@ public class PTestClient { assertRequired(commandLine, new String[] { PROFILE, TEST_HANDLE - }); + }); result = client.testStart(commandLine.getOptionValue(PROFILE), commandLine.getOptionValue(TEST_HANDLE), commandLine.getOptionValue(JIRA), commandLine.getOptionValue(PATCH), commandLine.getOptionValue(OUTPUT_DIR), commandLine.hasOption(CLEAR_LIBRARY_CACHE)); Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/request/TestStartRequest.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/request/TestStartRequest.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/request/TestStartRequest.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/request/TestStartRequest.java Fri Aug 16 01:21:54 2013 @@ -28,7 +28,7 @@ public class TestStartRequest { public TestStartRequest() { } - public TestStartRequest(String profile, String testHandle, + public TestStartRequest(String profile, String testHandle, String jiraName, String patchURL, boolean clearLibraryCache) { this.profile = profile; this.testHandle = testHandle; @@ -47,7 +47,7 @@ public class TestStartRequest { } public void setPatchURL(String patchURL) { this.patchURL = patchURL; - } + } public boolean isClearLibraryCache() { return clearLibraryCache; } @@ -60,7 +60,7 @@ public class TestStartRequest { public void setJiraName(String jiraName) { this.jiraName = jiraName; } - + public String getTestHandle() { return testHandle; } Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/ExecutionController.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/ExecutionController.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/ExecutionController.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/ExecutionController.java Fri Aug 16 01:21:54 2013 @@ -99,7 +99,8 @@ public class ExecutionController { mTestExecutor.start(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override - public void run() { + public void run() { + LOG.info("Shutdown hook called"); try { mTestExecutor.shutdown(); } catch (Exception e) { @@ -131,7 +132,7 @@ public class ExecutionController { return new TestStartResponse(Status.illegalArgument()); } if(!assertTestHandleIsAvailable(startRequest.getTestHandle())) { - return new TestStartResponse(Status.illegalArgument("Test handle " + startRequest.getTestHandle() + " already used")); + return new TestStartResponse(Status.illegalArgument("Test handle " + startRequest.getTestHandle() + " already used")); } Test test = new Test(startRequest, Status.pending(), System.currentTimeMillis()); @@ -230,4 +231,4 @@ public class ExecutionController { Preconditions.checkState(!testOutputDir.isFile(), "Output directory " + testOutputDir + " is file"); return testOutputDir.mkdir(); } -} \ No newline at end of file +} Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java Fri Aug 16 01:21:54 2013 @@ -66,7 +66,7 @@ public class TestExecutor extends Thread } @Override -public void run() { + public void run() { while(execute) { Test test = null; PrintStream logStream = null; Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/CleanupPhase.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/CleanupPhase.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/CleanupPhase.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/CleanupPhase.java Fri Aug 16 01:21:54 2013 @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hive.ptest.execution; - -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -public class CleanupPhase extends Phase { - - public CleanupPhase(ImmutableList hostExecutors, - LocalCommandFactory localCommandFactory, - ImmutableMap templateDefaults, Logger logger) { - super(hostExecutors, localCommandFactory, templateDefaults, logger); - } - @Override -public void execute() throws Exception { - execHosts("killall -q -9 -f java || true"); - TimeUnit.SECONDS.sleep(1); - execLocally("rm -rf $workingDir/scratch"); - execInstances("rm -rf $localDir/$instanceName/scratch $localDir/$instanceName/logs"); - } -} \ No newline at end of file Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Constants.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Constants.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Constants.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Constants.java Fri Aug 16 01:21:54 2013 @@ -21,6 +21,7 @@ package org.apache.hive.ptest.execution; public class Constants { + public static final int EXIT_CODE_EXCEPTION = -1; public static final int EXIT_CODE_SUCCESS = 0; public static final int EXIT_CODE_UNKNOWN = 255; } Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Drone.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Drone.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Drone.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Drone.java Fri Aug 16 01:21:54 2013 @@ -65,5 +65,4 @@ public class Drone { public int getInstance() { return instance; } - } Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java Fri Aug 16 01:21:54 2013 @@ -28,19 +28,23 @@ import java.util.concurrent.BlockingQueu import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.hive.ptest.execution.conf.Host; import org.apache.hive.ptest.execution.conf.TestBatch; +import org.apache.hive.ptest.execution.context.ExecutionContext; import org.slf4j.Logger; -import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; public class ExecutionPhase extends Phase { - + private static final long FOUR_HOURS = 4L* 60L * 60L * 1000L; + private final ExecutionContext executionContext; + private final HostExecutorBuilder hostExecutorBuilder; private final File succeededLogDir; private final File failedLogDir; private final BlockingQueue parallelWorkQueue; @@ -50,13 +54,16 @@ public class ExecutionPhase extends Phas private final Supplier> testBatchSupplier; private final Set failedTestResults; - public ExecutionPhase(ImmutableList hostExecutors, + public ExecutionPhase(List hostExecutors, ExecutionContext executionContext, + HostExecutorBuilder hostExecutorBuilder, LocalCommandFactory localCommandFactory, ImmutableMap templateDefaults, File succeededLogDir, File failedLogDir, Supplier> testBatchSupplier, Set executedTests, Set failedTests, Logger logger) throws IOException { super(hostExecutors, localCommandFactory, templateDefaults, logger); + this.executionContext = executionContext; + this.hostExecutorBuilder = hostExecutorBuilder; this.succeededLogDir = succeededLogDir; this.failedLogDir = failedLogDir; this.testBatchSupplier = testBatchSupplier; @@ -68,7 +75,7 @@ public class ExecutionPhase extends Phas synchronizedSet(new HashSet()); } @Override -public void execute() throws Throwable { + public void execute() throws Throwable { long start = System.currentTimeMillis(); List testBatches = Lists.newArrayList(); for(TestBatch batch : testBatchSupplier.get()) { @@ -80,37 +87,26 @@ public void execute() throws Throwable { } } try { + int expectedNumHosts = hostExecutors.size(); + initalizeHosts(); do { - float numberBadHosts = 0f; - for(HostExecutor hostExecutor : hostExecutors) { - if(hostExecutor.remainingDrones() == 0) { - numberBadHosts++; - } - } - Preconditions.checkState(hostExecutors.size() > 0, "Host executors cannot be empty"); - float percentBadHosts = numberBadHosts / (float)hostExecutors.size(); - if(percentBadHosts > 0.50f) { - throw new IllegalStateException("Too many bad hosts: " + percentBadHosts + "% (" + (int)numberBadHosts + - " / " + hostExecutors.size() + ") is greater than threshold of 50%"); - } + replaceBadHosts(expectedNumHosts); List> results = Lists.newArrayList(); - for(HostExecutor hostExecutor : getHostExecutors()) { + for(HostExecutor hostExecutor : ImmutableList.copyOf(hostExecutors)) { results.add(hostExecutor.submitTests(parallelWorkQueue, isolatedWorkQueue, failedTestResults)); } Futures.allAsList(results).get(); } while(!(parallelWorkQueue.isEmpty() && isolatedWorkQueue.isEmpty())); - Preconditions.checkState(parallelWorkQueue.isEmpty(), "Parallel work queue is not empty. All drones must have aborted."); - Preconditions.checkState(isolatedWorkQueue.isEmpty(), "Isolated work queue is not empty. All drones must have aborted."); for(TestBatch batch : testBatches) { - File batchLogDir; - if(failedTestResults.contains(batch)) { - batchLogDir = new File(failedLogDir, batch.getName()); - } else { - batchLogDir = new File(succeededLogDir, batch.getName()); - } - JUnitReportParser parser = new JUnitReportParser(logger, batchLogDir); - executedTests.addAll(parser.getExecutedTests()); - failedTests.addAll(parser.getFailedTests()); + File batchLogDir; + if(failedTestResults.contains(batch)) { + batchLogDir = new File(failedLogDir, batch.getName()); + } else { + batchLogDir = new File(succeededLogDir, batch.getName()); + } + JUnitReportParser parser = new JUnitReportParser(logger, batchLogDir); + executedTests.addAll(parser.getExecutedTests()); + failedTests.addAll(parser.getFailedTests()); } } finally { long elapsed = System.currentTimeMillis() - start; @@ -118,5 +114,40 @@ public void execute() throws Throwable { TimeUnit.MINUTES.convert(elapsed, TimeUnit.MILLISECONDS) + " minutes"); } } - + private void replaceBadHosts(int expectedNumHosts) + throws Exception { + Set goodHosts = Sets.newHashSet(); + for(HostExecutor hostExecutor : ImmutableList.copyOf(hostExecutors)) { + if(hostExecutor.isBad()) { + logger.info("Removing host during execution phase: " + hostExecutor.getHost()); + executionContext.addBadHost(hostExecutor.getHost()); + hostExecutors.remove(hostExecutor); + } else { + goodHosts.add(hostExecutor.getHost()); + } + } + long start = System.currentTimeMillis(); + while(hostExecutors.size() < expectedNumHosts) { + if(System.currentTimeMillis() - start > FOUR_HOURS) { + throw new RuntimeException("Waited over fours for hosts, still have only " + + hostExecutors.size() + " hosts out of an expected " + expectedNumHosts); + } + logger.warn("Only " + hostExecutors.size() + " hosts out of an expected " + expectedNumHosts + + ", attempting to replace bad hosts"); + TimeUnit.MINUTES.sleep(1); + executionContext.replaceBadHosts(); + for(Host host : executionContext.getHosts()) { + if(!goodHosts.contains(host)) { + HostExecutor hostExecutor = hostExecutorBuilder.build(host); + initalizeHost(hostExecutor); + if(hostExecutor.isBad()) { + executionContext.addBadHost(hostExecutor.getHost()); + } else { + logger.info("Adding new host during execution phase: " + host); + hostExecutors.add(hostExecutor); + } + } + } + } + } } Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java Fri Aug 16 01:21:54 2013 @@ -29,6 +29,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; import org.apache.hive.ptest.execution.conf.Host; import org.apache.hive.ptest.execution.conf.TestBatch; import org.apache.hive.ptest.execution.ssh.RSyncCommand; @@ -41,16 +42,19 @@ import org.apache.hive.ptest.execution.s import org.apache.hive.ptest.execution.ssh.SSHResult; import org.slf4j.Logger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.io.Files; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; class HostExecutor { + private static final int MAX_SOURCE_DIRS = 5; private final Host mHost; private final List mDrones; private final ListeningExecutorService mExecutor; @@ -102,10 +106,13 @@ class HostExecutor { }); } - + @VisibleForTesting int remainingDrones() { return mDrones.size(); } + boolean isBad() { + return mDrones.isEmpty(); + } Host getHost() { return mHost; } @@ -130,7 +137,7 @@ class HostExecutor { do { batch = parallelWorkQueue.poll(mNumPollSeconds, TimeUnit.SECONDS); if(batch != null) { - if(!executeTestBatch(drone, batch, failedTestResults.size())) { + if(!executeTestBatch(drone, batch, failedTestResults)) { failedTestResults.add(batch); } } @@ -155,7 +162,7 @@ class HostExecutor { do { batch = isolatedWorkQueue.poll(mNumPollSeconds, TimeUnit.SECONDS); if(batch != null) { - if(!executeTestBatch(drone, batch, failedTestResults.size())) { + if(!executeTestBatch(drone, batch, failedTestResults)) { failedTestResults.add(batch); } } @@ -174,7 +181,7 @@ class HostExecutor { * Executes the test batch on the drone in question. If the command * exits with a status code of 255 throw an AbortDroneException. */ - private boolean executeTestBatch(Drone drone, TestBatch batch, int numOfFailedTests) + private boolean executeTestBatch(Drone drone, TestBatch batch, Set failedTestResults) throws IOException, SSHExecutionException, AbortDroneException { String scriptName = "hiveptest-" + batch.getName() + ".sh"; File script = new File(mLocalScratchDirectory, scriptName); @@ -184,7 +191,8 @@ class HostExecutor { templateVariables.put("testArguments", batch.getTestArguments()); templateVariables.put("localDir", drone.getLocalDirectory()); templateVariables.put("logDir", drone.getLocalLogDirectory()); - templateVariables.put("numOfFailedTests", String.valueOf(numOfFailedTests)); + templateVariables.put("maxSourceDirs", String.valueOf(MAX_SOURCE_DIRS)); + templateVariables.put("numOfFailedTests", String.valueOf(failedTestResults.size())); String command = Templates.getTemplateResult("bash $localDir/$instanceName/scratch/" + script.getName(), templateVariables); Templates.writeTemplateResult("batch-exec.vm", script, templateVariables); @@ -193,7 +201,7 @@ class HostExecutor { mLogger.info(drone + " executing " + batch + " with " + command); RemoteCommandResult sshResult = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(), drone.getHost(), drone.getInstance(), command). - call(); + call(); File batchLogDir = null; if(sshResult.getExitCode() == Constants.EXIT_CODE_UNKNOWN) { throw new AbortDroneException("Drone " + drone.toString() + " exited with " + @@ -209,6 +217,13 @@ class HostExecutor { } copyFromDroneToLocal(drone, batchLogDir.getAbsolutePath(), drone.getLocalLogDirectory() + "/"); + if(failedTestResults.size() > MAX_SOURCE_DIRS) { + File sourceDir = new File(batchLogDir, "source"); + if(sourceDir.isDirectory()) { + mLogger.info("Max source directories exceeded, deleting " + sourceDir.getAbsolutePath() + + ":" + FileUtils.deleteQuietly(sourceDir)); + } + } File logFile = new File(batchLogDir, String.format("%s.txt", batch.getName())); PrintWriter writer = new PrintWriter(logFile); writer.write(String.format("result = '%s'\n", sshResult.toString())); @@ -247,34 +262,54 @@ class HostExecutor { * they will be removed from use possibly leaving this host with zero * functioning drones. */ - List> rsyncFromLocalToRemoteInstances(final String localFile, final String remoteFile) + ListenableFuture>> rsyncFromLocalToRemoteInstances(final String localFile, final String remoteFile) throws InterruptedException, IOException { - List> result = Lists.newArrayList(); - for(final Drone drone : ImmutableList.copyOf(mDrones)) { - final Map templateVariables = Maps.newHashMap(mTemplateDefaults); - templateVariables.put("instanceName", drone.getInstanceName()); - templateVariables.put("localDir", drone.getLocalDirectory()); - result.add(mExecutor.submit(new Callable() { - @Override - public RSyncResult call() throws Exception { + // the basic premise here is that we will rsync the directory to first working drone + // then execute a local rsync on the node to the other drones. This keeps + // us from executing tons of rsyncs on the master node conserving CPU + return mExecutor.submit(new Callable>>() { + @Override + public List> call() + throws Exception { + List drones = Lists.newArrayList(mDrones); + List> results = Lists.newArrayList(); + // local path doesn't depend on drone variables + String resolvedLocalLocation = Files.simplifyPath(Templates.getTemplateResult(localFile, mTemplateDefaults)); + String remoteStagingLocation = null; + for(final Drone drone : ImmutableList.copyOf(mDrones)) { + Preconditions.checkState(remoteStagingLocation == null, "Remote staging location must be null at the start of the loop"); + final Map templateVariables = Maps.newHashMap(mTemplateDefaults); + templateVariables.put("instanceName", drone.getInstanceName()); + templateVariables.put("localDir", drone.getLocalDirectory()); + String resolvedRemoteLocation = Files.simplifyPath(Templates.getTemplateResult(remoteFile, templateVariables)); RSyncResult result = new RSyncCommand(mRSyncCommandExecutor, drone.getPrivateKey(), drone.getUser(), drone.getHost(), drone.getInstance(), - Templates.getTemplateResult(localFile, templateVariables), - Templates.getTemplateResult(remoteFile, templateVariables), + resolvedLocalLocation, + resolvedRemoteLocation, RSyncCommand.Type.FROM_LOCAL).call(); - if(result.getExitCode() != Constants.EXIT_CODE_SUCCESS) { + if(result.getExitCode() == Constants.EXIT_CODE_SUCCESS) { + remoteStagingLocation = resolvedRemoteLocation; + drones.remove(drone); + mLogger.info("Successfully staged " + resolvedLocalLocation + " on " + remoteStagingLocation); + break; + } else { mDrones.remove(drone); mLogger.error("Aborting drone during rsync", new AbortDroneException("Drone " + drone + " exited with " + result.getExitCode() + ": " + result)); - return null; - } else { - return result; } } - })); - } - return result; + if(remoteStagingLocation == null) { + Preconditions.checkState(mDrones.isEmpty(), "If remote staging location is not set all drones should be bad"); + mLogger.warn("Unable to stage directory on remote host, all drones must be bad"); + } else { + String name = (new File(resolvedLocalLocation)).getName(); + remoteStagingLocation = Files.simplifyPath(remoteStagingLocation + "/" + name); + results.addAll(execInstances(drones, "rsync -qaPe --delete --delete-during --force " + remoteStagingLocation + " " + remoteFile)); + } + return results; + } + }); } RSyncResult copyFromDroneToLocal(Drone drone, String localFile, String remoteFile) throws SSHExecutionException, IOException { @@ -299,47 +334,51 @@ class HostExecutor { ListenableFuture exec(final String cmd) throws Exception { return mExecutor.submit(new Callable() { - @Override - public SSHResult call() throws Exception { - for(final Drone drone : ImmutableList.copyOf(mDrones)) { - Map templateVariables = Maps.newHashMap(mTemplateDefaults); - templateVariables.put("instanceName", drone.getInstanceName()); - templateVariables.put("localDir", drone.getLocalDirectory()); - String command = Templates.getTemplateResult(cmd, templateVariables); - SSHResult result = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(), - drone.getHost(), drone.getInstance(), command).call(); - if(result.getExitCode() == Constants.EXIT_CODE_UNKNOWN) { - mDrones.remove(drone); // return value not checked due to concurrent access - mLogger.error("Aborting drone during exec " + command, - new AbortDroneException("Drone " + drone + " exited with " - + Constants.EXIT_CODE_UNKNOWN + ": " + result)); - } else { - return result; - } + @Override + public SSHResult call() throws Exception { + for(final Drone drone : ImmutableList.copyOf(mDrones)) { + Map templateVariables = Maps.newHashMap(mTemplateDefaults); + templateVariables.put("instanceName", drone.getInstanceName()); + templateVariables.put("localDir", drone.getLocalDirectory()); + String command = Templates.getTemplateResult(cmd, templateVariables); + SSHResult result = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(), + drone.getHost(), drone.getInstance(), command).call(); + if(result.getExitCode() == Constants.EXIT_CODE_UNKNOWN) { + mDrones.remove(drone); // return value not checked due to concurrent access + mLogger.error("Aborting drone during exec " + command, + new AbortDroneException("Drone " + drone + " exited with " + + Constants.EXIT_CODE_UNKNOWN + ": " + result)); + } else { + return result; } - return null; } + return null; + } }); } - List> execInstances(final String cmd) - throws SSHExecutionException, InterruptedException, IOException { - List> result = Lists.newArrayList(); - for(final Drone drone : ImmutableList.copyOf(mDrones)) { - result.add(mExecutor.submit(new Callable() { + List> execInstances(final String cmd) + throws InterruptedException, IOException { + return execInstances(mDrones, cmd); + } + private List> execInstances(List drones, final String cmd) + throws InterruptedException, IOException { + List> result = Lists.newArrayList(); + for(final Drone drone : ImmutableList.copyOf(drones)) { + result.add(mExecutor.submit(new Callable() { @Override - public SSHResult call() throws Exception { + public RemoteCommandResult call() throws Exception { Map templateVariables = Maps.newHashMap(mTemplateDefaults); templateVariables.put("instanceName", drone.getInstanceName()); templateVariables.put("localDir", drone.getLocalDirectory()); String command = Templates.getTemplateResult(cmd, templateVariables); SSHResult result = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(), drone.getHost(), drone.getInstance(), command).call(); - if(result.getExitCode() == Constants.EXIT_CODE_UNKNOWN) { + if(result.getExitCode() != Constants.EXIT_CODE_SUCCESS) { mDrones.remove(drone); // return value not checked due to concurrent access mLogger.error("Aborting drone during exec " + command, new AbortDroneException("Drone " + drone + " exited with " - + Constants.EXIT_CODE_UNKNOWN + ": " + result)); + + result.getExitCode() + ": " + result)); return null; } else { return result; Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java Fri Aug 16 01:21:54 2013 @@ -20,9 +20,10 @@ package org.apache.hive.ptest.execution; import java.io.IOException; import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.SortedSet; import org.apache.hive.ptest.api.server.TestLogger; import org.apache.hive.ptest.execution.conf.Context; @@ -77,9 +78,9 @@ class JIRAService { mJenkinsURL = configuration.getJenkinsURL(); } - void postComment(boolean error, int numExecutesTests, Set failedTests, - List messages) { - DefaultHttpClient httpClient = new DefaultHttpClient(); + void postComment(boolean error, int numExecutesTests, SortedSet failedTests, + List messages) { + DefaultHttpClient httpClient = new DefaultHttpClient(); try { String buildTag = formatBuildTag(mBuildTag); List comments = Lists.newArrayList(); @@ -111,7 +112,7 @@ class JIRAService { comments.addAll(failedTests); comments.add("{noformat}"); } - comments.add(""); + comments.add(""); } comments.add("Test results: " + mJenkinsURL + "/" + buildTag + "/testReport"); comments.add("Console output: " + mJenkinsURL + "/" + buildTag + "/console"); @@ -121,21 +122,21 @@ class JIRAService { comments.add("{noformat}"); comments.addAll(messages); comments.add("{noformat}"); - comments.add(""); + comments.add(""); } comments.add("This message is automatically generated."); - mLogger.info("Comment: " + Joiner.on("\n").join(comments)); + mLogger.info("Comment: " + Joiner.on("\n").join(comments)); String body = Joiner.on("\n").join(comments); String url = String.format("%s/rest/api/2/issue/%s/comment", mUrl, mName); URL apiURL = new URL(mUrl); httpClient.getCredentialsProvider() - .setCredentials( - new AuthScope(apiURL.getHost(), apiURL.getPort(), - AuthScope.ANY_REALM), + .setCredentials( + new AuthScope(apiURL.getHost(), apiURL.getPort(), + AuthScope.ANY_REALM), new UsernamePasswordCredentials(mUser, mPassword)); BasicHttpContext localcontext = new BasicHttpContext(); localcontext.setAttribute("preemptive-auth", new BasicScheme()); - httpClient.addRequestInterceptor(new PreemptiveAuth(), 0); + httpClient.addRequestInterceptor(new PreemptiveAuth(), 0); HttpPost request = new HttpPost(url); ObjectMapper mapper = new ObjectMapper(); StringEntity params = new StringEntity(mapper.writeValueAsString(new Body(body))); @@ -155,12 +156,12 @@ class JIRAService { httpClient.getConnectionManager().shutdown(); } } - - @SuppressWarnings("unused") + + @SuppressWarnings("unused") private static class Body { private String body; public Body() { - + } public Body(String body) { this.body = body; @@ -170,9 +171,9 @@ class JIRAService { } public void setBody(String body) { this.body = body; - } + } } - + /** * Hive-Build-123 to Hive-Build/123 */ @@ -198,7 +199,7 @@ class JIRAService { public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException { - AuthState authState = (AuthState) context.getAttribute(ClientContext.TARGET_AUTH_STATE); + AuthState authState = (AuthState) context.getAttribute(ClientContext.TARGET_AUTH_STATE); if (authState.getAuthScheme() == null) { AuthScheme authScheme = (AuthScheme) context.getAttribute("preemptive-auth"); CredentialsProvider credsProvider = (CredentialsProvider) context.getAttribute(ClientContext.CREDS_PROVIDER); @@ -215,7 +216,7 @@ class JIRAService { } } } - + public static void main(String[] args) throws Exception { TestLogger logger = new TestLogger(System.err, TestLogger.LEVEL.TRACE); Map context = Maps.newHashMap(); @@ -230,7 +231,7 @@ class JIRAService { configuration.setJiraName("HIVE-4892"); JIRAService service = new JIRAService(logger, configuration, "test-123"); List messages = Lists.newArrayList("msg1", "msg2"); - Set failedTests = Sets.newHashSet("failed"); + SortedSet failedTests = Sets.newTreeSet(Collections.singleton("failed")); service.postComment(false, 5, failedTests, messages); } -} +} \ No newline at end of file Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JUnitReportParser.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JUnitReportParser.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JUnitReportParser.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JUnitReportParser.java Fri Aug 16 01:21:54 2013 @@ -87,21 +87,25 @@ public class JUnitReportParser { private String name; private boolean failedOrErrored; @Override - public void startElement(String uri, String localName, String qName, Attributes attributes) { + public void startElement(String uri, String localName, String qName, Attributes attributes) { if ("testcase".equals(qName)) { name = attributes.getValue("classname"); failedOrErrored = false; - if(name == null) { + if(name == null || "junit.framework.TestSuite".equals(name)) { name = attributes.getValue("name"); } else { name = name + "." + attributes.getValue("name"); } - } else if (name != null && ("failure".equals(qName) || "error".equals(qName))) { - failedOrErrored = true; + } else if (name != null) { + if ("failure".equals(qName) || "error".equals(qName)) { + failedOrErrored = true; + } else if("skipped".equals(qName)) { + name = null; + } } } @Override - public void endElement(String uri, String localName, String qName) { + public void endElement(String uri, String localName, String qName) { if ("testcase".equals(qName)) { if(name != null) { executedTests.add(name); @@ -125,4 +129,4 @@ public class JUnitReportParser { } } } -} +} \ No newline at end of file Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LogDirectoryCleaner.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LogDirectoryCleaner.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LogDirectoryCleaner.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LogDirectoryCleaner.java Fri Aug 16 01:21:54 2013 @@ -41,7 +41,7 @@ public class LogDirectoryCleaner extends } @Override -public void run() { + public void run() { try { File[] logDirs = mLogDir.listFiles(); if(logDirs != null && logDirs.length > 0) { @@ -81,10 +81,10 @@ public void run() { File getOldest() { Preconditions.checkState(!dirs.isEmpty(), "Cannot be called unless dirs.size() >= 1"); File eldestDir = null; - int eldestId = Integer.MAX_VALUE; + long eldestId = Long.MAX_VALUE; for(File dir : dirs) { try { - int id = Integer.parseInt(dir.getName().substring(name.length() + 1)); + long id = Long.parseLong(dir.getName().substring(name.length() + 1)); if(id < eldestId) { eldestId = id; eldestDir = dir; Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java Fri Aug 16 01:21:54 2013 @@ -20,12 +20,16 @@ package org.apache.hive.ptest.execution; import java.io.File; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -47,7 +51,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -72,13 +75,13 @@ public class PTest { private final List mPhases; private final ExecutionContext mExecutionContext; private final Logger mLogger; - private final ImmutableList mHostExecutors; + private final List mHostExecutors; private final String mBuildTag; - public PTest(final TestConfiguration configuration, ExecutionContext executionContext, - String buildTag, File logDir, LocalCommandFactory localCommandFactory, SSHCommandExecutor sshCommandExecutor, - RSyncCommandExecutor rsyncCommandExecutor, Logger logger) - throws Exception { + public PTest(final TestConfiguration configuration, final ExecutionContext executionContext, + final String buildTag, final File logDir, final LocalCommandFactory localCommandFactory, + final SSHCommandExecutor sshCommandExecutor, final RSyncCommandExecutor rsyncCommandExecutor, + final Logger logger) throws Exception { mConfiguration = configuration; mLogger = logger; mBuildTag = buildTag; @@ -86,9 +89,9 @@ public class PTest { mFailedTests = Collections.newSetFromMap(new ConcurrentHashMap()); mExecutionContext = executionContext; mExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); - File failedLogDir = Dirs.create(new File(logDir, "failed")); - File succeededLogDir = Dirs.create(new File(logDir, "succeeded")); - File scratchDir = Dirs.createEmpty(new File(mExecutionContext.getLocalWorkingDirectory(), "scratch")); + final File failedLogDir = Dirs.create(new File(logDir, "failed")); + final File succeededLogDir = Dirs.create(new File(logDir, "succeeded")); + final File scratchDir = Dirs.createEmpty(new File(mExecutionContext.getLocalWorkingDirectory(), "scratch")); File patchDir = Dirs.createEmpty(new File(logDir, "patches")); File patchFile = null; if(!configuration.getPatch().isEmpty()) { @@ -97,32 +100,38 @@ public class PTest { } ImmutableMap.Builder templateDefaultsBuilder = ImmutableMap.builder(); templateDefaultsBuilder. - put("repository", configuration.getRepository()). - put("repositoryName", configuration.getRepositoryName()). - put("repositoryType", configuration.getRepositoryType()). - put("branch", configuration.getBranch()). - put("clearLibraryCache", String.valueOf(configuration.isClearLibraryCache())). - put("workingDir", mExecutionContext.getLocalWorkingDirectory()). - put("antArgs", configuration.getAntArgs()). - put("buildTag", buildTag). - put("logDir", logDir.getAbsolutePath()). - put("javaHome", configuration.getJavaHome()). - put("antEnvOpts", configuration.getAntEnvOpts()); - ImmutableMap templateDefaults = templateDefaultsBuilder.build(); + put("repository", configuration.getRepository()). + put("repositoryName", configuration.getRepositoryName()). + put("repositoryType", configuration.getRepositoryType()). + put("branch", configuration.getBranch()). + put("clearLibraryCache", String.valueOf(configuration.isClearLibraryCache())). + put("workingDir", mExecutionContext.getLocalWorkingDirectory()). + put("antArgs", configuration.getAntArgs()). + put("buildTag", buildTag). + put("logDir", logDir.getAbsolutePath()). + put("javaHome", configuration.getJavaHome()). + put("antEnvOpts", configuration.getAntEnvOpts()); + final ImmutableMap templateDefaults = templateDefaultsBuilder.build(); TestParser testParser = new TestParser(configuration.getContext(), new File(mExecutionContext.getLocalWorkingDirectory(), configuration.getRepositoryName() + "-source"), logger); - ImmutableList.Builder hostExecutorsBuilder = ImmutableList.builder(); + HostExecutorBuilder hostExecutorBuilder = new HostExecutorBuilder() { + @Override + public HostExecutor build(Host host) { + return new HostExecutor(host, executionContext.getPrivateKey(), mExecutor, sshCommandExecutor, + rsyncCommandExecutor, templateDefaults, scratchDir, succeededLogDir, failedLogDir, 10, logger); + } + + }; + List hostExecutors = new ArrayList(); for(Host host : mExecutionContext.getHosts()) { - hostExecutorsBuilder.add(new HostExecutor(host, executionContext.getPrivateKey(), mExecutor, sshCommandExecutor, - rsyncCommandExecutor, templateDefaults, scratchDir, succeededLogDir, failedLogDir, 10, logger)); + hostExecutors.add(hostExecutorBuilder.build(host)); } - mHostExecutors = hostExecutorsBuilder.build(); + mHostExecutors = new CopyOnWriteArrayList(hostExecutors); mPhases = Lists.newArrayList(); - mPhases.add(new CleanupPhase(mHostExecutors, localCommandFactory, templateDefaults, logger)); mPhases.add(new PrepPhase(mHostExecutors, localCommandFactory, templateDefaults, scratchDir, patchFile, logger)); - mPhases.add(new ExecutionPhase(mHostExecutors, localCommandFactory, templateDefaults, + mPhases.add(new ExecutionPhase(mHostExecutors, mExecutionContext, hostExecutorBuilder, localCommandFactory, templateDefaults, succeededLogDir, failedLogDir, testParser.parse(), mExecutedTests, mFailedTests, logger)); mPhases.add(new ReportingPhase(mHostExecutors, localCommandFactory, templateDefaults, logger)); } @@ -155,24 +164,25 @@ public class PTest { error = true; } finally { for(HostExecutor hostExecutor : mHostExecutors) { - if(hostExecutor.remainingDrones() == 0) { + if(hostExecutor.isBad()) { mExecutionContext.addBadHost(hostExecutor.getHost()); } } mExecutor.shutdownNow(); - if(mFailedTests.isEmpty()) { - mLogger.info(String.format("%d failed tests", mFailedTests.size())); + SortedSet failedTests = new TreeSet(mFailedTests); + if(failedTests.isEmpty()) { + mLogger.info(String.format("%d failed tests", failedTests.size())); } else { - mLogger.warn(String.format("%d failed tests", mFailedTests.size())); + mLogger.warn(String.format("%d failed tests", failedTests.size())); } - for(String failingTestName : mFailedTests) { + for(String failingTestName : failedTests) { mLogger.warn(failingTestName); } mLogger.info("Executed " + mExecutedTests.size() + " tests"); for(Map.Entry entry : elapsedTimes.entrySet()) { mLogger.info(String.format("PERF: Phase %s took %d minutes", entry.getKey(), entry.getValue())); } - publishJiraComment(error, messages); + publishJiraComment(error, messages, failedTests); if(error || !mFailedTests.isEmpty()) { result = 1; } @@ -180,7 +190,7 @@ public class PTest { return result; } - private void publishJiraComment(boolean error, List messages) { + private void publishJiraComment(boolean error, List messages, SortedSet failedTests) { if(mConfiguration.getJiraName().isEmpty()) { mLogger.info("Skipping JIRA comment as name is empty."); return; @@ -198,7 +208,7 @@ public class PTest { return; } JIRAService jira = new JIRAService(mLogger, mConfiguration, mBuildTag); - jira.postComment(error, mExecutedTests.size(), mFailedTests, messages); + jira.postComment(error, mExecutedTests.size(), failedTests, messages); } public static class Builder { @@ -245,65 +255,65 @@ public class PTest { fromFile(testConfigurationFile); String buildTag = System.getenv("BUILD_TAG") == null ? "undefined-" + System.currentTimeMillis() : System.getenv("BUILD_TAG"); - File logDir = Dirs.create(new File(executionContextConfiguration.getGlobalLogDirectory(), buildTag)); - LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(executionContextConfiguration. - getGlobalLogDirectory()), 5); - cleaner.setName("LogCleaner-" + executionContextConfiguration.getGlobalLogDirectory()); - cleaner.setDaemon(true); - cleaner.start(); - TestConfiguration conf = TestConfiguration.fromFile(testConfigurationFile, LOG); - String repository = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY)).trim(); - if(!repository.isEmpty()) { - conf.setRepository(repository); - } - String repositoryName = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY_NAME)).trim(); - if(!repositoryName.isEmpty()) { - conf.setRepositoryName(repositoryName); - } - String branch = Strings.nullToEmpty(commandLine.getOptionValue(BRANCH)).trim(); - if(!branch.isEmpty()) { - conf.setBranch(branch); - } - String patch = Strings.nullToEmpty(commandLine.getOptionValue(PATCH)).trim(); - if(!patch.isEmpty()) { - conf.setPatch(patch); - } - String javaHome = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME)).trim(); - if(!javaHome.isEmpty()) { - conf.setJavaHome(javaHome); - } - String antEnvOpts = Strings.nullToEmpty(commandLine.getOptionValue(ANT_ENV_OPTS)).trim(); - if(!antEnvOpts.isEmpty()) { - conf.setAntEnvOpts(antEnvOpts); - } - String[] supplementalAntArgs = commandLine.getOptionValues(ANT_ARG); - if(supplementalAntArgs != null && supplementalAntArgs.length > 0) { - String antArgs = Strings.nullToEmpty(conf.getAntArgs()); - if(!(antArgs.isEmpty() || antArgs.endsWith(" "))) { - antArgs += " "; - } - antArgs += "-" + ANT_ARG + Joiner.on(" -" + ANT_ARG).join(supplementalAntArgs); - conf.setAntArgs(antArgs); - } - ExecutionContextProvider executionContextProvider = null; - ExecutionContext executionContext = null; - int exitCode = 0; - try { - executionContextProvider = executionContextConfiguration - .getExecutionContextProvider(); - executionContext = executionContextProvider.createExecutionContext(); - PTest ptest = new PTest(conf, executionContext, buildTag, logDir, - new LocalCommandFactory(LOG), new SSHCommandExecutor(LOG), - new RSyncCommandExecutor(LOG), LOG); - exitCode = ptest.run(); - } finally { - if(executionContext != null) { - executionContext.terminate(); - } - if(executionContextProvider != null) { - executionContextProvider.close(); - } - } - System.exit(exitCode); + File logDir = Dirs.create(new File(executionContextConfiguration.getGlobalLogDirectory(), buildTag)); + LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(executionContextConfiguration. + getGlobalLogDirectory()), 5); + cleaner.setName("LogCleaner-" + executionContextConfiguration.getGlobalLogDirectory()); + cleaner.setDaemon(true); + cleaner.start(); + TestConfiguration conf = TestConfiguration.fromFile(testConfigurationFile, LOG); + String repository = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY)).trim(); + if(!repository.isEmpty()) { + conf.setRepository(repository); + } + String repositoryName = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY_NAME)).trim(); + if(!repositoryName.isEmpty()) { + conf.setRepositoryName(repositoryName); + } + String branch = Strings.nullToEmpty(commandLine.getOptionValue(BRANCH)).trim(); + if(!branch.isEmpty()) { + conf.setBranch(branch); + } + String patch = Strings.nullToEmpty(commandLine.getOptionValue(PATCH)).trim(); + if(!patch.isEmpty()) { + conf.setPatch(patch); + } + String javaHome = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME)).trim(); + if(!javaHome.isEmpty()) { + conf.setJavaHome(javaHome); + } + String antEnvOpts = Strings.nullToEmpty(commandLine.getOptionValue(ANT_ENV_OPTS)).trim(); + if(!antEnvOpts.isEmpty()) { + conf.setAntEnvOpts(antEnvOpts); + } + String[] supplementalAntArgs = commandLine.getOptionValues(ANT_ARG); + if(supplementalAntArgs != null && supplementalAntArgs.length > 0) { + String antArgs = Strings.nullToEmpty(conf.getAntArgs()); + if(!(antArgs.isEmpty() || antArgs.endsWith(" "))) { + antArgs += " "; + } + antArgs += "-" + ANT_ARG + Joiner.on(" -" + ANT_ARG).join(supplementalAntArgs); + conf.setAntArgs(antArgs); + } + ExecutionContextProvider executionContextProvider = null; + ExecutionContext executionContext = null; + int exitCode = 0; + try { + executionContextProvider = executionContextConfiguration + .getExecutionContextProvider(); + executionContext = executionContextProvider.createExecutionContext(); + PTest ptest = new PTest(conf, executionContext, buildTag, logDir, + new LocalCommandFactory(LOG), new SSHCommandExecutor(LOG), + new RSyncCommandExecutor(LOG), LOG); + exitCode = ptest.run(); + } finally { + if(executionContext != null) { + executionContext.terminate(); + } + if(executionContextProvider != null) { + executionContextProvider.close(); + } + } + System.exit(exitCode); } } Modified: hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Phase.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Phase.java?rev=1514554&r1=1514553&r2=1514554&view=diff ============================================================================== --- hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Phase.java (original) +++ hive/branches/vectorization/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/Phase.java Fri Aug 16 01:21:54 2013 @@ -20,29 +20,32 @@ package org.apache.hive.ptest.execution; import java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.hive.ptest.execution.LocalCommand.CollectLogPolicy; import org.apache.hive.ptest.execution.ssh.NonZeroExitCodeException; -import org.apache.hive.ptest.execution.ssh.RSyncResult; import org.apache.hive.ptest.execution.ssh.RemoteCommandResult; import org.apache.hive.ptest.execution.ssh.SSHExecutionException; import org.apache.hive.ptest.execution.ssh.SSHResult; import org.slf4j.Logger; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; public abstract class Phase { - protected final ImmutableList hostExecutors; + protected final List hostExecutors; private final LocalCommandFactory localCommandFactory; private final ImmutableMap templateDefaults; protected final Logger logger; - public Phase(ImmutableList hostExecutors, + public Phase(List hostExecutors, LocalCommandFactory localCommandFactory, ImmutableMap templateDefaults, Logger logger) { super(); @@ -67,13 +70,13 @@ public abstract class Phase { } } // prep - protected List rsyncFromLocalToRemoteInstances(String localFile, String remoteFile) + protected List rsyncFromLocalToRemoteInstances(String localFile, String remoteFile) throws Exception { - List> futures = Lists.newArrayList(); + List>>> futures = Lists.newArrayList(); for(HostExecutor hostExecutor : hostExecutors) { - futures.addAll(hostExecutor.rsyncFromLocalToRemoteInstances(localFile, remoteFile)); + futures.add(hostExecutor.rsyncFromLocalToRemoteInstances(localFile, remoteFile)); } - return toListOfResults(futures); + return flatten(futures); } // clean @@ -86,16 +89,76 @@ public abstract class Phase { return toListOfResults(futures); } // clean prep - protected List execInstances(String command) + protected List execInstances(String command) throws Exception { - List> futures = Lists.newArrayList(); + List> futures = Lists.newArrayList(); for(HostExecutor hostExecutor : hostExecutors) { futures.addAll(hostExecutor.execInstances(command)); } return toListOfResults(futures); } + protected List initalizeHosts() + throws Exception { + List>> futures = Lists.newArrayList(); + ListeningExecutorService executor = MoreExecutors. + listeningDecorator(Executors.newFixedThreadPool(hostExecutors.size())); + try { + for(final HostExecutor hostExecutor : hostExecutors) { + futures.add(executor.submit(new Callable>() { + @Override + public List call() throws Exception { + return initalizeHost(hostExecutor); + } + })); + } + List results = Lists.newArrayList(); + for(ListenableFuture> future : futures) { + List result = future.get(); + if(result != null) { + results.addAll(result); + } + } + executor.shutdown(); + return results; + } finally { + if(executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + protected List initalizeHost(HostExecutor hostExecutor) + throws Exception { + List results = Lists.newArrayList(); + results.add(hostExecutor.exec("killall -q -9 -f java || true").get()); + TimeUnit.SECONDS.sleep(1); + // order matters in all of these so block + results.addAll(toListOfResults(hostExecutor.execInstances("rm -rf $localDir/$instanceName/scratch $localDir/$instanceName/logs"))); + results.addAll(toListOfResults(hostExecutor.execInstances("mkdir -p $localDir/$instanceName/logs " + + "$localDir/$instanceName/maven " + + "$localDir/$instanceName/scratch " + + "$localDir/$instanceName/ivy " + + "$localDir/$instanceName/${repositoryName}-source"))); + // order does not matter below, so go wide + List>>> futures = Lists.newArrayList(); + futures.add(hostExecutor.rsyncFromLocalToRemoteInstances("$workingDir/${repositoryName}-source", "$localDir/$instanceName/")); + futures.add(hostExecutor.rsyncFromLocalToRemoteInstances("$workingDir/maven", "$localDir/$instanceName/")); + futures.add(hostExecutor.rsyncFromLocalToRemoteInstances("$workingDir/ivy", "$localDir/$instanceName/")); + results.addAll(flatten(futures)); + return results; + } + private List flatten(List>>> futures) + throws Exception { + List results = Lists.newArrayList(); + for(ListenableFuture>> future : futures) { + List> result = future.get(); + if(result != null) { + results.addAll(toListOfResults(result)); + } + } + return results; + } private List toListOfResults(List> futures) - throws Exception { + throws Exception { List results = Lists.newArrayList(); for(T result : Futures.allAsList(futures).get()) { if(result != null) { @@ -110,7 +173,4 @@ public abstract class Phase { protected ImmutableMap getTemplateDefaults() { return templateDefaults; } - protected ImmutableList getHostExecutors() { - return hostExecutors; - } }