Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/tools/MRAdmin.java Tue Jan 26 14:02:53 2010
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.AdminOperationsProtocol;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.util.Tool;
@@ -53,16 +54,20 @@
private static void printHelp(String cmd) {
String summary = "hadoop mradmin is the command to execute Map-Reduce administrative commands.\n" +
"The full syntax is: \n\n" +
- "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] [-help [cmd]] "
- + "[-refreshNodes]\n";
+ "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] " +
+ "[-refreshNodes] [-refreshUserToGroupsMappings] [-help [cmd]]\n";
String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
"\t\tJobtracker will reload the authorization policy file.\n";
String refreshQueues =
- "-refreshQueues: Reload the queue acls and state.\n"
- + "\t\tJobTracker will reload the mapred-queues.xml file.\n";
+ "-refreshQueues: Reload the queues' acls, states and "
+ + "scheduler specific properties.\n"
+ + "\t\tJobTracker will reload the mapred-queues configuration file.\n";
+ String refreshUserToGroupsMappings =
+ "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
+
String refreshNodes =
"-refreshNodes: Refresh the hosts information at the jobtracker.\n";
@@ -73,6 +78,8 @@
System.out.println(refreshServiceAcl);
} else if ("refreshQueues".equals(cmd)) {
System.out.println(refreshQueues);
+ } else if ("refreshUserToGroupsMappings".equals(cmd)) {
+ System.out.println(refreshUserToGroupsMappings);
} else if ("refreshNodes".equals(cmd)) {
System.out.println(refreshNodes);
} else if ("help".equals(cmd)) {
@@ -98,12 +105,15 @@
System.err.println("Usage: java MRAdmin" + " [-refreshServiceAcl]");
} else if ("-refreshQueues".equals(cmd)) {
System.err.println("Usage: java MRAdmin" + " [-refreshQueues]");
+ } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+ System.err.println("Usage: java MRAdmin" + " [-refreshUserToGroupsMappings]");
} else if ("-refreshNodes".equals(cmd)) {
System.err.println("Usage: java MRAdmin" + " [-refreshNodes]");
} else {
System.err.println("Usage: java MRAdmin");
System.err.println(" [-refreshServiceAcl]");
System.err.println(" [-refreshQueues]");
+ System.err.println(" [-refreshUserToGroupsMappings]");
System.err.println(" [-refreshNodes]");
System.err.println(" [-help [cmd]]");
System.err.println();
@@ -142,6 +152,29 @@
return 0;
}
+ /**
+ * Refresh the user-to-groups mappings on the {@link JobTracker}.
+ *
+ * Obtains an RPC proxy for {@link RefreshUserToGroupMappingsProtocol}
+ * addressed at {@code JobTracker.getAddress(conf)}, using the identity
+ * from {@code getUGI(conf)}, and invokes
+ * {@code refreshUserToGroupsMappings(conf)} on that proxy.
+ *
+ * NOTE(review): the proxy is not released anywhere in this method --
+ * confirm whether an explicit stop/close is expected by the RPC layer.
+ *
+ * @return always 0; there is no non-zero return path in this method, so a
+ * failure is reported by the thrown exception, not by an exit code
+ * @throws IOException propagated unchanged from the calls made here
+ * (presumably proxy creation or the remote refresh -- the
+ * throwing callee is not visible in this hunk)
+ */
+ private int refreshUserToGroupsMappings() throws IOException {
+ // Use the configuration this tool was set up with
+ Configuration conf = getConf();
+ // Build the client-side proxy that talks to the JobTracker
+ RefreshUserToGroupMappingsProtocol refreshProtocol =
+ (RefreshUserToGroupMappingsProtocol)
+ RPC.getProxy(RefreshUserToGroupMappingsProtocol.class,
+ RefreshUserToGroupMappingsProtocol.versionID,
+ JobTracker.getAddress(conf), getUGI(conf), conf,
+ NetUtils.getSocketFactory(conf,
+ RefreshUserToGroupMappingsProtocol.class));
+
+ // Ask the remote side to refresh its user-to-groups mappings
+ refreshProtocol.refreshUserToGroupsMappings(conf);
+
+ return 0;
+ }
+
private int refreshQueues() throws IOException {
// Get the current configuration
Configuration conf = getConf();
@@ -196,12 +229,11 @@
int exitCode = -1;
int i = 0;
String cmd = args[i++];
-
//
// verify that we have enough command line parameters
//
- if ("-refreshServiceAcl".equals(cmd) || "-refreshQueues".equals(cmd)
- || "-refreshNodes".equals(cmd)) {
+ if ("-refreshServiceAcl".equals(cmd) || "-refreshQueues".equals(cmd) ||
+ "-refreshNodes".equals(cmd) || "-refreshUserToGroupsMappings".equals(cmd)) {
if (args.length != 1) {
printUsage(cmd);
return exitCode;
@@ -214,6 +246,8 @@
exitCode = refreshAuthorizationPolicy();
} else if ("-refreshQueues".equals(cmd)) {
exitCode = refreshQueues();
+ } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+ exitCode = refreshUserToGroupsMappings();
} else if ("-refreshNodes".equals(cmd)) {
exitCode = refreshNodes();
} else if ("-help".equals(cmd)) {
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Cluster.java Tue Jan 26 14:02:53 2010
@@ -46,6 +46,7 @@
private Configuration conf;
private FileSystem fs = null;
private Path sysDir = null;
+ private Path stagingAreaDir = null;
private Path jobHistoryDir = null;
static {
@@ -76,6 +77,7 @@
ClientProtocol client;
String tracker = conf.get("mapred.job.tracker", "local");
if ("local".equals(tracker)) {
+ conf.setInt("mapreduce.job.maps", 1);
client = new LocalJobRunner(conf);
} else {
client = createRPCProxy(JobTracker.getAddress(conf), conf);
@@ -222,6 +224,19 @@
}
return sysDir;
}
+
+ /**
+ * Grab the jobtracker's view of the staging directory path where
+ * job-specific files will be placed.
+ *
+ * The value is fetched lazily: the first call asks {@code client} for the
+ * staging area and wraps the result in a {@link Path}, which is stored in
+ * the {@code stagingAreaDir} field; later calls return that stored value
+ * without contacting the client again.
+ *
+ * NOTE(review): the null check and assignment are not guarded by any
+ * synchronization visible here -- confirm whether concurrent callers are
+ * expected.
+ *
+ * @return the staging directory where job-specific files are to be placed.
+ * @throws IOException declared by this method; presumably raised by
+ * {@code client.getStagingAreaDir()} -- verify against the protocol
+ * @throws InterruptedException declared by this method; presumably raised
+ * by {@code client.getStagingAreaDir()} -- verify against the protocol
+ */
+ public Path getStagingAreaDir() throws IOException, InterruptedException {
+ if (stagingAreaDir == null) {
+ stagingAreaDir = new Path(client.getStagingAreaDir());
+ }
+ return stagingAreaDir;
+ }
/**
* Get the job history file path for a given job id. The job history file at
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java Tue Jan 26 14:02:53 2010
@@ -20,8 +20,6 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -30,7 +28,6 @@
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.URLConnection;
-import java.util.Arrays;
import java.net.URI;
import javax.security.auth.login.LoginException;
@@ -40,12 +37,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
@@ -956,7 +950,7 @@
ensureState(JobState.DEFINE);
setUseNewAPI();
status = new JobSubmitter(cluster.getFileSystem(),
- cluster.getClient()).submitJobInternal(this);
+ cluster.getClient()).submitJobInternal(this, cluster);
state = JobState.RUNNING;
}
@@ -1033,12 +1027,52 @@
return isSuccessful();
}
+ /**
+ * Decide whether task profile output can be fetched back to the client.
+ *
+ * Reads {@code getProfileParams()}, splits it on runs of spaces and tabs,
+ * and inspects every argument that starts with {@code -agentlib:hprof} or
+ * {@code -Xrunhprof}. Each such argument is split on commas and every
+ * piece is checked for a {@code file=} prefix followed by at least one
+ * more character.
+ *
+ * NOTE(review): only a comma-separated piece that itself begins with
+ * {@code file=} is recognised. When {@code file=} is the first option it
+ * stays attached to the agent prefix in the first piece and is therefore
+ * not detected -- confirm whether that is intended.
+ *
+ * @return true if the profile parameters indicate that this is using
+ * hprof, which generates profile files in a particular location
+ * that we can retrieve to the client; false when the parameters
+ * are null or no hprof argument carries a non-empty file= piece.
+ */
+ private boolean shouldDownloadProfile() {
+ // Inspect the argument string that was used to initialize profiling.
+ // Downloading is only worthwhile when it names hprof and asks for
+ // file-based output.
+ String profileParams = getProfileParams();
+
+ if (null == profileParams) {
+ return false;
+ }
+
+ // Split on runs of spaces and tabs (the only separators this regex matches).
+ String [] parts = profileParams.split("[ \\t]+");
+
+ // Track separately whether an hprof argument and a file= piece were seen.
+ boolean hprofFound = false;
+ boolean fileFound = false;
+ for (String p : parts) {
+ if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
+ hprofFound = true;
+
+ // The hprof argument is a comma-delimited option list; one piece
+ // may name the output file. Require that piece to be present and
+ // to carry a non-empty value after "file=".
+ String [] subparts = p.split(",");
+ for (String sub : subparts) {
+ if (sub.startsWith("file=") && sub.length() != "file=".length()) {
+ fileFound = true;
+ }
+ }
+ }
+ }
+
+ return hprofFound && fileFound;
+ }
+
private void printTaskEvents(TaskCompletionEvent[] events,
Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
IntegerRanges reduceRanges) throws IOException, InterruptedException {
for (TaskCompletionEvent event : events) {
TaskCompletionEvent.Status status = event.getStatus();
- if (profiling &&
+ if (profiling && shouldDownloadProfile() &&
(status == TaskCompletionEvent.Status.SUCCEEDED ||
status == TaskCompletionEvent.Status.FAILED) &&
(event.isMapTask() ? mapRanges : reduceRanges).
@@ -1211,106 +1245,4 @@
}
return ugi;
}
-
- /**
- * Read a splits file into a list of raw splits.
- *
- * @param in the stream to read from
- * @return the complete list of splits
- * @throws IOException
- */
- public static RawSplit[] readSplitFile(DataInput in) throws IOException {
- byte[] header = new byte[JobSubmitter.SPLIT_FILE_HEADER.length];
- in.readFully(header);
- if (!Arrays.equals(JobSubmitter.SPLIT_FILE_HEADER, header)) {
- throw new IOException("Invalid header on split file");
- }
- int vers = WritableUtils.readVInt(in);
- if (vers != JobSubmitter.CURRENT_SPLIT_FILE_VERSION) {
- throw new IOException("Unsupported split version " + vers);
- }
- int len = WritableUtils.readVInt(in);
- RawSplit[] result = new RawSplit[len];
- for (int i=0; i < len; ++i) {
- result[i] = new RawSplit();
- result[i].readFields(in);
- }
- return result;
- }
-
- public static class RawSplit implements Writable {
- private String splitClass;
- private BytesWritable bytes = new BytesWritable();
- private String[] locations;
- long dataLength;
-
- public RawSplit() {
- }
-
- protected RawSplit(String splitClass, BytesWritable bytes,
- String[] locations, long dataLength) {
- this.splitClass = splitClass;
- this.bytes = bytes;
- this.locations = locations;
- this.dataLength = dataLength;
- }
-
- public void setBytes(byte[] data, int offset, int length) {
- bytes.set(data, offset, length);
- }
-
- public void setClassName(String className) {
- splitClass = className;
- }
-
- public String getClassName() {
- return splitClass;
- }
-
- public BytesWritable getBytes() {
- return bytes;
- }
-
- public void clearBytes() {
- bytes = null;
- }
-
- public void setLocations(String[] locations) {
- this.locations = locations;
- }
-
- public String[] getLocations() {
- return locations;
- }
-
- public long getDataLength() {
- return dataLength;
- }
-
- public void setDataLength(long l) {
- dataLength = l;
- }
-
- public void readFields(DataInput in) throws IOException {
- splitClass = Text.readString(in);
- dataLength = in.readLong();
- bytes.readFields(in);
- int len = WritableUtils.readVInt(in);
- locations = new String[len];
- for (int i=0; i < len; ++i) {
- locations[i] = Text.readString(in);
- }
- }
-
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, splitClass);
- out.writeLong(dataLength);
- bytes.write(out);
- WritableUtils.writeVInt(out, locations.length);
- for (int i = 0; i < locations.length; i++) {
- Text.writeString(out, locations[i]);
- }
- }
- }
-
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Jan 26 14:02:53 2010
@@ -55,6 +55,7 @@
public static final String JAR = "mapreduce.job.jar";
public static final String ID = "mapreduce.job.id";
public static final String JOB_NAME = "mapreduce.job.name";
+ public static final String JAR_UNPACK_PATTERN = "mapreduce.job.jar.unpack.pattern";
public static final String USER_NAME = "mapreduce.job.user.name";
public static final String PRIORITY = "mapreduce.job.priority";
public static final String QUEUE_NAME = "mapreduce.job.queuename";
@@ -106,6 +107,10 @@
"mapreduce.job.cache.files.timestamps";
public static final String CACHE_ARCHIVES_TIMESTAMPS =
"mapreduce.job.cache.archives.timestamps";
+ public static final String CACHE_FILE_VISIBILITIES =
+ "mapreduce.job.cache.files.visibilities";
+ public static final String CACHE_ARCHIVES_VISIBILITIES =
+ "mapreduce.job.cache.archives.visibilities";
public static final String CACHE_SYMLINK =
"mapreduce.job.cache.symlink.create";
@@ -162,7 +167,7 @@
"mapreduce.map.skip.proc-count.auto-incr";
public static final String MAP_SKIP_MAX_RECORDS =
"mapreduce.map.skip.maxrecords";
- public static final String MAP_COMBINE_MIN_SPISS =
+ public static final String MAP_COMBINE_MIN_SPILLS =
"mapreduce.map.combine.minspills";
public static final String MAP_OUTPUT_COMPRESS =
"mapreduce.map.output.compress";
@@ -212,6 +217,10 @@
"mapreduce.reduce.shuffle.connect.timeout";
public static final String SHUFFLE_READ_TIMEOUT =
"mapreduce.reduce.shuffle.read.timeout";
+ public static final String SHUFFLE_FETCH_FAILURES =
+ "mapreduce.reduce.shuffle.maxfetchfailures";
+ public static final String SHUFFLE_NOTIFY_READERROR =
+ "mapreduce.reduce.shuffle.notify.readerror";
public static final String REDUCE_SKIP_INCR_PROC_COUNT =
"mapreduce.reduce.skip.proc-count.auto-incr";
public static final String REDUCE_SKIP_MAXGROUPS =
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobCounter.properties?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobCounter.properties Tue Jan 26 14:02:53 2010
@@ -1,3 +1,15 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# ResourceBundle properties file for job-level counters
CounterGroupName= Job Counters
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Tue Jan 26 14:02:53 2010
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.mapreduce;
-import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
@@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,15 +36,15 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.util.ReflectionUtils;
+import org.codehaus.jackson.map.ObjectMapper;
class JobSubmitter {
protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
@@ -128,12 +129,7 @@
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
-
- /*
- * set this user's id in job configuration, so later job files can be
- * accessed using this user's id
- */
- job.setUGIAndUserGroupNames();
+ String jobJar = job.getJar();
//
// Figure out what fs the JobTracker is using. Copy the
@@ -145,14 +141,18 @@
// Create a number of filenames in the JobTracker's fs namespace
LOG.debug("default FileSystem: " + jtFs.getUri());
- jtFs.delete(submitJobDir, true);
+ if (jtFs.exists(submitJobDir)) {
+ throw new IOException("Not submitting job. Job directory " + submitJobDir
+ +" already exists!! This is unexpected.Please check what's there in" +
+ " that directory");
+ }
submitJobDir = jtFs.makeQualified(submitJobDir);
submitJobDir = new Path(submitJobDir.toUri().getPath());
- FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
+ FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
- Path filesDir = new Path(submitJobDir, "files");
- Path archivesDir = new Path(submitJobDir, "archives");
- Path libjarsDir = new Path(submitJobDir, "libjars");
+ Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
+ Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
+ Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
// add all the command line files/ jars and archive
// first copy them to jobtrackers filesystem
@@ -185,7 +185,8 @@
for (String tmpjars: libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
- DistributedCache.addFileToClassPath(newPath, conf);
+ DistributedCache.addFileToClassPath(
+ new Path(newPath.toUri().getPath()), conf);
}
}
@@ -212,11 +213,26 @@
DistributedCache.createSymlink(conf);
}
}
-
+
+ if (jobJar != null) { // copy jar to JobTracker's fs
+ // use jar name if job is not named.
+ if ("".equals(job.getJobName())){
+ job.setJobName(new Path(jobJar).getName());
+ }
+ copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir),
+ replication);
+ job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+ } else {
+ LOG.warn("No job jar file set. User classes may not be found. "+
+ "See Job or Job#setJar(String).");
+ }
+
// set the timestamps of the archives and files
TrackerDistributedCacheManager.determineTimestamps(conf);
+ // set the public/private visibility of the archives and files
+ TrackerDistributedCacheManager.determineCacheVisibilities(conf);
}
-
+
private URI getPathURI(Path destPath, String fragment)
throws URISyntaxException {
URI pathURI = destPath.toUri();
@@ -234,36 +250,20 @@
short replication) throws IOException {
jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
jtFs.setReplication(submitJarFile, replication);
- jtFs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
+ jtFs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
}
+
/**
* configure the jobconf of the user with the command line options of
* -libjars, -files, -archives.
* @param conf
* @throws IOException
*/
- private void configureCommandLineOptions(Job job, Path submitJobDir,
- Path submitJarFile) throws IOException {
+ private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
+ throws IOException {
Configuration conf = job.getConfiguration();
short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
- copyAndConfigureFiles(job, submitJobDir, replication);
-
- /* set this user's id in job configuration, so later job files can be
- * accessed using this user's id
- */
- String originalJarPath = job.getJar();
-
- if (originalJarPath != null) { // copy jar to JobTracker's fs
- // use jar name if job is not named.
- if ("".equals(job.getJobName())){
- job.setJobName(new Path(originalJarPath).getName());
- }
- job.setJar(submitJarFile.toString());
- copyJar(new Path(originalJarPath), submitJarFile, replication);
- } else {
- LOG.warn("No job jar file set. User classes may not be found. "+
- "See Job or Job#setJar(String).");
- }
+ copyAndConfigureFiles(job, jobSubmitDir, replication);
// Set the working directory
if (job.getWorkingDirectory() == null) {
@@ -271,15 +271,6 @@
}
}
-
- // job files are world-wide readable and owner writable
- final private static FsPermission JOB_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
-
- // job submission directory is world readable/writable/executable
- final static FsPermission JOB_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
-
/**
* Internal method for submitting jobs to the system.
*
@@ -305,45 +296,79 @@
* </li>
* </ol></p>
* @param job the configuration to submit
+ * @param cluster the handle to the Cluster
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
- JobStatus submitJobInternal(Job job) throws ClassNotFoundException,
- InterruptedException, IOException {
-
+ @SuppressWarnings("unchecked")
+ JobStatus submitJobInternal(Job job, Cluster cluster)
+ throws ClassNotFoundException, InterruptedException, IOException {
+ /*
+ * set this user's id in job configuration, so later job files can be
+ * accessed using this user's id
+ */
+ job.setUGIAndUserGroupNames();
+
+ Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
+ job.getConfiguration());
//configure the command line options correctly on the submitting dfs
Configuration conf = job.getConfiguration();
JobID jobId = submitClient.getNewJobID();
- Path submitJobDir = new Path(submitClient.getSystemDir(), jobId.toString());
- Path submitJarFile = new Path(submitJobDir, "job.jar");
- Path submitSplitFile = new Path(submitJobDir, "job.split");
- configureCommandLineOptions(job, submitJobDir, submitJarFile);
- Path submitJobFile = new Path(submitJobDir, "job.xml");
-
- checkSpecs(job);
+ Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+ JobStatus status = null;
+ try {
+ conf.set("mapreduce.job.dir", submitJobDir.toString());
+ LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ + " as the submit dir");
+ copyAndConfigureFiles(job, submitJobDir);
+ Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
- // Create the splits for the job
- LOG.info("Creating splits at " + jtFs.makeQualified(submitSplitFile));
- int maps = writeSplits(job, submitSplitFile);
- conf.set("mapred.job.split.file", submitSplitFile.toString());
- conf.setInt("mapred.map.tasks", maps);
- LOG.info("number of splits:" + maps);
-
- // Write job file to JobTracker's fs
- writeConf(conf, submitJobFile);
-
- //
- // Now, actually submit the job (using the submit name)
- //
- JobStatus status = submitClient.submitJob(jobId);
- if (status != null) {
- return status;
- } else {
- throw new IOException("Could not launch job");
+ checkSpecs(job);
+
+ // create TokenStorage object with user secretKeys
+ String tokensFileName = conf.get("tokenCacheFile");
+ TokenStorage tokenStorage = null;
+ if(tokensFileName != null) {
+ LOG.info("loading secret keys from " + tokensFileName);
+ String localFileName = new Path(tokensFileName).toUri().getPath();
+ tokenStorage = new TokenStorage();
+ // read JSON
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, String> nm =
+ mapper.readValue(new File(localFileName), Map.class);
+
+ for(Map.Entry<String, String> ent: nm.entrySet()) {
+ LOG.debug("adding secret key alias="+ent.getKey());
+ tokenStorage.addSecretKey(new Text(ent.getKey()), ent.getValue().getBytes());
+ }
+ }
+
+ // Create the splits for the job
+ LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
+ int maps = writeSplits(job, submitJobDir);
+ conf.setInt("mapred.map.tasks", maps);
+ LOG.info("number of splits:" + maps);
+
+ // Write job file to submit dir
+ writeConf(conf, submitJobFile);
+ //
+ // Now, actually submit the job (using the submit name)
+ //
+ status = submitClient.submitJob(jobId, submitJobDir.toString(), tokenStorage);
+ if (status != null) {
+ return status;
+ } else {
+ throw new IOException("Could not launch job");
+ }
+ } finally {
+ if (status == null) {
+ LOG.info("Cleaning up the staging area " + submitJobDir);
+ jtFs.delete(submitJobDir, true);
+ }
}
}
-
+
private void checkSpecs(Job job) throws ClassNotFoundException,
InterruptedException, IOException {
JobConf jConf = (JobConf)job.getConfiguration();
@@ -364,7 +389,7 @@
// Write job file to JobTracker's fs
FSDataOutputStream out =
FileSystem.create(jtFs, jobFile,
- new FsPermission(JOB_FILE_PERMISSION));
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
try {
conf.writeXml(out);
} finally {
@@ -372,81 +397,42 @@
}
}
+
@SuppressWarnings("unchecked")
- private <T extends InputSplit>
- int writeNewSplits(JobContext job, Path submitSplitFile) throws IOException,
+ private <T extends InputSplit>
+ int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
-
+
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
- DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile,
- array.length);
- try {
- if (array.length != 0) {
- DataOutputBuffer buffer = new DataOutputBuffer();
- Job.RawSplit rawSplit = new Job.RawSplit();
- SerializationFactory factory = new SerializationFactory(conf);
- Serializer<T> serializer =
- factory.getSerializer((Class<T>) array[0].getClass());
- serializer.open(buffer);
- for (T split: array) {
- rawSplit.setClassName(split.getClass().getName());
- buffer.reset();
- serializer.serialize(split);
- rawSplit.setDataLength(split.getLength());
- rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
- rawSplit.setLocations(split.getLocations());
- rawSplit.write(out);
- }
- serializer.close();
- }
- } finally {
- out.close();
- }
+ JobSplitWriter.createSplitFiles(jobSubmitDir, conf, array);
return array.length;
}
-
- static final int CURRENT_SPLIT_FILE_VERSION = 0;
- static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
- private DataOutputStream writeSplitsFileHeader(Configuration conf,
- Path filename, int length) throws IOException {
- // write the splits to a file for the job tracker
- FileSystem fs = filename.getFileSystem(conf);
- FSDataOutputStream out =
- FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
- out.write(SPLIT_FILE_HEADER);
- WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
- WritableUtils.writeVInt(out, length);
- return out;
- }
-
+
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
- Path submitSplitFile) throws IOException,
+ Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
- // Create the splits for the job
- LOG.debug("Creating splits at " + jtFs.makeQualified(submitSplitFile));
int maps;
if (jConf.getUseNewMapper()) {
- maps = writeNewSplits(job, submitSplitFile);
+ maps = writeNewSplits(job, jobSubmitDir);
} else {
- maps = writeOldSplits(jConf, submitSplitFile);
+ maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
-
- // method to write splits for old api mapper.
- private int writeOldSplits(JobConf job,
- Path submitSplitFile) throws IOException {
- org.apache.hadoop.mapred.InputSplit[] splits =
+
+ //method to write splits for old api mapper.
+ private int writeOldSplits(JobConf job, Path jobSubmitDir)
+ throws IOException {
+ org.apache.hadoop.mapred.InputSplit[] splits =
job.getInputFormat().getSplits(job, job.getNumMapTasks());
// sort the splits into order based on size, so that the biggest
// go first
@@ -468,24 +454,7 @@
}
}
});
- DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile,
- splits.length);
-
- try {
- DataOutputBuffer buffer = new DataOutputBuffer();
- Job.RawSplit rawSplit = new Job.RawSplit();
- for (org.apache.hadoop.mapred.InputSplit split: splits) {
- rawSplit.setClassName(split.getClass().getName());
- buffer.reset();
- split.write(buffer);
- rawSplit.setDataLength(split.getLength());
- rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
- rawSplit.setLocations(split.getLocations());
- rawSplit.write(out);
- }
- } finally {
- out.close();
- }
+ JobSplitWriter.createSplitFiles(jobSubmitDir, job, splits);
return splits.length;
}
@@ -505,7 +474,7 @@
} catch (IOException ie) {
throw new RuntimeException("exception in compare", ie);
} catch (InterruptedException ie) {
- throw new RuntimeException("exception in compare", ie);
+ throw new RuntimeException("exception in compare", ie);
}
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Tue Jan 26 14:02:53 2010
@@ -24,6 +24,7 @@
MAP_OUTPUT_RECORDS,
MAP_SKIPPED_RECORDS,
MAP_OUTPUT_BYTES,
+ SPLIT_RAW_BYTES,
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
@@ -32,5 +33,8 @@
REDUCE_OUTPUT_RECORDS,
REDUCE_SKIPPED_GROUPS,
REDUCE_SKIPPED_RECORDS,
- SPILLED_RECORDS
+ SPILLED_RECORDS,
+ SHUFFLED_MAPS,
+ FAILED_SHUFFLE,
+ MERGED_MAP_OUTPUTS
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Tue Jan 26 14:02:53 2010
@@ -1,3 +1,15 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# ResourceBundle properties file for Map-Reduce counters
CounterGroupName= Map-Reduce Framework
@@ -15,4 +27,6 @@
REDUCE_SKIPPED_RECORDS.name= Reduce skipped records
REDUCE_SKIPPED_GROUPS.name= Reduce skipped groups
SPILLED_RECORDS.name= Spilled Records
-
+SHUFFLED_MAPS.name= Shuffled Maps
+FAILED_SHUFFLE.name= Failed Shuffles
+MERGED_MAP_OUTPUTS.name= Merged Map outputs
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Jan 26 14:02:53 2010
@@ -24,6 +24,7 @@
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.DefaultTaskController;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@ -198,9 +199,9 @@
boolean isArchive, long confFileStamp,
Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
- return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
- baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
- honorSymLinkConf);
+ return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+ .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
+ confFileStamp, currentWorkDir, honorSymLinkConf, false);
}
/**
@@ -277,8 +278,8 @@
if (timestamp == null) {
throw new IOException("TimeStamp of the uri couldnot be found");
}
- new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
- Long.parseLong(timestamp));
+ new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+ .releaseCache(cache, conf, Long.parseLong(timestamp));
}
/**
@@ -294,7 +295,8 @@
@Deprecated
public static String makeRelative(URI cache, Configuration conf)
throws IOException {
- return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
+ return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+ .makeRelative(cache, conf);
}
/**
@@ -657,6 +659,7 @@
*/
@Deprecated
public static void purgeCache(Configuration conf) throws IOException {
- new TrackerDistributedCacheManager(conf).purgeCache();
+ new TrackerDistributedCacheManager(conf, new DefaultTaskController())
+ .purgeCache();
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Jan 26 14:02:53 2010
@@ -36,6 +36,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.classification.InterfaceAudience;
/**
* Helper class of {@link TrackerDistributedCacheManager} that represents
@@ -43,9 +44,8 @@
* by TaskRunner/LocalJobRunner to parse out the job configuration
* and setup the local caches.
*
- * <b>This class is internal to Hadoop, and should not be treated as a public
- * interface.</b>
*/
+@InterfaceAudience.Private
public class TaskDistributedCacheManager {
private final TrackerDistributedCacheManager distributedCacheManager;
private final Configuration taskConf;
@@ -66,6 +66,7 @@
REGULAR,
ARCHIVE
}
+ boolean isPublic = true;
/** Whether to decompress */
final FileType type;
final long timestamp;
@@ -73,10 +74,11 @@
final boolean shouldBeAddedToClassPath;
boolean localized = false;
- private CacheFile(URI uri, FileType type, long timestamp,
+ private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
boolean classPath) {
this.uri = uri;
this.type = type;
+ this.isPublic = isPublic;
this.timestamp = timestamp;
this.shouldBeAddedToClassPath = classPath;
}
@@ -87,7 +89,7 @@
* files.
*/
private static List<CacheFile> makeCacheFiles(URI[] uris,
- String[] timestamps, Path[] paths, FileType type) {
+ String[] timestamps, String cacheVisibilities[], Path[] paths, FileType type) {
List<CacheFile> ret = new ArrayList<CacheFile>();
if (uris != null) {
if (uris.length != timestamps.length) {
@@ -103,7 +105,8 @@
URI u = uris[i];
boolean isClassPath = (null != classPaths.get(u.getPath()));
long t = Long.parseLong(timestamps[i]);
- ret.add(new CacheFile(u, type, t, isClassPath));
+ ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
+ t, isClassPath));
}
}
return ret;
@@ -127,11 +130,13 @@
this.cacheFiles.addAll(
CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
DistributedCache.getFileTimestamps(taskConf),
+ TrackerDistributedCacheManager.getFileVisibilities(taskConf),
DistributedCache.getFileClassPaths(taskConf),
CacheFile.FileType.REGULAR));
this.cacheFiles.addAll(
CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
DistributedCache.getArchiveTimestamps(taskConf),
+ TrackerDistributedCacheManager.getArchiveVisibilities(taskConf),
DistributedCache.getArchiveClassPaths(taskConf),
CacheFile.FileType.ARCHIVE));
}
@@ -144,9 +149,9 @@
* file, if necessary.
*/
public void setup(LocalDirAllocator lDirAlloc, File workDir,
- String cacheSubdir) throws IOException {
+ String privateCacheSubdir, String publicCacheSubDir) throws IOException {
setupCalled = true;
-
+
if (cacheFiles.isEmpty()) {
return;
}
@@ -159,11 +164,14 @@
URI uri = cacheFile.uri;
FileSystem fileSystem = FileSystem.get(uri, taskConf);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-
+ String cacheSubdir = publicCacheSubDir;
+ if (!cacheFile.isPublic) {
+ cacheSubdir = privateCacheSubdir;
+ }
Path p = distributedCacheManager.getLocalCache(uri, taskConf,
cacheSubdir, fileStatus,
cacheFile.type == CacheFile.FileType.ARCHIVE,
- cacheFile.timestamp, workdirPath, false);
+ cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
cacheFile.setLocalized(true);
if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Jan 26 14:02:53 2010
@@ -30,15 +30,22 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.classification.InterfaceAudience;
/**
* Manages a single machine's instance of a cross-job
@@ -46,9 +53,8 @@
* by a TaskTracker (or something that emulates it,
* like LocalJobRunner).
*
- * <b>This class is internal to Hadoop, and should not be treated as a public
- * interface.</b>
*/
+@InterfaceAudience.Private
public class TrackerDistributedCacheManager {
// cacheID to cacheStatus mapping
private TreeMap<String, CacheStatus> cachedArchives =
@@ -66,14 +72,32 @@
private LocalDirAllocator lDirAllocator;
+ private TaskController taskController;
+
private Configuration trackerConf;
private Random random = new Random();
- public TrackerDistributedCacheManager(Configuration conf) throws IOException {
+ private MRAsyncDiskService asyncDiskService;
+
+ public TrackerDistributedCacheManager(Configuration conf,
+ TaskController taskController) throws IOException {
this.localFs = FileSystem.getLocal(conf);
this.trackerConf = conf;
this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
+ this.taskController = taskController;
+ }
+
+ /**
+ * Creates a TrackerDistributedCacheManager that also holds a MRAsyncDiskService.
+ * @param asyncDiskService Provides a set of ThreadPools for async disk
+ * operations; handed to deleteLocalPath when localized cache paths are removed.
+ */
+ public TrackerDistributedCacheManager(Configuration conf,
+ TaskController taskController, MRAsyncDiskService asyncDiskService)
+ throws IOException {
+ this(conf, taskController);
+ this.asyncDiskService = asyncDiskService;
}
/**
@@ -101,6 +125,7 @@
* launches
* NOTE: This is effectively always on since r696957, since there is no code
* path that does not use this.
+ * @param isPublic whether the cache file is public (accessible to all users) or private
* @return the path to directory where the archives are unjarred in case of
* archives, the path to the file where the file is copied locally
* @throws IOException
@@ -108,7 +133,7 @@
Path getLocalCache(URI cache, Configuration conf,
String subDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf)
+ Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
throws IOException {
String key = getKey(cache, conf, confFileStamp);
CacheStatus lcacheStatus;
@@ -117,13 +142,13 @@
lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
// was never localized
+ String uniqueString = String.valueOf(random.nextLong());
String cachePath = new Path (subDir,
- new Path(String.valueOf(random.nextLong()),
- makeRelative(cache, conf))).toString();
+ new Path(uniqueString, makeRelative(cache, conf))).toString();
Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
fileStatus.getLen(), trackerConf);
- lcacheStatus = new CacheStatus(
- new Path(localPath.toString().replace(cachePath, "")), localPath);
+ lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
+ cachePath, "")), localPath, new Path(subDir), uniqueString);
cachedArchives.put(key, lcacheStatus);
}
@@ -137,7 +162,7 @@
synchronized (lcacheStatus) {
if (!lcacheStatus.isInited()) {
localizedPath = localizeCache(conf, cache, confFileStamp,
- lcacheStatus, fileStatus, isArchive);
+ lcacheStatus, fileStatus, isArchive, isPublic);
lcacheStatus.initComplete();
} else {
localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
@@ -242,23 +267,47 @@
// do the deletion, after releasing the global lock
for (CacheStatus lcacheStatus : deleteSet) {
synchronized (lcacheStatus) {
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
- LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+ deleteLocalPath(asyncDiskService,
+ FileSystem.getLocal(conf), lcacheStatus.localizedLoadPath);
// decrement the size of the cache from baseDirSize
synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+ Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
if ( dirSize != null ) {
dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
+ baseDirSize.put(lcacheStatus.localizedBaseDir, dirSize);
} else {
LOG.warn("Cannot find record of the baseDir: " +
- lcacheStatus.baseDir + " during delete!");
+ lcacheStatus.localizedBaseDir + " during delete!");
}
}
}
}
}
+ /**
+ * Delete a local path with asyncDiskService if one is given; fall back to a
+ * synchronous recursive delete on fs if there is none or it reports failure.
+ */
+ private static void deleteLocalPath(MRAsyncDiskService asyncDiskService,
+ LocalFileSystem fs, Path path) throws IOException {
+ boolean deleted = false;
+ if (asyncDiskService != null) {
+ // Try to delete using asyncDiskService
+ String localPathToDelete =
+ path.toUri().getPath();
+ deleted = asyncDiskService.moveAndDeleteAbsolutePath(localPathToDelete);
+ if (!deleted) {
+ LOG.warn("Cannot find DistributedCache path " + localPathToDelete
+ + " on any of the asyncDiskService volumes!");
+ }
+ }
+ if (!deleted) {
+ // No asyncDiskService, or it did not delete the path: delete synchronously
+ fs.delete(path, true);
+ }
+ LOG.info("Deleted path " + path);
+ }
+
/*
* Returns the relative path of the dir this cache will be localized in
* relative path that this cache will be localized in. For
@@ -305,6 +354,51 @@
return fileSystem.getFileStatus(filePath).getModificationTime();
}
+
+ /**
+ * Tells whether a cache file is visible to all (public): the file must be
+ * readable by others and every parent directory executable by others.
+ * @param conf configuration used to look up the FileSystem for the uri
+ * @param uri location of the cache file; its path component is what is checked
+ * @return true if the path in the uri is visible to all, false otherwise
+ * @throws IOException propagated from the FileSystem lookup or status checks
+ */
+ static boolean isPublic(Configuration conf, URI uri) throws IOException {
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path current = new Path(uri.getPath());
+ //the leaf level file should be readable by others
+ if (!checkPermissionOfOther(fs, current, FsAction.READ)) {
+ return false;
+ }
+ current = current.getParent();
+ while (current != null) {
+ //the subdirs in the path should have execute permissions for others
+ if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+ return false;
+ }
+ current = current.getParent();
+ }
+ return true;
+ }
+ /**
+ * Checks for a given path whether the Other permissions on it
+ * imply the permission in the passed FsAction
+ * @param fs file system on which the status of the path is looked up
+ * @param path path whose permission bits are inspected
+ * @param action action that the Other permissions are required to imply
+ * @return true if the Other permissions of the path imply the action, false otherwise
+ * @throws IOException propagated from fs.getFileStatus
+ */
+ private static boolean checkPermissionOfOther(FileSystem fs, Path path,
+ FsAction action) throws IOException {
+ FileStatus status = fs.getFileStatus(path);
+ FsPermission perms = status.getPermission();
+ FsAction otherAction = perms.getOtherAction();
+ if (otherAction.implies(action)) {
+ return true;
+ }
+ return false;
+ }
private Path checkCacheStatusValidity(Configuration conf,
URI cache, long confFileStamp,
@@ -316,13 +410,13 @@
// Has to be
if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
cacheStatus, fileStatus)) {
- throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
+ throw new IOException("Stale cache file: " + cacheStatus.localizedLoadPath +
" for cache-file: " + cache);
}
LOG.info(String.format("Using existing cache of %s->%s",
- cache.toString(), cacheStatus.localLoadPath));
- return cacheStatus.localLoadPath;
+ cache.toString(), cacheStatus.localizedLoadPath));
+ return cacheStatus.localizedLoadPath;
}
private void createSymlink(Configuration conf, URI cache,
@@ -337,7 +431,7 @@
File flink = new File(link);
if (doSymlink){
if (!flink.exists()) {
- FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
+ FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
}
}
}
@@ -348,21 +442,21 @@
URI cache, long confFileStamp,
CacheStatus cacheStatus,
FileStatus fileStatus,
- boolean isArchive)
+ boolean isArchive, boolean isPublic)
throws IOException {
FileSystem fs = FileSystem.get(cache, conf);
FileSystem localFs = FileSystem.getLocal(conf);
Path parchive = null;
if (isArchive) {
- parchive = new Path(cacheStatus.localLoadPath,
- new Path(cacheStatus.localLoadPath.getName()));
+ parchive = new Path(cacheStatus.localizedLoadPath,
+ new Path(cacheStatus.localizedLoadPath.getName()));
} else {
- parchive = cacheStatus.localLoadPath;
+ parchive = cacheStatus.localizedLoadPath;
}
if (!localFs.mkdirs(parchive.getParent())) {
throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localLoadPath.toString());
+ cacheStatus.localizedLoadPath.toString());
}
String cacheId = cache.getPath();
@@ -392,29 +486,45 @@
FileUtil.getDU(new File(parchive.getParent().toString()));
cacheStatus.size = cacheSize;
synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ Long dirSize = baseDirSize.get(cacheStatus.localizedBaseDir);
if( dirSize == null ) {
dirSize = Long.valueOf(cacheSize);
} else {
dirSize += cacheSize;
}
- baseDirSize.put(cacheStatus.baseDir, dirSize);
+ baseDirSize.put(cacheStatus.localizedBaseDir, dirSize);
}
- // do chmod here
- try {
- //Setting recursive permission to grant everyone read and execute
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
- } catch(InterruptedException e) {
- LOG.warn("Exception in chmod" + e.toString());
- }
+ // set proper permissions for the localized directory
+ setPermissions(conf, cacheStatus, isPublic);
// update cacheStatus to reflect the newly cached file
cacheStatus.mtime = getTimestamp(conf, cache);
LOG.info(String.format("Cached %s as %s",
- cache.toString(), cacheStatus.localLoadPath));
- return cacheStatus.localLoadPath;
+ cache.toString(), cacheStatus.localizedLoadPath));
+ return cacheStatus.localizedLoadPath;
+ }
+
+ private void setPermissions(Configuration conf, CacheStatus cacheStatus,
+ boolean isPublic) throws IOException {
+ if (isPublic) { // public cache: chmod the unique localized dir for everyone
+ Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
+ LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
+ try {
+ FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true); // recursive flag set
+ } catch (InterruptedException e) {
+ LOG.warn("Exception in chmod" + e.toString());
+ throw new IOException(e); // surface the interruption to the caller as an IOException
+ }
+ } else {
+ // private cache: let the task controller set permissions for the user named in conf
+ DistributedCacheFileContext context = new DistributedCacheFileContext(
+ conf.get(JobContext.USER_NAME), new File(cacheStatus.localizedBaseDir
+ .toString()), cacheStatus.localizedBaseDir,
+ cacheStatus.uniqueString);
+ taskController.initializeDistributedCacheFile(context);
+ }
+ }
private static boolean isTarFile(String filename) {
@@ -485,10 +595,10 @@
static class CacheStatus {
// the local load path of this cache
- Path localLoadPath;
+ Path localizedLoadPath;
//the base dir where the cache lies
- Path baseDir;
+ Path localizedBaseDir;
//the size of this cache
long size;
@@ -501,18 +611,28 @@
// is it initialized ?
boolean inited = false;
+
+ // The sub directory (tasktracker/archive or tasktracker/user/archive),
+ // under which the file will be localized
+ Path subDir;
- public CacheStatus(Path baseDir, Path localLoadPath) {
+ // unique string used in the construction of local load path
+ String uniqueString;
+
+ public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
+ String uniqueString) {
super();
- this.localLoadPath = localLoadPath;
+ this.localizedLoadPath = localLoadPath;
this.refcount = 0;
this.mtime = -1;
- this.baseDir = baseDir;
+ this.localizedBaseDir = baseDir;
this.size = 0;
+ this.subDir = subDir;
+ this.uniqueString = uniqueString;
}
Path getBaseDir(){
- return this.baseDir;
+ return this.localizedBaseDir;
}
// mark it as initialized
@@ -524,6 +644,10 @@
boolean isInited() {
return inited;
}
+
+ Path getLocalizedUniqueDir() { // builds localizedBaseDir/subDir/uniqueString
+ return new Path(localizedBaseDir, new Path(subDir, uniqueString));
+ }
}
/**
@@ -535,7 +659,7 @@
synchronized (cachedArchives) {
for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
try {
- localFs.delete(f.getValue().localLoadPath, true);
+ deleteLocalPath(asyncDiskService, localFs, f.getValue().localizedLoadPath);
} catch (IOException ie) {
LOG.debug("Error cleaning up cache", ie);
}
@@ -585,8 +709,60 @@
setFileTimestamps(job, fileTimestamps.toString());
}
}
+ /**
+ * Determines the visibilities of the distributed cache files and
+ * archives. The visibility of a cache path is "public" if the leaf component
+ * has READ permissions for others, and the parent subdirs have
+ * EXECUTE permissions for others
+ * @param job configuration the cache URIs are read from and the visibilities are stored in
+ * @throws IOException propagated from the isPublic checks
+ */
+ public static void determineCacheVisibilities(Configuration job)
+ throws IOException {
+ URI[] tarchives = DistributedCache.getCacheArchives(job);
+ if (tarchives != null) {
+ StringBuffer archiveVisibilities =
+ new StringBuffer(String.valueOf(isPublic(job, tarchives[0]))); // NOTE(review): reads [0] of any non-null array
+ for (int i = 1; i < tarchives.length; i++) {
+ archiveVisibilities.append(",");
+ archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i])));
+ }
+ setArchiveVisibilities(job, archiveVisibilities.toString());
+ }
+ URI[] tfiles = DistributedCache.getCacheFiles(job);
+ if (tfiles != null) {
+ StringBuffer fileVisibilities =
+ new StringBuffer(String.valueOf(isPublic(job, tfiles[0]))); // NOTE(review): reads [0] of any non-null array
+ for (int i = 1; i < tfiles.length; i++) {
+ fileVisibilities.append(",");
+ fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i])));
+ }
+ setFileVisibilities(job, fileVisibilities.toString());
+ }
+ }
/**
+ * Get the booleans on whether the files are public or not. Used by
+ * internal DistributedCache and MapReduce code.
+ * @param conf The configuration which stores the visibilities
+ * @return a string array of booleans, read with conf.getStrings from
+ * the JobContext.CACHE_FILE_VISIBILITIES key
+ */
+ static String[] getFileVisibilities(Configuration conf) {
+ return conf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+ }
+
+ /**
+ * Get the booleans on whether the archives are public or not. Used by
+ * internal DistributedCache and MapReduce code.
+ * @param conf The configuration which stores the visibilities
+ * @return a string array of booleans, read from JobContext.CACHE_ARCHIVES_VISIBILITIES
+ */
+ static String[] getArchiveVisibilities(Configuration conf) {
+ return conf.getStrings(JobContext.CACHE_ARCHIVES_VISIBILITIES);
+ }
+
+ /**
* This method checks if there is a conflict in the fragment names
* of the uris. Also makes sure that each uri has a fragment. It
* is only to be called if you want to create symlinks for
@@ -631,6 +807,28 @@
}
return true;
}
+ /**
+ * This is to set the public/private visibility of the archives to be
+ * localized.
+ *
+ * @param conf Configuration in which the visibilities are stored
+ * @param booleans comma separated list of booleans (true - public)
+ * The order should be the same as the order in which the archives are added.
+ */
+ static void setArchiveVisibilities(Configuration conf, String booleans) {
+ conf.set(JobContext.CACHE_ARCHIVES_VISIBILITIES, booleans);
+ }
+
+ /**
+ * This is to set the public/private visibility of the files to be localized
+ *
+ * @param conf Configuration in which the visibilities are stored
+ * @param booleans comma separated list of booleans (true - public)
+ * The order should be the same as the order in which the files are added.
+ */
+ static void setFileVisibilities(Configuration conf, String booleans) {
+ conf.set(JobContext.CACHE_FILE_VISIBILITIES, booleans);
+ }
/**
* This is to check the timestamp of the archives to be localized.
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Tue Jan 26 14:02:53 2010
@@ -24,8 +24,11 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -80,6 +83,18 @@
private HistoryCleaner historyCleanerThread = null;
+ private Map<JobID, MovedFileInfo> jobHistoryFileMap =
+ Collections.<JobID,MovedFileInfo>synchronizedMap(
+ new LinkedHashMap<JobID, MovedFileInfo>());
+
+ private static class MovedFileInfo { // immutable value stored in jobHistoryFileMap
+ private final String historyFile; // history file path handed in by the caller
+ private final long timestamp; // time the entry was recorded; compared against now when purging
+ public MovedFileInfo(String historyFile, long timestamp) {
+ this.historyFile = historyFile;
+ this.timestamp = timestamp;
+ }
+ }
/**
* Initialize Job History Module
* @param jt Job Tracker handle
@@ -196,6 +211,16 @@
}
/**
+ * Given the job id, return the history file path from the cache, or null if it has no entry
+ */
+ public String getHistoryFilePath(JobID jobId) {
+ MovedFileInfo info = jobHistoryFileMap.get(jobId);
+ if (info == null) {
+ return null;
+ }
+ return info.historyFile;
+ }
+ /**
* Create an event writer for the Job represented by the jobID.
* This should be the first call to history for a job
* @param jobId
@@ -383,7 +408,8 @@
historyFileDonePath = new Path(done,
historyFile.getName()).toString();
}
-
+ jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath,
+ System.currentTimeMillis()));
jobTracker.retireJob(org.apache.hadoop.mapred.JobID.downgrade(id),
historyFileDonePath);
@@ -481,6 +507,21 @@
}
}
}
+ //walking over the map to purge entries from jobHistoryFileMap
+ synchronized (jobHistoryFileMap) {
+ Iterator<Entry<JobID, MovedFileInfo>> it =
+ jobHistoryFileMap.entrySet().iterator();
+ while (it.hasNext()) {
+ MovedFileInfo info = it.next().getValue();
+ if (now - info.timestamp > maxAgeOfHistoryFiles) {
+ it.remove();
+ } else {
+ //since entries are in sorted timestamp order, no more entries
+ //are required to be checked
+ break;
+ }
+ }
+ }
} catch (IOException ie) {
LOG.info("Error cleaning up history directory" +
StringUtils.stringifyException(ie));
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Tue Jan 26 14:02:53 2010
@@ -23,18 +23,25 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.TimeZone;
+import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A RecordReader that reads records from an Oracle SQL table.
*/
public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
+ private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
+
public OracleDBRecordReader(DBInputFormat.DBInputSplit split,
Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
String cond, String [] fields, String table) throws SQLException {
super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+ setSessionTimeZone(conn);
}
/** Returns the query for selecting the records from an Oracle DB. */
@@ -86,4 +93,43 @@
return query.toString();
}
+
+ /**
+ * Set session time zone to the JVM default zone, falling back to "UTC" if that fails
+ * @param conn Connection object; must expose a setSessionTimeZone(String) method
+ * @throws SQLException if the method cannot be found or neither time zone can be set
+ */
+ private void setSessionTimeZone(Connection conn) throws SQLException {
+ // need to use reflection to call the method setSessionTimeZone on the OracleConnection class
+ // because oracle specific java libraries are not accessible in this context
+ Method method;
+ try {
+ method = conn.getClass().getMethod(
+ "setSessionTimeZone", new Class [] {String.class});
+ } catch (Exception ex) {
+ LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
+ // wrap the lookup failure in a SQLException
+ throw new SQLException(ex);
+ }
+
+ // Need to set the time zone in order for Java
+ // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE"
+ String clientTimeZone = TimeZone.getDefault().getID();
+ try {
+ method.setAccessible(true);
+ method.invoke(conn, clientTimeZone);
+ LOG.info("Time zone has been set");
+ } catch (Exception ex) {
+ LOG.warn("Time zone " + clientTimeZone +
+ " could not be set on oracle database.");
+ LOG.info("Setting default time zone: UTC");
+ try {
+ method.invoke(conn, "UTC");
+ } catch (Exception ex2) {
+ LOG.error("Could not set time zone for oracle connection", ex2);
+ // NOTE(review): wraps the first failure (ex); ex2 is only logged above
+ throw new SQLException(ex);
+ }
+ }
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionHelper.java Tue Jan 26 14:02:53 2010
@@ -34,7 +34,8 @@
*
* The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
*
- * The map output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec".
+ * The map output field list spec is under attribute
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec".
* The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
* key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
* Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
@@ -45,7 +46,8 @@
* Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
* and use fields 6,5,1,2,3,7 and above for values.
*
- * The reduce output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec".
+ * The reduce output field list spec is under attribute
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
*
* The reducer extracts output key/value pairs in a similar manner, except that
* the key is never ignored.
@@ -57,9 +59,9 @@
public static final String DATA_FIELD_SEPERATOR =
"mapreduce.fieldsel.data.field.separator";
public static final String MAP_OUTPUT_KEY_VALUE_SPEC =
- "mapreduce.fieldsel.mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec";
+ "mapreduce.fieldsel.map.output.key.value.fields.spec";
public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC =
- "mapreduce.fieldsel.mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec";
+ "mapreduce.fieldsel.reduce.output.key.value.fields.spec";
/**
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionMapper.java Tue Jan 26 14:02:53 2010
@@ -42,7 +42,8 @@
* The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
*
* The map output field list spec is under attribute
- * "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec". The value is expected to be like
+ * "mapreduce.fieldsel.map.output.key.value.fields.spec".
+ * The value is expected to be like
* "keyFieldsSpec:valueFieldsSpec" key/valueFieldsSpec are comma (,) separated
* field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a
* simple number (e.g. 5) specifying a specific field, or a range (like 2-5)
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/fieldsel/FieldSelectionReducer.java Tue Jan 26 14:02:53 2010
@@ -41,7 +41,8 @@
* The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
*
* The reduce output field list spec is under attribute
- * "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec". The value is expected to be like
+ * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
+ * The value is expected to be like
* "keyFieldsSpec:valueFieldsSpec" key/valueFieldsSpec are comma (,)
* separated field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec
* can be a simple number (e.g. 5) specifying a specific field, or a range
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Jan 26 14:02:53 2010
@@ -238,6 +238,14 @@
return result;
}
+ /**
+ * A factory that makes the split for this class. It can be overridden
+ * by sub-classes to make sub-types.
+ */
+ protected FileSplit makeSplit(Path file, long start, long length,
+ String[] hosts) {
+ return new FileSplit(file, start, length, hosts);
+ }
/**
* Generate the list of files and make them into FileSplits.
@@ -261,20 +269,20 @@
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
- splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
+ splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
- splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
+ splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
- splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+ splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
- splits.add(new FileSplit(path, 0, length, new String[0]));
+ splits.add(makeSplit(path, 0, length, new String[0]));
}
}
LOG.debug("Total # of splits: " + splits.size());
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java Tue Jan 26 14:02:53 2010
@@ -47,11 +47,11 @@
public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
final public static String FILTER_CLASS =
- "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.class";
+ "mapreduce.input.sequencefileinputfilter.class";
final public static String FILTER_FREQUENCY =
- "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.frequency";
+ "mapreduce.input.sequencefileinputfilter.frequency";
final public static String FILTER_REGEX =
- "mapreduce.input.mapreduce.input.mapreduce.input.sequencefileinputfilter.regex";
+ "mapreduce.input.sequencefileinputfilter.regex";
public SequenceFileInputFilter() {
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java Tue Jan 26 14:02:53 2010
@@ -125,7 +125,6 @@
Deserializer deserializer = factory.getDeserializer(inputSplitClass);
deserializer.open((DataInputStream)in);
inputSplit = (InputSplit)deserializer.deserialize(inputSplit);
- deserializer.close();
}
private Class<?> readClass(DataInput in) throws IOException {
@@ -147,7 +146,6 @@
factory.getSerializer(inputSplitClass);
serializer.open((DataOutputStream)out);
serializer.serialize(inputSplit);
- serializer.close();
}
public Configuration getConf() {
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java Tue Jan 26 14:02:53 2010
@@ -128,7 +128,6 @@
factory.getSerializer(s.getClass());
serializer.open((DataOutputStream)out);
serializer.serialize(s);
- serializer.close();
}
}
@@ -155,7 +154,6 @@
Deserializer deserializer = factory.getDeserializer(cls[i]);
deserializer.open((DataInputStream)in);
splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
- deserializer.close();
}
} catch (ClassNotFoundException e) {
throw new IOException("Failed split init", e);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Tue Jan 26 14:02:53 2010
@@ -20,7 +20,6 @@
import java.io.IOException;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters;
@@ -33,6 +32,7 @@
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
/**
@@ -87,8 +87,12 @@
* Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
* Version 29: Added reservedSlots, runningTasks and totalJobSubmissions
* to ClusterMetrics as part of MAPREDUCE-1048.
+ * Version 30: Job submission files are uploaded to a staging area under
+ * user home dir. JobTracker reads the required files from the
+ * staging area using user credentials passed via the RPC.
+ * Version 31: Added TokenStorage to submitJob.
*/
- public static final long versionID = 29L;
+ public static final long versionID = 31L;
/**
* Allocate a name for the job.
@@ -100,9 +104,8 @@
/**
* Submit a Job for execution. Returns the latest profile for
* that job.
- * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
*/
- public JobStatus submitJob(JobID jobName)
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts)
throws IOException, InterruptedException;
/**
@@ -219,7 +222,15 @@
*
* @return the system directory where job-specific files are to be placed.
*/
- public String getSystemDir() throws IOException, InterruptedException;
+ public String getSystemDir() throws IOException, InterruptedException;
+
+ /**
+ * Get a hint from the JobTracker
+ * where job-specific files are to be placed.
+ *
+ * @return the directory where job-specific files are to be placed.
+ */
+ public String getStagingAreaDir() throws IOException, InterruptedException;
/**
* Gets the directory location of the completed job history files.
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/security/SecureShuffleUtils.java Tue Jan 26 14:02:53 2010
@@ -22,16 +22,13 @@
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.SecretKey;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.record.Utils;
/**
@@ -43,62 +40,17 @@
public class SecureShuffleUtils {
public static final String HTTP_HEADER_URL_HASH = "UrlHash";
public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
- public static KeyGenerator kg = null;
- public static String DEFAULT_ALG="HmacSHA1";
-
- private SecretKeySpec secretKey;
- private Mac mac;
-
/**
- * static generate keys
- * @return new encoded key
- * @throws NoSuchAlgorithmException
+ * File name used on HDFS for the generated job token.
*/
- public static byte[] getNewEncodedKey() throws NoSuchAlgorithmException{
- SecretKeySpec key = generateKey(DEFAULT_ALG);
- return key.getEncoded();
- }
-
- private static SecretKeySpec generateKey(String alg) throws NoSuchAlgorithmException {
- if(kg==null) {
- kg = KeyGenerator.getInstance(alg);
- }
- return (SecretKeySpec) kg.generateKey();
- }
-
- /**
- * Create a util object with alg and key
- * @param sKeyEncoded
- * @throws NoSuchAlgorithmException
- * @throws InvalidKeyException
- */
- public SecureShuffleUtils(byte [] sKeyEncoded)
- throws IOException{
- secretKey = new SecretKeySpec(sKeyEncoded, SecureShuffleUtils.DEFAULT_ALG);
- try {
- mac = Mac.getInstance(DEFAULT_ALG);
- mac.init(secretKey);
- } catch (NoSuchAlgorithmException nae) {
- throw new IOException(nae);
- } catch( InvalidKeyException ie) {
- throw new IOException(ie);
- }
- }
-
- /**
- * get key as byte[]
- * @return encoded key
- */
- public byte [] getEncodedKey() {
- return secretKey.getEncoded();
- }
+ public static final String JOB_TOKEN_FILENAME = "jobToken";
/**
* Base64 encoded hash of msg
* @param msg
*/
- public String generateHash(byte[] msg) {
- return new String(Base64.encodeBase64(generateByteHash(msg)));
+ public static String generateHash(byte[] msg, SecretKey key) {
+ return new String(Base64.encodeBase64(generateByteHash(msg, key)));
}
/**
@@ -106,8 +58,8 @@
* @param msg
* @return
*/
- private byte[] generateByteHash(byte[] msg) {
- return mac.doFinal(msg);
+ private static byte[] generateByteHash(byte[] msg, SecretKey key) {
+ return JobTokenSecretManager.computeHash(msg, key);
}
/**
@@ -115,20 +67,21 @@
* @param newHash
* @return true if is the same
*/
- private boolean verifyHash(byte[] hash, byte[] msg) {
- byte[] msg_hash = generateByteHash(msg);
+ private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
+ byte[] msg_hash = generateByteHash(msg, key);
return Utils.compareBytes(msg_hash, 0, msg_hash.length, hash, 0, hash.length) == 0;
}
/**
* Aux util to calculate hash of a String
* @param enc_str
+ * @param key
* @return Base64 encodedHash
* @throws IOException
*/
- public String hashFromString(String enc_str)
+ public static String hashFromString(String enc_str, SecretKey key)
throws IOException {
- return generateHash(enc_str.getBytes());
+ return generateHash(enc_str.getBytes(), key);
}
/**
@@ -137,11 +90,11 @@
* @param msg
* @throws IOException if not the same
*/
- public void verifyReply(String base64Hash, String msg)
+ public static void verifyReply(String base64Hash, String msg, SecretKey key)
throws IOException {
byte[] hash = Base64.decodeBase64(base64Hash.getBytes());
- boolean res = verifyHash(hash, msg.getBytes());
+ boolean res = verifyHash(hash, msg.getBytes(), key);
if(res != true) {
throw new IOException("Verification of the hashReply failed");
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Tue Jan 26 14:02:53 2010
@@ -80,6 +80,8 @@
public static final String JT_AVG_BLACKLIST_THRESHOLD =
"mapreduce.jobtracker.blacklist.average.threshold";
public static final String JT_SYSTEM_DIR = "mapreduce.jobtracker.system.dir";
+ public static final String JT_STAGING_AREA_ROOT =
+ "mapreduce.jobtracker.staging.root.dir";
public static final String JT_MAX_TRACKER_BLACKLISTS =
"mapreduce.jobtracker.tasktracker.maxblacklists";
public static final String JT_JOBHISTORY_MAXAGE =
@@ -88,4 +90,6 @@
"mapreduce.jobtracker.maxmapmemory.mb";
public static final String JT_MAX_REDUCEMEMORY_MB =
"mapreduce.jobtracker.maxreducememory.mb";
+ public static final String MAX_JOB_SPLIT_METAINFO_SIZE =
+ "mapreduce.job.split.metainfo.maxsize";
}
|