Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Jun 9 16:11:19 2009
@@ -39,8 +39,6 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.conf.Configuration;
-
-
public class TestCapacityScheduler extends TestCase {
static final Log LOG =
@@ -611,13 +609,13 @@
// represents a fake queue configuration info
static class FakeQueueInfo {
String queueName;
- float gc;
+ float capacity;
boolean supportsPrio;
int ulMin;
- public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) {
+ public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
this.queueName = queueName;
- this.gc = gc;
+ this.capacity = capacity;
this.supportsPrio = supportsPrio;
this.ulMin = ulMin;
}
@@ -647,10 +645,10 @@
}*/
public float getCapacity(String queue) {
- if(queueMap.get(queue).gc == -1) {
+ if(queueMap.get(queue).capacity == -1) {
return super.getCapacity(queue);
}
- return queueMap.get(queue).gc;
+ return queueMap.get(queue).capacity;
}
public int getMinimumUserLimitPercent(String queue) {
@@ -905,13 +903,6 @@
return queue.toArray(new JobInProgress[0]);
}
- /*protected void submitJobs(int number, int state, int maps, int reduces)
- throws IOException {
- for (int i = 0; i < number; i++) {
- submitJob(state, maps, reduces);
- }
- }*/
-
// tests if tasks can be assigned when there are multiple jobs from the same
// user
public void testJobFinished() throws Exception {
@@ -1048,7 +1039,7 @@
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- // set the gc % as 10%, so that gc will be zero initially as
+ // set the capacity % as 10%, so that capacity will be zero initially as
// the cluster capacity increase slowly.
queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
@@ -1082,7 +1073,7 @@
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt5");
// now job from default should run, as it is furthest away
- // in terms of runningMaps / gc.
+ // in terms of runningMaps / capacity.
checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
verifyCapacity("1", "default");
verifyCapacity("9", "q2");
@@ -1093,7 +1084,7 @@
String schedInfo = taskTrackerManager.getQueueManager().
getSchedulerInfo(queue).toString();
assertTrue(schedInfo.contains("Map tasks\nCapacity: "
- + expectedCapacity));
+ + expectedCapacity + " slots"));
}
// test capacity transfer
@@ -1274,7 +1265,82 @@
// first in the queue
checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
}
-
+
+ /**
+ * Test to verify that high memory jobs hit user limits faster than any normal
+ * job.
+ *
+ * @throws IOException
+ */
+ public void testUserLimitsForHighMemoryJobs()
+ throws IOException {
+ taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ String[] qs = { "default" };
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+ resConf.setFakeQueues(queues);
+ // enabled memory-based scheduling
+ // Normal job in the cluster would be 1GB maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // Submit one normal job to the default queue.
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(6);
+ jConf.setNumReduceTasks(6);
+ jConf.setUser("u1");
+ jConf.setQueueName("default");
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+ + "6 map and 6 reduce tasks");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(6);
+ jConf.setNumReduceTasks(6);
+ jConf.setQueueName("default");
+ jConf.setUser("u2");
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // Verify that normal job takes 5 maps and 5 reduces to hit user limits
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+ // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
+ // are hit. So u2 should get slots
+
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+
+ // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+ // slots. Because of high memory tasks, giving u2 another task would
+ // overflow limits. So, no more tasks should be given to anyone.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ }
+
/*
* Following is the testing strategy for testing scheduling information.
* - start capacity scheduler with two queues.
@@ -1324,21 +1390,35 @@
scheduler.assignTasks(tracker("tt1")); // heartbeat
scheduler.assignTasks(tracker("tt2")); // heartbeat
int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
- int totalReduces = taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+ int totalReduces =
+ taskTrackerManager.getClusterStatus().getMaxReduceTasks();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
- String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
- String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
+ String schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ String schedulingInfo2 =
+ queueManager.getJobQueueInfo("q2").getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%");
- assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100);
- assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100);
- assertEquals(infoStrings[2] , "User Limit: 25%");
- assertEquals(infoStrings[3] , "Priority Supported: YES");
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0");
- assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0");
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[0], "Queue configuration");
+ assertEquals(infoStrings[1], "Capacity Percentage: 50.0%");
+ assertEquals(infoStrings[2], "User Limit: 25%");
+ assertEquals(infoStrings[3], "Priority Supported: YES");
+ assertEquals(infoStrings[4], "-------------");
+ assertEquals(infoStrings[5], "Map tasks");
+ assertEquals(infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+ + " slots");
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[9], "-------------");
+ assertEquals(infoStrings[10], "Reduce tasks");
+ assertEquals(infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+ + " slots");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[14], "-------------");
+ assertEquals(infoStrings[15], "Job info");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 0");
+ assertEquals(infoStrings[17], "Number of users who have submitted jobs: 0");
assertEquals(schedulingInfo, schedulingInfo2);
//Testing with actual job submission.
@@ -1349,10 +1429,13 @@
infoStrings = schedulingInfo.split("\n");
//waiting job should be equal to number of jobs submitted.
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
+ assertEquals(infoStrings[17], "Number of users who have submitted jobs: 1");
//Initialize the jobs but don't raise events
controlledInitializationPoller.selectJobsToInitialize();
@@ -1360,12 +1443,14 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
+ assertEquals(infoStrings.length, 18);
//should be previous value as nothing is scheduled because no events
//have been raised after initialization.
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
//Raise status change event so that jobs can move to running queue.
raiseStatusChangeEvents(scheduler.jobQueuesManager);
@@ -1382,10 +1467,14 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 18);
- assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
- assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings.length, 20);
+ assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 1");
+ assertEquals(infoStrings[9], "Active users:");
+ assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[15], "Running tasks: 0");
+ assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
//assign a reduce task
Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
@@ -1394,10 +1483,16 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 20);
- assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
- assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity");
- assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings.length, 22);
+ assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 1");
+ assertEquals(infoStrings[9], "Active users:");
+ assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[15], "Running tasks: 1");
+ assertEquals(infoStrings[16], "Active users:");
+ assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
//Complete the job and check the running tasks count
FakeJobInProgress u1j1 = userJobs.get(0);
@@ -1410,10 +1505,12 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 4");
//Fail a job which is initialized but not scheduled and check the count.
FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1427,10 +1524,14 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3");
+ assertEquals(infoStrings.length, 18);
+ //should be previous value as nothing is scheduled because no events
+ //have been raised after initialization.
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 3");
//Fail a job which is not initialized but is in the waiting queue.
FakeJobInProgress u1j5 = userJobs.get(4);
@@ -1445,10 +1546,14 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2");
+ assertEquals(infoStrings.length, 18);
+ //should be previous value as nothing is scheduled because no events
+ //have been raised after initialization.
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 2");
//Raise status change events as none of the initialized jobs would be
//in running queue as we just failed the second job which was initialized
@@ -1471,10 +1576,12 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 18);
- assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
- assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1");
+ assertEquals(infoStrings.length, 20);
+ assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 1");
+ assertEquals(infoStrings[9], "Active users:");
+ assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1484,11 +1591,10 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 16);
- assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
- assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1");
-
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[16], "Number of Waiting Jobs: 1");
}
/**
@@ -1562,8 +1668,10 @@
scheduler.start();
// The situation : Two jobs in the queue. First job with only maps and no
- // reduces and is a high memory job. Second job is a normal job with both maps and reduces.
- // First job cannot run for want of memory for maps. In this case, second job's reduces should run.
+ // reduces and is a high memory job. Second job is a normal job with both
+ // maps and reduces.
+ // First job cannot run for want of memory for maps. In this case, second
+ // job's reduces should run.
LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+ "2 map tasks");
@@ -1589,9 +1697,17 @@
// first, a map from j1 will run
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // Total 2 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
// at this point, the scheduler tries to schedule another map from j1.
// there isn't enough space. The second job's reduce should be scheduled.
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ // Total 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
}
/**
@@ -1625,7 +1741,7 @@
scheduler.start();
LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
- + "1 map, 0 reduce tasks.");
+ + "1 map, 1 reduce tasks.");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
@@ -1637,7 +1753,20 @@
// Fill the second tt with this job.
checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+ // Total 1 map slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
+ (String) job1.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+ // Total 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ 25.0f);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
+ (String) job1.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+ "2 map, 2 reduce tasks.");
@@ -1651,7 +1780,21 @@
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ // Total 3 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
+ (String) job2.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ // Total 3 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+ 75.0f);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+ (String) job2.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+ "1 map, 0 reduce tasks.");
@@ -1669,6 +1812,17 @@
assertNull(scheduler.assignTasks(tracker("tt2")));
assertNull(scheduler.assignTasks(tracker("tt1")));
assertNull(scheduler.assignTasks(tracker("tt2")));
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+ 75.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+ (String) job2.getSchedulingInfo());
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
+ (String) job3.getSchedulingInfo());
}
/**
@@ -1717,9 +1871,23 @@
// 1st cycle - 1 map gets assigned.
Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // Total 1 map slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 0L);
+
+ // 1st cycle of reduces - 1 reduce gets assigned.
+ Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ // Total 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 512L);
// kill this job !
taskTrackerManager.killJob(job1.getJobID());
+ // No more map/reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
+ 0.0f);
// retire the job
taskTrackerManager.removeJob(job1.getJobID());
@@ -1738,30 +1906,30 @@
// 2nd cycle - nothing should get assigned. Memory matching code
// will see the job is missing and fail memory requirements.
assertNull(scheduler.assignTasks(tracker("tt1")));
+ checkMemReservedForTasksOnTT("tt1", null, null);
+
// calling again should not make a difference, as the task is still running
assertNull(scheduler.assignTasks(tracker("tt1")));
+ checkMemReservedForTasksOnTT("tt1", null, null);
- // finish the task on the tracker.
+ // finish the tasks on the tracker.
taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
+
// now a new task can be assigned.
t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ // Total 1 map slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 0L);
+
// reduce can be assigned.
t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ // Total 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 512L);
}
-
- protected TaskTrackerStatus tracker(String taskTrackerName) {
- return taskTrackerManager.getTaskTracker(taskTrackerName);
- }
-
- protected Task checkAssignment(String taskTrackerName,
- String expectedTaskString) throws IOException {
- List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
- assertNotNull(expectedTaskString, tasks);
- assertEquals(expectedTaskString, 1, tasks.size());
- assertEquals(expectedTaskString, tasks.get(0).toString());
- return tasks.get(0);
- }
-
+
/*
* Test cases for Job Initialization poller.
*/
@@ -1968,10 +2136,9 @@
checkFailedInitializedJobMovement();
// Check failed waiting job movement
- checkFailedWaitingJobMovement();
-
+ checkFailedWaitingJobMovement();
}
-
+
public void testStartWithoutDefaultQueueConfigured() throws Exception {
//configure a single queue which is not default queue
String[] qs = {"q1"};
@@ -2087,9 +2254,9 @@
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
- taskTrackerManager.finalizeJob(fjob2);
-
+ taskTrackerManager.finalizeJob(fjob2);
}
+
/**
* Test case to test scheduling of
* <ol>
@@ -2105,17 +2272,16 @@
* <ul>
* <li>Submit one high ram job which has speculative reduce.</li>
* <li>Submit a normal job which has no speculative reduce.</li>
- * <li>Scheduler should schedule first all reduce tasks from first job and block
- * the cluster till both reduces are completed.</li>
+ * <li>Scheduler should schedule first all reduce tasks from first job and
+ * block the cluster till both reduces are completed.</li>
* </ul>
* </li>
* </ol>
* @throws IOException
*/
public void testHighRamJobWithSpeculativeExecution() throws IOException {
- // 2 map and 2 reduce slots
+ // 2 TTs, 3 map and 3 reduce slots on each TT
taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
- // 1GB for each map, 1GB for each reduce
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2123,6 +2289,7 @@
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
+ // 1GB for each map, 1GB for each reduce
scheduler.getConf().setLong(
JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
3 * 1024L);
@@ -2146,8 +2313,9 @@
jConf.setUser("u1");
jConf.setMapSpeculativeExecution(true);
jConf.setReduceSpeculativeExecution(false);
- FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
- jConf, taskTrackerManager,"u1");
+ FakeJobInProgress job1 =
+ new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+ taskTrackerManager, "u1");
taskTrackerManager.submitJob(job1);
//Submit normal job
@@ -2170,17 +2338,31 @@
//scheduled. This task would be scheduled. Till the tasks from job1 gets
//complete none of the tasks from other jobs would be scheduled.
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
+ // Total 2 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+
//make same tracker get back, check if you are blocking. Your job
//has speculative map task so tracker should be blocked even tho' it
//can run job2's map.
assertNull(scheduler.assignTasks(tracker("tt1")));
+ // Total 2 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
//TT2 now gets speculative map of the job1
checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+ // Total 4 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
// Now since the first job has no more speculative maps, it can schedule
// the second job.
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ // Total 5 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
+ checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
//finish everything
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0",
@@ -2202,14 +2384,15 @@
jConf.setUser("u1");
jConf.setMapSpeculativeExecution(false);
jConf.setReduceSpeculativeExecution(true);
- FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
- jConf, taskTrackerManager,"u1");
+ FakeJobInProgress job3 =
+ new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+ taskTrackerManager, "u1");
taskTrackerManager.submitJob(job3);
//Submit normal job w.r.t reduces
jConf = new JobConf();
- jConf.setMemoryForMapTask(2 * 1024L);
- jConf.setMemoryForReduceTask(1 * 104L);
+ jConf.setMemoryForMapTask(1 * 1024L);
+ jConf.setMemoryForReduceTask(1 * 1024L);
jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
@@ -2223,24 +2406,165 @@
// Finish up the map scheduler
checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+ // Total 2 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
+ // Total 3 map slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
// first, a reduce from j3 will run
// at this point, there is a speculative task for the same job to be
//scheduled. This task would be scheduled. Till the tasks from job3 gets
//complete none of the tasks from other jobs would be scheduled.
checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
- assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0);
+ assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
+ 0);
+ // Total 2 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+ 33.3f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
+
//make same tracker get back, check if you are blocking. Your job
//has speculative reduce task so tracker should be blocked even tho' it
//can run job4's reduce.
assertNull(scheduler.assignTasks(tracker("tt1")));
- //TT2 now gets speculative map of the job1
+ // Total 2 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+ 33.3f);
+
+ //TT2 now gets speculative reduce of the job3
checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+ // Total 4 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
+ 66.7f);
+ checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
// Now since j3 has no more speculative reduces, it can schedule
// the j4.
checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+ // Total 5 reduce slots should be accounted for.
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
+ 83.3f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
+ }
+
+ /**
+ * Test to verify that queue ordering is based on the number of slots occupied
+ * and hence to verify that presence of high memory jobs is reflected properly
+ * while determining used capacities of queues and hence the queue ordering.
+ *
+ * @throws IOException
+ */
+ public void testQueueOrdering()
+ throws IOException {
+ taskTrackerManager = new FakeTaskTrackerManager(2, 6, 6);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ String[] qs = { "default", "q1" };
+ String[] reversedQs = { qs[1], qs[0] };
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 100));
+ queues.add(new FakeQueueInfo("q1", 50.0f, true, 100));
+ resConf.setFakeQueues(queues);
+ // enable memory-based scheduling
+ // Normal job in the cluster would be 1GB maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+ + "6 map and 6 reduce tasks");
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(6);
+ jConf.setNumReduceTasks(6);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // Submit a normal job to the other queue.
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(6);
+ jConf.setNumReduceTasks(6);
+ jConf.setUser("u1");
+ jConf.setQueueName("q1");
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // Map 1 of high memory job
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 1 of high memory job
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Map 1 of normal job
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 1 of normal job
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Map 2 of normal job
+ checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 2 of normal job
+ checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Now both the queues are equally served. But the comparator doesn't change
+ // the order if queues are equally served.
+
+ // Map 3 of normal job
+ checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 3 of normal job
+ checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Map 2 of high memory job
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 2 of high memory job
+ checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+ // Map 4 of normal job
+ checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+ // Reduce 4 of normal job
+ checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
+ checkQueuesOrder(reversedQs, scheduler
+ .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
}
private void checkRunningJobMovementAndCompletion() throws IOException {
@@ -2386,6 +2710,89 @@
userJobs.put(user, jips);
}
return userJobs;
+ }
+
+
+ protected TaskTrackerStatus tracker(String taskTrackerName) {
+ return taskTrackerManager.getTaskTracker(taskTrackerName);
+ }
+
+ protected Task checkAssignment(String taskTrackerName,
+ String expectedTaskString) throws IOException {
+ List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+ assertNotNull(expectedTaskString, tasks);
+ assertEquals(expectedTaskString, 1, tasks.size());
+ assertEquals(expectedTaskString, tasks.get(0).toString());
+ return tasks.get(0);
+ }
+
+ /**
+ * Get the amount of memory that is reserved for tasks on the taskTracker and
+ * verify that it matches what is expected.
+ *
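+ * @param taskTracker name of the task tracker whose reserved memory is checked
+ * @param expectedMemForMapsOnTT expected memory reserved for map tasks, or null if the observed value should be null
+ * @param expectedMemForReducesOnTT expected memory reserved for reduce tasks, or null if the observed value should be null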
+ */
+ private void checkMemReservedForTasksOnTT(String taskTracker,
+ Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+ Long observedMemForMapsOnTT =
+ scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+ CapacityTaskScheduler.TYPE.MAP);
+ Long observedMemForReducesOnTT =
+ scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+ CapacityTaskScheduler.TYPE.REDUCE);
+ if (expectedMemForMapsOnTT == null) {
+ assertTrue(observedMemForMapsOnTT == null);
+ } else {
+ assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+ }
+ if (expectedMemForReducesOnTT == null) {
+ assertTrue(observedMemForReducesOnTT == null);
+ } else {
+ assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+ }
+ }
+ /**
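+ * Verify the number of occupied slots of type 'type' in the queue 'queue'.
+ *
+ * @param queue name of the queue whose scheduling info is inspected
+ * @param type task type (MAP or REDUCE) whose occupied slots are verified
+ * @param numActiveUsers number of users active in the queue at present; used to locate the reduce entry in the scheduling info
+ * @param expectedOccupiedSlots expected number of occupied slots
+ * @param expectedOccupiedSlotsPercent expected occupied slots as a percentage of the queue's capacity
+ *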
+ */
+ private void checkOccupiedSlots(String queue,
+ CapacityTaskScheduler.TYPE type, int numActiveUsers,
+ int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+ scheduler.updateQSIInfoForTests();
+ QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
+ String schedulingInfo =
+ queueManager.getJobQueueInfo(queue).getSchedulingInfo();
+ String[] infoStrings = schedulingInfo.split("\n");
+ int index = -1;
+ if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ index = 7;
+ } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+ }
+ LOG.info(infoStrings[index]);
+ assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
+ expectedOccupiedSlots, expectedOccupiedSlotsPercent),
+ infoStrings[index]);
+ }
+
+ private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
+ assertTrue("Observed and expected queues are not of same length.",
+ expectedOrder.length == observedOrder.length);
+ int i = 0;
+ for (String expectedQ : expectedOrder) {
+ assertTrue("Observed and expected queues are not in the same order. "
+ + "Differ at index " + i + ". Got " + observedOrder[i]
+ + " instead of " + expectedQ, expectedQ.equals(observedOrder[i]));
+ i++;
+ }
}
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/eclipse-plugin/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/eclipse-plugin/build.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/eclipse-plugin/build.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/eclipse-plugin/build.xml Tue Jun 9 16:11:19 2009
@@ -67,7 +67,8 @@
<target name="jar" depends="compile" unless="skip.contrib">
<mkdir dir="${build.dir}/lib"/>
<copy file="${hadoop.root}/build/hadoop-${version}-core.jar" tofile="${build.dir}/lib/hadoop-core.jar" verbose="true"/>
- <copy file="${hadoop.root}/lib/commons-cli-2.0-SNAPSHOT.jar" todir="${build.dir}/lib" verbose="true"/>
+ <copy file="${hadoop.root}/build/ivy/lib/Hadoop/common/commons-cli-${commons-cli.version}.jar" todir="${build.dir}/lib" verbose="true"/>
+ <copy file="${hadoop.root}/build/ivy/lib/Hadoop/common/commons-cli-${commons-cli2.version}.jar" todir="${build.dir}/lib" verbose="true"/>
<jar
jarfile="${build.dir}/hadoop-${version}-${name}.jar"
manifest="${root}/META-INF/MANIFEST.MF">
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/bin/hdfsproxy
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/bin/hdfsproxy?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/bin/hdfsproxy (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/bin/hdfsproxy Tue Jun 9 16:11:19 2009
@@ -114,12 +114,17 @@
CLASSPATH=${CLASSPATH}:$f;
done
fi
-
if [ -d "$HDFSPROXY_HOME/../../lib" ]; then
for f in $HDFSPROXY_HOME/../../lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
fi
+if [ -d "$HDFSPROXY_HOME/../../lib/jsp-2.1" ]; then
+ for f in $HDFSPROXY_HOME/../../lib/jsp-2.1/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+ done
+fi
+
# add user-specified CLASSPATH last
if [ "$HDFSPROXY_CLASSPATH" != "" ]; then
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/build.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/build.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/build.xml Tue Jun 9 16:11:19 2009
@@ -123,7 +123,7 @@
<include name="slf4j-api-${slf4j-api.version}.jar"/>
<include name="slf4j-log4j12-${slf4j-log4j12.version}.jar"/>
<include name="xmlenc-${xmlenc.version}.jar"/>
- <include name="core-${core.version}.jar"/>
+ <include name="core-${core.vesion}.jar"/>
</lib>
<classes dir="${proxy.conf.dir}" excludes="**/*.example **/*.template **/*.sh hadoop-site.xml"/>
<classes dir="${build.classes}"/>
@@ -143,7 +143,7 @@
<include name="slf4j-api-${slf4j-api.version}.jar"/>
<include name="slf4j-log4j12-${slf4j-log4j12.version}.jar"/>
<include name="xmlenc-${xmlenc.version}.jar"/>
- <include name="core-${core.version}.jar"/>
+ <include name="core-${core.vesion}.jar"/>
</lib>
<classes dir="${proxy.conf.dir}" excludes="**/*.example **/*.template **/*.sh hadoop-site.xml"/>
<classes dir="${build.classes}"/>
@@ -347,10 +347,12 @@
<include name="jetty-util-${jetty-util.version}.jar"/>
<include name="jetty-${jetty.version}.jar"/>
<include name="servlet-api-2.5-${servlet-api-2.5.version}.jar"/>
- <include name="core-${core.version}.jar"/>
- <include name="jsp-${jsp.version}-${jetty.version}.jar"/>
- <include name="jsp-api-${jsp.version}-${jetty.version}.jar"/>
- </fileset>
+ <include name="core-${core.vesion}.jar"/>
+ </fileset>
+ <fileset dir="${hadoop.root}/lib/jsp-${jsp.version}">
+ <include name="jsp-${jsp.version}.jar"/>
+ <include name="jsp-api-${jsp.version}.jar"/>
+ </fileset>
</copy>
<copy todir="${build.dir}/${final.name}/lib" includeEmptyDirs="false">
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/ivy.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/ivy.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/hdfsproxy/ivy.xml Tue Jun 9 16:11:19 2009
@@ -22,6 +22,10 @@
<artifact conf="master"/>
</publications>
<dependencies>
+ <dependency org="commons-cli"
+ name="commons-cli"
+ rev="${commons-cli.version}"
+ conf="common->default"/>
<dependency org="log4j"
name="log4j"
rev="${log4j.version}"
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java Tue Jun 9 16:11:19 2009
@@ -21,6 +21,7 @@
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.GenericJdbcManager;
import org.apache.hadoop.sqoop.manager.HsqldbManager;
+import org.apache.hadoop.sqoop.manager.LocalMySQLManager;
import org.apache.hadoop.sqoop.manager.MySQLManager;
import java.io.IOException;
@@ -70,7 +71,11 @@
}
if (scheme.equals("jdbc:mysql:")) {
- return new MySQLManager(opts);
+ if (opts.isLocal()) {
+ return new LocalMySQLManager(opts);
+ } else {
+ return new MySQLManager(opts);
+ }
} else if (scheme.equals("jdbc:hsqldb:hsql:")) {
return new HsqldbManager(opts);
} else {
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Tue Jun 9 16:11:19 2009
@@ -92,6 +92,9 @@
private String driverClassName;
private String warehouseDir;
private FileLayout layout;
+ private boolean local; // if true and conn is mysql, use mysqldump.
+
+ private String tmpDir; // where temp data goes; usually /tmp
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
@@ -136,6 +139,10 @@
this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
+ String localImport = props.getProperty("local.import",
+ Boolean.toString(this.local)).toLowerCase();
+ this.local = "true".equals(localImport) || "yes".equals(localImport)
+ || "1".equals(localImport);
} catch (IOException ioe) {
LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
} finally {
@@ -156,11 +163,12 @@
this.hadoopHome = System.getenv("HADOOP_HOME");
this.codeOutputDir = System.getProperty("sqoop.src.dir", ".");
- String tmpDir = System.getProperty("test.build.data", "/tmp/");
- if (!tmpDir.endsWith(File.separator)) {
- tmpDir = tmpDir + File.separator;
+ String myTmpDir = System.getProperty("test.build.data", "/tmp/");
+ if (!myTmpDir.endsWith(File.separator)) {
+ myTmpDir = myTmpDir + File.separator;
}
+ this.tmpDir = myTmpDir;
this.jarOutputDir = tmpDir + "sqoop/compile";
this.layout = FileLayout.TextFile;
@@ -178,6 +186,7 @@
System.out.println("--driver (class-name) Manually specify JDBC driver class to use");
System.out.println("--username (username) Set authentication username");
System.out.println("--password (password) Set authentication password");
+ System.out.println("--local Use local import fast path (mysql only)");
System.out.println("");
System.out.println("Import control options:");
System.out.println("--table (tablename) Table to read");
@@ -232,6 +241,8 @@
this.action = ControlAction.ListTables;
} else if (args[i].equals("--all-tables")) {
this.allTables = true;
+ } else if (args[i].equals("--local")) {
+ this.local = true;
} else if (args[i].equals("--username")) {
this.username = args[++i];
if (null == this.password) {
@@ -300,6 +311,13 @@
}
}
+ /** get the temporary directory; guaranteed to end in File.separator
+ * (e.g., '/')
+ */
+ public String getTmpDir() {
+ return tmpDir;
+ }
+
public String getConnectString() {
return connectString;
}
@@ -336,6 +354,10 @@
return password;
}
+ public boolean isLocal() {
+ return local;
+ }
+
/**
* @return location where .java files go; guaranteed to end with '/'
*/
@@ -393,4 +415,8 @@
public FileLayout getFileLayout() {
return this.layout;
}
+
+ public void setUsername(String name) {
+ this.username = name;
+ }
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Tue Jun 9 16:11:19 2009
@@ -41,6 +41,19 @@
public MySQLManager(final ImportOptions opts) {
super(DRIVER_CLASS, opts);
+
+ String connectString = opts.getConnectString();
+ if (null != connectString && connectString.indexOf("//localhost") != -1) {
+ // if we're not doing a remote connection, they should have a LocalMySQLManager.
+ LOG.warn("It looks like you are importing from mysql on localhost.");
+ LOG.warn("This transfer can be faster! Use the --local option to exercise a");
+ LOG.warn("MySQL-specific fast path.");
+ }
+ }
+
+ protected MySQLManager(final ImportOptions opts, boolean ignored) {
+ // constructor used by subclasses to avoid the --local warning.
+ super(DRIVER_CLASS, opts);
}
@Override
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Tue Jun 9 16:11:19 2009
@@ -43,9 +43,6 @@
/**
* Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
- *
- *
- *
*/
public class ImportJob {
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java Tue Jun 9 16:11:19 2009
@@ -130,6 +130,15 @@
}
}
+ // find sqoop jar for compilation classpath
+ String sqoopJar = findThisJar();
+ if (null != sqoopJar) {
+ sqoopJar = File.pathSeparator + sqoopJar;
+ } else {
+ LOG.warn("Could not find sqoop jar; child compilation may fail");
+ sqoopJar = "";
+ }
+
String curClasspath = System.getProperty("java.class.path");
args.add("-sourcepath");
@@ -140,7 +149,7 @@
args.add(jarOutDir);
args.add("-classpath");
- args.add(curClasspath + File.pathSeparator + coreJar);
+ args.add(curClasspath + File.pathSeparator + coreJar + sqoopJar);
// add all the source files
for (String srcfile : sources) {
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Tue Jun 9 16:11:19 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.sqoop;
+import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
import org.apache.hadoop.sqoop.manager.TestSqlManager;
import org.apache.hadoop.sqoop.orm.TestClassWriter;
@@ -44,6 +45,7 @@
suite.addTestSuite(TestColumnTypes.class);
suite.addTestSuite(TestMultiCols.class);
suite.addTestSuite(TestOrderBy.class);
+ suite.addTestSuite(LocalMySQLTest.class);
return suite;
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/ivy.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/ivy.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/ivy.xml Tue Jun 9 16:11:19 2009
@@ -24,6 +24,10 @@
<artifact conf="master"/>
</publications>
<dependencies>
+ <dependency org="org.apache.mahout.commons"
+ name="commons-cli"
+ rev="${commons-cli2.version}"
+ conf="common->default"/>
<dependency org="commons-logging"
name="commons-logging"
rev="${commons-logging.version}"
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Jun 9 16:11:19 2009
@@ -376,6 +376,9 @@
if (errThread_ != null) {
errThread_.join(joinDelay_);
}
+ if (outerrThreadsThrowable != null) {
+ throw new RuntimeException(outerrThreadsThrowable);
+ }
} catch (InterruptedException e) {
//ignore
}
@@ -425,7 +428,11 @@
if (now-lastStdoutReport > reporterOutDelay_) {
lastStdoutReport = now;
String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_;
- reporter.setStatus(hline);
+ if (!processProvidedStatus_) {
+ reporter.setStatus(hline);
+ } else {
+ reporter.progress();
+ }
logprintln(hline);
logflush();
}
@@ -476,6 +483,7 @@
if (matchesCounter(lineStr)) {
incrCounter(lineStr);
} else if (matchesStatus(lineStr)) {
+ processProvidedStatus_ = true;
setStatus(lineStr);
} else {
LOG.warn("Cannot parse reporter line: " + lineStr);
@@ -572,6 +580,7 @@
if (sim != null) sim.destroy();
logprintln("mapRedFinished");
} catch (RuntimeException e) {
+ logprintln("PipeMapRed failed!");
logStackTrace(e);
throw e;
}
@@ -682,4 +691,5 @@
String LOGNAME;
PrintStream log_;
+ volatile boolean processProvidedStatus_ = false;
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java Tue Jun 9 16:11:19 2009
@@ -32,8 +32,16 @@
* postWriteLines to stderr.
*/
public static void go(int preWriteLines, int sleep, int postWriteLines) throws IOException {
+ go(preWriteLines, sleep, postWriteLines, false);
+ }
+
+ public static void go(int preWriteLines, int sleep, int postWriteLines, boolean status) throws IOException {
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
String line;
+
+ if (status) {
+ System.err.println("reporter:status:starting echo");
+ }
while (preWriteLines > 0) {
--preWriteLines;
@@ -57,13 +65,14 @@
public static void main(String[] args) throws IOException {
if (args.length < 3) {
- System.err.println("Usage: StderrApp PREWRITE SLEEP POSTWRITE");
+ System.err.println("Usage: StderrApp PREWRITE SLEEP POSTWRITE [STATUS]");
return;
}
int preWriteLines = Integer.parseInt(args[0]);
int sleep = Integer.parseInt(args[1]);
int postWriteLines = Integer.parseInt(args[2]);
+ boolean status = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
- go(preWriteLines, sleep, postWriteLines);
+ go(preWriteLines, sleep, postWriteLines, status);
}
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/core-default.xml?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/core-default.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/core-default.xml Tue Jun 9 16:11:19 2009
@@ -257,6 +257,12 @@
</description>
</property>
+<property>
+ <name>fs.s3n.block.size</name>
+ <value>67108864</value>
+ <description>Block size to use when reading files using the native S3
+ filesystem (s3n: URIs).</description>
+</property>
<property>
<name>local.cache.size</name>
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/filecache/DistributedCache.java Tue Jun 9 16:11:19 2009
@@ -683,7 +683,7 @@
throws IOException {
String classpath = conf.get("mapred.job.classpath.files");
conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
- : classpath + System.getProperty("path.separator") + file.toString());
+ : classpath + "," + file.toString());
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(file).toUri();
@@ -696,14 +696,14 @@
* @param conf Configuration that contains the classpath setting
*/
public static Path[] getFileClassPaths(Configuration conf) {
- String classpath = conf.get("mapred.job.classpath.files");
- if (classpath == null)
- return null;
- ArrayList list = Collections.list(new StringTokenizer(classpath, System
- .getProperty("path.separator")));
+ ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+ "mapred.job.classpath.files");
+ if (list.size() == 0) {
+ return null;
+ }
Path[] paths = new Path[list.size()];
for (int i = 0; i < list.size(); i++) {
- paths[i] = new Path((String) list.get(i));
+ paths[i] = new Path(list.get(i));
}
return paths;
}
@@ -719,8 +719,7 @@
throws IOException {
String classpath = conf.get("mapred.job.classpath.archives");
conf.set("mapred.job.classpath.archives", classpath == null ? archive
- .toString() : classpath + System.getProperty("path.separator")
- + archive.toString());
+ .toString() : classpath + "," + archive.toString());
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(archive).toUri();
@@ -733,14 +732,14 @@
* @param conf Configuration that contains the classpath setting
*/
public static Path[] getArchiveClassPaths(Configuration conf) {
- String classpath = conf.get("mapred.job.classpath.archives");
- if (classpath == null)
- return null;
- ArrayList list = Collections.list(new StringTokenizer(classpath, System
- .getProperty("path.separator")));
+ ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+ "mapred.job.classpath.archives");
+ if (list.size() == 0) {
+ return null;
+ }
Path[] paths = new Path[list.size()];
for (int i = 0; i < list.size(); i++) {
- paths[i] = new Path((String) list.get(i));
+ paths[i] = new Path(list.get(i));
}
return paths;
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/FsShell.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/FsShell.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/FsShell.java Tue Jun 9 16:11:19 2009
@@ -1170,10 +1170,13 @@
while (true) {
FSDataInputStream in = srcFs.open(path);
- in.seek(offset);
- IOUtils.copyBytes(in, System.out, 1024, false);
- offset = in.getPos();
- in.close();
+ try {
+ in.seek(offset);
+ IOUtils.copyBytes(in, System.out, 1024);
+ offset = in.getPos();
+ } finally {
+ in.close();
+ }
if (!foption) {
break;
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Tue Jun 9 16:11:19 2009
@@ -122,7 +122,6 @@
}
@Override
- @Deprecated
public boolean isDirectory(Path path) throws IOException {
Path absolute = makeAbsolute(path);
String srep = absolute.toUri().getPath();
@@ -133,7 +132,6 @@
}
@Override
- @Deprecated
public boolean isFile(Path path) throws IOException {
Path absolute = makeAbsolute(path);
String srep = absolute.toUri().getPath();
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java Tue Jun 9 16:11:19 2009
@@ -332,6 +332,11 @@
}
return new S3FileStatus(f.makeQualified(this), inode);
}
+
+ @Override
+ public long getDefaultBlockSize() {
+ return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
+ }
// diagnostic methods
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java Tue Jun 9 16:11:19 2009
@@ -24,6 +24,7 @@
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -53,10 +54,7 @@
s3Credentials.getSecretAccessKey());
this.s3Service = new RestS3Service(awsCredentials);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
}
bucket = new S3Bucket(uri.getHost());
}
@@ -76,10 +74,7 @@
}
s3Service.putObject(bucket, object);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
} finally {
if (in != null) {
try {
@@ -99,10 +94,7 @@
object.setContentLength(0);
s3Service.putObject(bucket, object);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
}
}
@@ -116,10 +108,8 @@
if (e.getMessage().contains("ResponseCode=404")) {
return null;
}
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
+ return null; //never returned - keep compiler happy
}
}
@@ -128,13 +118,8 @@
S3Object object = s3Service.getObject(bucket, key);
return object.getDataInputStream();
} catch (S3ServiceException e) {
- if ("NoSuchKey".equals(e.getS3ErrorCode())) {
- return null;
- }
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(key, e);
+ return null; //never returned - keep compiler happy
}
}
@@ -145,32 +130,22 @@
null, byteRangeStart, null);
return object.getDataInputStream();
} catch (S3ServiceException e) {
- if ("NoSuchKey".equals(e.getS3ErrorCode())) {
- return null;
- }
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(key, e);
+ return null; //never returned - keep compiler happy
}
}
public PartialListing list(String prefix, int maxListingLength)
throws IOException {
- return list(prefix, maxListingLength, null);
+ return list(prefix, maxListingLength, null, false);
}
- public PartialListing list(String prefix, int maxListingLength,
- String priorLastKey) throws IOException {
+ public PartialListing list(String prefix, int maxListingLength, String priorLastKey,
+ boolean recurse) throws IOException {
- return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey);
+ return list(prefix, recurse ? null : PATH_DELIMITER, maxListingLength, priorLastKey);
}
- public PartialListing listAll(String prefix, int maxListingLength,
- String priorLastKey) throws IOException {
-
- return list(prefix, null, maxListingLength, priorLastKey);
- }
private PartialListing list(String prefix, String delimiter,
int maxListingLength, String priorLastKey) throws IOException {
@@ -191,10 +166,8 @@
return new PartialListing(chunk.getPriorLastKey(), fileMetadata,
chunk.getCommonPrefixes());
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
+ return null; //never returned - keep compiler happy
}
}
@@ -202,36 +175,27 @@
try {
s3Service.deleteObject(bucket, key);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(key, e);
}
}
- public void rename(String srcKey, String dstKey) throws IOException {
+ public void copy(String srcKey, String dstKey) throws IOException {
try {
- s3Service.moveObject(bucket.getName(), srcKey, bucket.getName(),
+ s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
new S3Object(dstKey), false);
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(srcKey, e);
}
}
public void purge(String prefix) throws IOException {
try {
S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
- for (int i = 0; i < objects.length; i++) {
- s3Service.deleteObject(bucket, objects[i].getKey());
+ for (S3Object object : objects) {
+ s3Service.deleteObject(bucket, object.getKey());
}
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
}
}
@@ -240,16 +204,29 @@
sb.append(bucket.getName()).append("\n");
try {
S3Object[] objects = s3Service.listObjects(bucket);
- for (int i = 0; i < objects.length; i++) {
- sb.append(objects[i].getKey()).append("\n");
+ for (S3Object object : objects) {
+ sb.append(object.getKey()).append("\n");
}
} catch (S3ServiceException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- }
- throw new S3Exception(e);
+ handleServiceException(e);
}
System.out.println(sb);
}
-
+
+ private void handleServiceException(String key, S3ServiceException e) throws IOException {
+ if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+ throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
+ } else {
+ handleServiceException(e);
+ }
+ }
+
+ private void handleServiceException(S3ServiceException e) throws IOException {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ else {
+ throw new S3Exception(e);
+ }
+ }
}
Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java?rev=783055&r1=783054&r2=783055&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java Tue Jun 9 16:11:19 2009
@@ -42,14 +42,12 @@
InputStream retrieve(String key, long byteRangeStart) throws IOException;
PartialListing list(String prefix, int maxListingLength) throws IOException;
- PartialListing list(String prefix, int maxListingLength, String priorLastKey)
+ PartialListing list(String prefix, int maxListingLength, String priorLastKey, boolean recursive)
throws IOException;
- PartialListing listAll(String prefix, int maxListingLength,
- String priorLastKey) throws IOException;
void delete(String key) throws IOException;
- void rename(String srcKey, String dstKey) throws IOException;
+ void copy(String srcKey, String dstKey) throws IOException;
/**
* Delete all keys with the given prefix. Used for testing.
|