hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1082677 [32/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,253 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.local.LocalFs;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FSDownload;
+
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalizationProtocol;
+import org.apache.hadoop.yarn.URL;
+
+import static org.apache.hadoop.yarn.LocalResourceType.*;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import static org.mockito.Mockito.*;
+
+public class TestApplicationLocalizer {
+
+  static final Path basedir =
+      new Path("target", TestApplicationLocalizer.class.getName());
+
+  private static final FsPermission urwx =
+    FsPermission.createImmutable((short) 0700);
+  private static final FsPermission urwx_gx =
+    FsPermission.createImmutable((short) 0710);
+
+  static DataInputBuffer createFakeCredentials(Random r, int nTok)
+      throws IOException {
+    Credentials creds = new Credentials();
+    byte[] password = new byte[20];
+    Text kind = new Text();
+    Text service = new Text();
+    Text alias = new Text();
+    for (int i = 0; i < nTok; ++i) {
+      byte[] identifier = ("idef" + i).getBytes();
+      r.nextBytes(password);
+      kind.set("kind" + i);
+      service.set("service" + i);
+      alias.set("token" + i);
+      Token token = new Token(identifier, password, kind, service);
+      creds.addToken(alias, token);
+    }
+    DataOutputBuffer buf = new DataOutputBuffer();
+    creds.writeTokenStorageToStream(buf);
+    DataInputBuffer ret = new DataInputBuffer();
+    ret.reset(buf.getData(), 0, buf.getLength());
+    return ret;
+  }
+
+  static Collection<LocalResource> createFakeResources(Random r, int nFiles,
+      Map<Long,LocalResource> sizes) throws IOException {
+    ArrayList<LocalResource> rsrc = new ArrayList<LocalResource>();
+    long basetime = r.nextLong() >>> 2;
+    for (int i = 0; i < nFiles; ++i) {
+      LocalResource resource = new LocalResource();
+      URL path = new URL();
+      path.scheme = "file";
+      path.host = null;
+      path.port = 0;
+      resource.timestamp = basetime + i;
+      r.setSeed(resource.timestamp);
+      sizes.put(r.nextLong() & Long.MAX_VALUE, resource);
+      StringBuilder sb = new StringBuilder("/" + r.nextLong());
+      while (r.nextInt(2) == 1) {
+        sb.append("/" + r.nextLong());
+      }
+      path.file = sb.toString();
+      resource.resource = path;
+      resource.size = -1;
+      resource.type = r.nextInt(2) == 1 ? FILE : ARCHIVE;
+      resource.state = PRIVATE;
+      rsrc.add(resource);
+    }
+    return rsrc;
+  }
+
+  static DataInputBuffer writeFakeAppFiles(Collection<LocalResource> rsrc)
+      throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ApplicationLocalizer.writeResourceDescription(dob, rsrc);
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(dob.getData(), 0, dob.getLength());
+    return dib;
+  }
+
+  @Test
+  public void testLocalizationMain() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    AbstractFileSystem spylfs =
+      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    // don't actually create dirs
+    doNothing().when(spylfs).mkdir(Matchers.<Path>anyObject(),
+        Matchers.<FsPermission>anyObject(), anyBoolean());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+
+    // TODO mocked FileContext requires relative paths; LTC will provide abs
+    List<Path> localDirs = new ArrayList<Path>();
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(new Path(basedir,
+            new Path(i + "", ApplicationLocalizer.USERCACHE)));
+    }
+
+    final Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    // return credential stream instead of opening local file
+    DataInputBuffer appTokens = createFakeCredentials(r, 10);
+    Path tokenPath =
+      lfs.makeQualified(new Path(ApplicationLocalizer.APPTOKEN_FILE));
+    doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
+        ).when(spylfs).open(tokenPath);
+    // return file stream instead of opening local file
+    r.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    final HashMap<Long,LocalResource> sizes = new HashMap<Long,LocalResource>();
+    Collection<LocalResource> resources = createFakeResources(r, 10, sizes);
+    DataInputBuffer appFiles = writeFakeAppFiles(resources);
+    Path filesPath =
+      lfs.makeQualified(new Path(ApplicationLocalizer.FILECACHE_FILE));
+    doReturn(new FSDataInputStream(new FakeFSDataInputStream(appFiles))
+        ).when(spylfs).open(filesPath);
+
+    final String user = "yak";
+    final String appId = "app_RM_0";
+    final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344);
+    final Path logDir = new Path(basedir, "logs");
+    ApplicationLocalizer localizer = new ApplicationLocalizer(lfs, user,
+        appId, logDir, localDirs);
+    ApplicationLocalizer spyLocalizer = spy(localizer);
+    LocalizationProtocol mockLocalization = mock(LocalizationProtocol.class);
+    FSDownload mockDownload = mock(FSDownload.class);
+
+    // set to return mocks
+    doReturn(mockLocalization).when(spyLocalizer).getProxy(nmAddr);
+    for (Map.Entry<Long,LocalResource> rsrc : sizes.entrySet()) {
+      doReturn(new FalseDownload(rsrc.getValue(), rsrc.getKey())
+          ).when(spyLocalizer).download(Matchers.<LocalDirAllocator>anyObject(),
+            argThat(new LocalResourceMatches(rsrc.getValue())));
+    }
+    assertEquals(0, spyLocalizer.runLocalization(nmAddr));
+
+    // verify app files opened
+    verify(spylfs).open(tokenPath);
+    verify(spylfs).open(filesPath);
+    ArgumentMatcher<CharSequence> userMatch =
+      new ArgumentMatcher<CharSequence>() {
+        @Override
+        public boolean matches(Object o) {
+          return "yak".equals(o.toString());
+        }
+      };
+    for (final Map.Entry<Long,LocalResource> rsrc : sizes.entrySet()) {
+      ArgumentMatcher<LocalResource> localizedMatch =
+        new ArgumentMatcher<LocalResource>() {
+          @Override
+          public boolean matches(Object o) {
+            LocalResource other = (LocalResource) o;
+            r.setSeed(rsrc.getValue().timestamp);
+            boolean ret = (r.nextLong() & Long.MAX_VALUE) == other.size;
+            StringBuilder sb = new StringBuilder("/" + r.nextLong());
+            while (r.nextInt(2) == 1) {
+              sb.append("/" + r.nextLong());
+            }
+            ret &= other.resource.file.toString().equals(sb.toString());
+            ret &= other.type.equals(r.nextInt(2) == 1 ? FILE : ARCHIVE);
+            return ret;
+          }
+        };
+      ArgumentMatcher<URL> dstMatch =
+        new ArgumentMatcher<URL>() {
+          @Override
+          public boolean matches(Object o) {
+            r.setSeed(rsrc.getValue().timestamp);
+            return ((URL)o).file.toString().equals(
+                "/done/" + (r.nextLong() & Long.MAX_VALUE));
+          }
+        };
+      verify(mockLocalization).successfulLocalization(
+          argThat(userMatch), argThat(localizedMatch), argThat(dstMatch));
+    }
+  }
+
+  static class FalseDownload extends FSDownload {
+    private final long size;
+    public FalseDownload(LocalResource resource, long size) {
+      super(null, null, null, resource, null);
+      this.size = size;
+    }
+    @Override
+    public Map<LocalResource,Path> call() {
+      LocalResource ret = getResource();
+      ret.size = size;
+      return Collections.singletonMap(ret, new Path("/done/" + size));
+    }
+  }
+
+  // sigh.
+  class LocalResourceMatches extends ArgumentMatcher<LocalResource> {
+    final LocalResource rsrc;
+    LocalResourceMatches(LocalResource rsrc) {
+      this.rsrc = rsrc;
+    }
+    @Override
+    public boolean matches(Object o) {
+      return rsrc.timestamp == ((LocalResource)o).timestamp;
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,131 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FSDownload;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import static org.apache.hadoop.fs.CreateFlag.*;
+
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceType;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestFSDownload {
+
+  @AfterClass
+  public static void deleteTestDir() throws IOException {
+    FileContext fs = FileContext.getLocalFSFileContext();
+    fs.delete(new Path("target", TestFSDownload.class.getSimpleName()), true);
+  }
+
+  static LocalResource createFile(FileContext files, Path p, int len, Random r)
+      throws IOException, URISyntaxException {
+    FSDataOutputStream out = null;
+    try {
+      byte[] bytes = new byte[len];
+      out = files.create(p, EnumSet.of(CREATE, OVERWRITE));
+      r.nextBytes(bytes);
+      out.write(bytes);
+    } finally {
+      if (out != null) out.close();
+    }
+    LocalResource ret = new LocalResource();
+    ret.resource = AvroUtil.getYarnUrlFromPath(p);
+    ret.size = len;
+    ret.type = LocalResourceType.FILE;
+    ret.timestamp = files.getFileStatus(p).getModificationTime();
+    return ret;
+  }
+
+  @Test
+  public void testDownload() throws IOException, URISyntaxException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+      TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    Collection<FSDownload> pending = new ArrayList<FSDownload>();
+    Random rand = new Random();
+    long sharedSeed = rand.nextLong();
+    rand.setSeed(sharedSeed);
+    System.out.println("SEED: " + sharedSeed);
+    LocalDirAllocator dirs =
+      new LocalDirAllocator(TestFSDownload.class.getName());
+    int[] sizes = new int[10];
+    for (int i = 0; i < 10; ++i) {
+      sizes[i] = rand.nextInt(512) + 512;
+      LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
+          sizes[i], rand);
+      FSDownload fsd =
+        new FSDownload(files, conf, dirs, rsrc, new Random(sharedSeed));
+      pending.add(fsd);
+    }
+
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    CompletionService<Map<LocalResource,Path>> queue =
+      new ExecutorCompletionService<Map<LocalResource,Path>>(exec);
+    try {
+      for (FSDownload fsd : pending) {
+        queue.submit(fsd);
+      }
+      Map<LocalResource,Path> results = new HashMap();
+      for (int i = 0; i < 10; ++i) {
+        Future<Map<LocalResource,Path>> result = queue.take();
+        results.putAll(result.get());
+      }
+      for (Map.Entry<LocalResource,Path> localized : results.entrySet()) {
+        assertEquals(
+            sizes[Integer.valueOf(localized.getValue().getName())],
+            localized.getKey().size - 4096 - 16); // bad DU impl + .crc ; sigh
+      }
+    } catch (ExecutionException e) {
+      throw new IOException("Failed exec", e);
+    } finally {
+      exec.shutdown();
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,144 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.net.URISyntaxException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResource;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import org.apache.hadoop.yarn.LocalResourceType;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+import static org.apache.hadoop.yarn.LocalResourceType.*;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestLocalResource {
+
+  static org.apache.hadoop.yarn.LocalResource getYarnResource(Path p, long size,
+      long timestamp, LocalResourceType type, LocalResourceVisibility state)
+      throws URISyntaxException {
+    org.apache.hadoop.yarn.LocalResource ret = new org.apache.hadoop.yarn.LocalResource();
+    ret.resource = AvroUtil.getYarnUrlFromURI(p.toUri());
+    ret.size = size;
+    ret.timestamp = timestamp;
+    ret.type = type;
+    ret.state = state;
+    return ret;
+  }
+
+  static void checkEqual(LocalResource a, LocalResource b) {
+    assertEquals(a, b);
+    assertEquals(a.hashCode(), b.hashCode());
+    assertEquals(0, a.compareTo(b));
+    assertEquals(0, b.compareTo(a));
+  }
+
+  static void checkNotEqual(LocalResource a, LocalResource b) {
+    assertFalse(a.equals(b));
+    assertFalse(b.equals(a));
+    assertFalse(a.hashCode() == b.hashCode());
+    assertFalse(0 == a.compareTo(b));
+    assertFalse(0 == b.compareTo(a));
+  }
+
+  @Test
+  public void testResourceEquality() throws URISyntaxException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println("SEED: " + seed);
+
+    long basetime = r.nextLong() >>> 2;
+    org.apache.hadoop.yarn.LocalResource yA = getYarnResource(
+        new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
+    org.apache.hadoop.yarn.LocalResource yB = getYarnResource(
+        new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
+    final LocalResource a = new LocalResource(yA);
+    LocalResource b = new LocalResource(yA);
+    checkEqual(a, b);
+    b = new LocalResource(yB);
+    checkEqual(a, b);
+
+    // ignore visibility
+    yB = getYarnResource(
+        new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PRIVATE);
+    b = new LocalResource(yB);
+    checkEqual(a, b);
+
+    // ignore size
+    yB = getYarnResource(
+        new Path("http://yak.org:80/foobar"), 0, basetime, FILE, PRIVATE);
+    b = new LocalResource(yB);
+    checkEqual(a, b);
+
+    // note path
+    yB = getYarnResource(
+        new Path("hdfs://dingo.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
+    b = new LocalResource(yB);
+    checkNotEqual(a, b);
+
+    // note type
+    yB = getYarnResource(
+        new Path("http://yak.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
+    b = new LocalResource(yB);
+    checkNotEqual(a, b);
+
+    // note timestamp
+    yB = getYarnResource(
+        new Path("http://yak.org:80/foobar"), 0, basetime + 1, FILE, PUBLIC);
+    b = new LocalResource(yB);
+    checkNotEqual(a, b);
+  }
+
+  @Test
+  public void testResourceOrder() throws URISyntaxException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    long basetime = r.nextLong() >>> 2;
+    org.apache.hadoop.yarn.LocalResource yA = getYarnResource(
+        new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
+    final LocalResource a = new LocalResource(yA);
+
+    // Path primary
+    org.apache.hadoop.yarn.LocalResource yB = getYarnResource(
+        new Path("http://yak.org:80/foobaz"), -1, basetime, FILE, PUBLIC);
+    LocalResource b = new LocalResource(yB);
+    assertTrue(0 > a.compareTo(b));
+
+    // timestamp secondary
+    yB = getYarnResource(
+        new Path("http://yak.org:80/foobar"), -1, basetime + 1, FILE, PUBLIC);
+    b = new LocalResource(yB);
+    assertTrue(0 > a.compareTo(b));
+
+    // type tertiary
+    yB = getYarnResource(
+        new Path("http://yak.org:80/foobar"), -1, basetime, ARCHIVE, PUBLIC);
+    b = new LocalResource(yB);
+    assertTrue(0 != a.compareTo(b)); // don't care about order, just ne
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,220 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.*;
+import static java.util.concurrent.TimeUnit.*;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.AppLocalizationRunnerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import static org.apache.hadoop.yarn.LocalResourceType.*;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Mockito.*;
+
+public class TestLocalResources {
+
+  private static List<org.apache.hadoop.yarn.LocalResource>
+      randResources(Random r, int nRsrc) throws URISyntaxException {
+    final List<org.apache.hadoop.yarn.LocalResource> ret =
+      new ArrayList<org.apache.hadoop.yarn.LocalResource>(nRsrc);
+    Path base = new Path("file:///foo/bar");
+    long basetime = r.nextLong() >>> 2;
+    for (int i = 0; i < nRsrc; ++i) {
+      org.apache.hadoop.yarn.LocalResource rsrc = new org.apache.hadoop.yarn.LocalResource();
+      rsrc.timestamp = basetime + i;
+      r.setSeed(rsrc.timestamp);
+      Path p = new Path(base, String.valueOf(r.nextInt(Integer.MAX_VALUE)));
+      while (r.nextInt(2) == 1) {
+        p = new Path(p, String.valueOf(r.nextInt(Integer.MAX_VALUE)));
+      }
+      rsrc.resource = AvroUtil.getYarnUrlFromPath(p);
+      rsrc.size = -1;
+      rsrc.type = r.nextInt(2) == 1 ? FILE : ARCHIVE;
+      rsrc.state = PRIVATE;
+
+      System.out.println("RSRC: " + rsrc);
+      ret.add(rsrc);
+    }
+    return ret;
+  }
+
+  private static void verify(
+      BlockingQueue<Future<Map<LocalResource,Path>>> doneQueue,
+      Collection<org.apache.hadoop.yarn.LocalResource> files)
+      throws ExecutionException, InterruptedException, URISyntaxException {
+    for (Future<Map<LocalResource,Path>> f = doneQueue.poll(); f != null;
+         f = doneQueue.poll()) {
+      Map<LocalResource,Path> q = f.get();
+      assertEquals(1, q.size());
+      for (Map.Entry<LocalResource,Path> loc : q.entrySet()) {
+        boolean found = false;
+        for (org.apache.hadoop.yarn.LocalResource yrsrc : files) {
+          LocalResource rsrc = new LocalResource(yrsrc);
+          found |= rsrc.equals(loc.getKey());
+        }
+        assertTrue(found);
+        assertEquals(new Path("file:///yak/" + loc.getKey().getTimestamp()),
+                     loc.getValue());
+      }
+    }
+  }
+
+  private static AppLocalizationRunnerImpl getMockAppLoc(
+    final BlockingQueue<Future<Map<LocalResource,Path>>> doneQueue,
+    String name) {
+    AppLocalizationRunnerImpl mockAppLoc = mock(AppLocalizationRunnerImpl.class);
+    doAnswer(new Answer() {
+        @Override
+        public Object answer(InvocationOnMock invocation) {
+          doneQueue.offer(
+            (Future<Map<LocalResource,Path>>)invocation.getArguments()[0]);
+          return null;
+        }
+      }).when(mockAppLoc).localizedResource(
+        Matchers.<Future<Map<LocalResource,Path>>>anyObject());
+    when(mockAppLoc.toString()).thenReturn(name);
+    return mockAppLoc;
+  }
+
+  static void successfulLoc(LocalResourcesTrackerImpl rsrcMap,
+      List<org.apache.hadoop.yarn.LocalResource> rsrc)
+      throws InterruptedException, URISyntaxException {
+    long i = 0;
+    for (org.apache.hadoop.yarn.LocalResource yRsrc : rsrc) {
+      yRsrc.size = ++i;
+      rsrcMap.setSuccess(new LocalResource(yRsrc), yRsrc.size,
+          new Path("file:///yak/" + yRsrc.timestamp));
+    }
+  }
+
+  static void failedLoc(
+      BlockingQueue<Future<Map<LocalResource,Path>>> doneQueue) {
+    try {
+      Future<Map<LocalResource,Path>> f = doneQueue.poll();
+      f.get();
+      fail("Failed localization succeeded");
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail("Wrong exception");
+    } catch (ExecutionException e) {
+      /* expected */
+    }
+  }
+
+  @Test
+  public void testLocalResourceAcquire() throws Exception {
+    final int NUM_EXP = 4;
+    final int NUM_URIS = 1 << NUM_EXP;
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println("SEED: " + seed);
+    // shared resource map
+    LocalResourcesTrackerImpl rsrcMap = new LocalResourcesTrackerImpl();
+    List<org.apache.hadoop.yarn.LocalResource> resourcesA = randResources(r, NUM_URIS);
+
+    // set up application A mocks
+    final BlockingQueue<Future<Map<LocalResource,Path>>> doneQueueA =
+      new LinkedBlockingQueue<Future<Map<LocalResource,Path>>>();
+    AppLocalizationRunnerImpl mockAppLocA = getMockAppLoc(doneQueueA, "A");
+
+    // set up application B mocks
+    final BlockingQueue<Future<Map<LocalResource,Path>>> doneQueueB =
+      new LinkedBlockingQueue<Future<Map<LocalResource,Path>>>();
+    AppLocalizationRunnerImpl mockAppLocB = getMockAppLoc(doneQueueB, "B");
+
+    Collection<org.apache.hadoop.yarn.LocalResource> todoA =
+      rsrcMap.register(mockAppLocA, resourcesA);
+    // ensure no rsrc added until reported back
+    assertEquals(NUM_URIS, todoA.size());
+    assertTrue(doneQueueA.isEmpty());
+
+    // complete half A localization
+    successfulLoc(rsrcMap, resourcesA.subList(0, NUM_URIS >>> 1));
+
+    // verify callback
+    assertEquals(NUM_URIS >>> 1, doneQueueA.size());
+    verify(doneQueueA, todoA);
+
+    // set up B localization as 1/4 localized A rsrc, 1/4 non-localized A rsrc,
+    // 1/2 new rsrc
+    long seed2 = r.nextLong();
+    r.setSeed(seed2);
+    System.out.println("SEED: " + seed2);
+    List<org.apache.hadoop.yarn.LocalResource> resourcesB =
+      randResources(r, NUM_URIS >>> 1);
+    resourcesB.addAll(resourcesA.subList(NUM_URIS >>> 2,
+                                         NUM_URIS - (NUM_URIS >>> 2)));
+    Collection<org.apache.hadoop.yarn.LocalResource> todoB =
+      rsrcMap.register(mockAppLocB, resourcesB);
+    // all completed A rsrc
+    assertEquals(NUM_URIS >>> 2, doneQueueB.size());
+    // only uniq rsrc, not in A
+    assertEquals(NUM_URIS >>> 1, todoB.size());
+    // verify completed rsrc in B done queue completed by A
+    verify(doneQueueB, todoA);
+
+    // complete A localization, less 1 shared rsrc
+    successfulLoc(rsrcMap, resourcesA.subList((NUM_URIS >>> 1) + 1, NUM_URIS));
+    // completed A rsrc in B
+    assertEquals((NUM_URIS >>> 2) - 1, doneQueueB.size());
+    verify(doneQueueB, todoA);
+    assertEquals((NUM_URIS >>> 1) - 1, doneQueueA.size());
+    verify(doneQueueA, todoA);
+
+    // fail shared localization
+    rsrcMap.removeFailedResource(new LocalResource(resourcesA.get(NUM_URIS >>> 1)),
+        RPCUtil.getRemoteException(new IOException("FAIL!")));
+    assertEquals(1, doneQueueA.size());
+    assertEquals(1, doneQueueB.size());
+    failedLoc(doneQueueA);
+    failedLoc(doneQueueB);
+
+    // verify cleared
+    Collection<org.apache.hadoop.yarn.LocalResource> todoA2 =
+      rsrcMap.register(mockAppLocA, Collections.singletonList(
+            resourcesA.get(NUM_URIS >>> 1)));
+    assertEquals(1, todoA2.size());
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<project>
+  <parent>
+    <artifactId>yarn-server</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>${yarn.version}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>yarn-server-resourcemanager</artifactId>
+  <name>yarn-server-resourcemanager</name>
+  <version>${yarn.version}</version>
+  <url>http://maven.apache.org</url>
+
+  <dependencies>
+    <!-- begin MNG-4223 workaround -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-api</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-common</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <!-- end MNG-4223 workaround -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server-common</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+  </dependencies>
+
+</project>

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,189 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.AMRMProtocol;
+import org.apache.hadoop.yarn.AMResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationMasterHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+@Private
+public class ApplicationMasterService extends AbstractService implements 
+AMRMProtocol, EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+  private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
+  private ApplicationMasterHandler applicationsManager;
+  private YarnScheduler rScheduler;
+  private ApplicationTokenSecretManager appTokenManager;
+  private InetSocketAddress masterServiceAddress;
+  private Server server;
+  private Map<ApplicationID, AMResponse> responseMap = 
+    new HashMap<ApplicationID, AMResponse>();
+  private final AMResponse reboot = new AMResponse();
+  private final ASMContext asmContext;
+  
+  public ApplicationMasterService(ApplicationTokenSecretManager appTokenManager,
+      ApplicationMasterHandler applicationsManager, YarnScheduler scheduler, ASMContext asmContext) {
+    super(ApplicationMasterService.class.getName());
+    this.appTokenManager = appTokenManager;
+    this.applicationsManager = applicationsManager;
+    this.rScheduler = scheduler;
+    this.reboot.reboot = true;
+    this.reboot.containers = new ArrayList<Container>();
+    this.asmContext = asmContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    String bindAddress =
+      conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+    masterServiceAddress =  NetUtils.createSocketAddr(bindAddress);
+    this.asmContext.getDispatcher().register(ApplicationTrackerEventType.class, this);
+    super.init(conf);
+  }
+
+  public void start() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    Configuration serverConf = new Configuration(getConfig());
+    serverConf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        SchedulerSecurityInfo.class, SecurityInfo.class);
+    this.server =
+      rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
+          serverConf, this.appTokenManager);
+    this.server.start();
+    super.start();
+  }
+  @Override
+  public Void registerApplicationMaster(ApplicationMaster applicationMaster)
+  throws AvroRemoteException {
+    try {
+      applicationsManager.registerApplicationMaster(applicationMaster);
+    } catch(IOException ie) {
+      LOG.info("Exception registering application ", ie);
+      throw RPCUtil.getRemoteException(ie);
+    }
+    return null;
+  }
+
+  @Override
+  public  Void finishApplicationMaster(ApplicationMaster applicationMaster)
+  throws AvroRemoteException {
+    try {
+      applicationsManager.finishApplicationMaster(applicationMaster);
+    } catch(IOException ie) {
+      LOG.info("Exception finishing application", ie);
+      throw RPCUtil.getRemoteException(ie);
+    }
+    return null;
+  }
+
+  @Override
+  public AMResponse allocate(ApplicationStatus status,
+      List<ResourceRequest> ask, List<Container> release)
+  throws AvroRemoteException {
+    try {
+      /* check if its in cache */
+      synchronized(responseMap) {
+        AMResponse lastResponse = responseMap.get(status.applicationId);
+        if (lastResponse == null) {
+          LOG.error("Application doesnt exist in cache " + status.applicationId);
+          return reboot;
+        }
+        if ((status.responseID + 1) == lastResponse.responseId) {
+          /* old heartbeat */
+          return lastResponse;
+        } else if (status.responseID + 1 < lastResponse.responseId) {
+          LOG.error("Invalid responseid from application " + status.applicationId);
+          return reboot;
+        }
+        applicationsManager.applicationHeartbeat(status);
+        List<Container> containers = rScheduler.allocate(status.applicationId, ask, release);
+        AMResponse  response = new AMResponse();
+        response.containers = containers;
+        response.responseId = lastResponse.responseId + 1;
+        responseMap.put(status.applicationId, response);
+        return response;
+      }
+    } catch(IOException ie) {
+      LOG.info("Error in allocation for " + status.applicationId, ie);
+      throw RPCUtil.getRemoteException(ie);
+    }
+  } 
+
+  @Override
+  public void stop() {
+    if (this.server != null) {
+      this.server.close();
+    }
+    super.stop();
+  }
+
+  @Override
+  public void handle(ASMEvent<ApplicationTrackerEventType> appEvent) {
+    ApplicationTrackerEventType event = appEvent.getType();
+    ApplicationID id = appEvent.getAppContext().getApplicationID();
+    synchronized(responseMap) {
+      switch (event) {
+      case ADD:
+        AMResponse response = new AMResponse();
+        response.containers = null;
+        response.responseId = 0;
+        responseMap.put(id, response);
+        break;
+      case REMOVE:
+        responseMap.remove(id);
+        break;
+      default: 
+        break;
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,137 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTracker;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.ClientRMProtocol;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+
+/**
+ * The client interface to the Resource Manager. This module handles all the rpc
+ * interfaces to the resource manager from the client.
+ */
+public class ClientRMService extends AbstractService implements ClientRMProtocol {
+  private static final Log LOG = LogFactory.getLog(ClientRMService.class);
+  private RMResourceTracker clusterInfo;
+  private ApplicationsManager applicationsManager;
+  private String clientServiceBindAddress;
+  private Server server;
+  InetSocketAddress clientBindAddress;
+  
+  public ClientRMService(ApplicationsManager applicationsManager, 
+        RMResourceTracker clusterInfo) {
+    super(ClientRMService.class.getName());
+    this.clusterInfo = clusterInfo;
+    this.applicationsManager = applicationsManager;
+  }
+  
+  @Override
+  public void init(Configuration conf) {
+    clientServiceBindAddress =
+      conf.get(YarnConfiguration.APPSMANAGER_ADDRESS,
+          YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS);
+    clientBindAddress =
+      NetUtils.createSocketAddr(clientServiceBindAddress);
+    super.init(conf);
+  }
+  
+  @Override
+  public void start() {
+    // All the clients to appsManager are supposed to be authenticated via
+    // Kerberos if security is enabled, so no secretManager.
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    Configuration clientServerConf = new Configuration(getConfig());
+    clientServerConf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ClientRMSecurityInfo.class, SecurityInfo.class);
+    this.server =   
+      rpc.getServer(ClientRMProtocol.class, this,
+            clientBindAddress,
+            clientServerConf, null);
+    this.server.start();
+    super.start();
+  }
+
+  @Override
+  public ApplicationID getNewApplicationId() throws AvroRemoteException {
+    return applicationsManager.getNewApplicationID();
+  }
+
+  @Override
+  public ApplicationMaster getApplicationMaster(ApplicationID applicationId)
+      throws AvroRemoteException {
+    return applicationsManager.getApplicationMaster(applicationId);
+  }
+
+  @Override
+  public Void submitApplication(ApplicationSubmissionContext context)
+      throws AvroRemoteException {
+    try {
+      applicationsManager.submitApplication(context);
+    } catch (IOException ie) {
+      LOG.info("Exception in submitting application", ie);
+      throw RPCUtil.getRemoteException(ie);
+    }
+    return null;
+  }
+
+  @Override
+  public Void finishApplication(ApplicationID applicationId)
+      throws AvroRemoteException {
+    try {
+      applicationsManager.finishApplication(applicationId);
+    } catch(IOException ie) {
+      LOG.info("Error finishing application ", ie);
+    }
+    return null;
+  }
+
+  @Override
+  public YarnClusterMetrics getClusterMetrics() throws AvroRemoteException {
+    return clusterInfo.getClusterMetrics();
+  }
+  
+  @Override
+  public void stop() {
+    if (this.server != null) {
+        this.server.close();
+    }
+    super.stop();
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+public class RMConfig {
+  public static final String RM_KEYTAB = YarnConfiguration.RM_PREFIX
+      + "keytab";
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,227 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+
+import java.io.IOException;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.SyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+
+/**
+ * The ResourceManager is the main class that is a set of components.
+ *
+ */
+public class ResourceManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(ResourceManager.class);
+  public static final long clusterTimeStamp = System.currentTimeMillis();
+  private YarnConfiguration conf;
+  
+  private ApplicationsManagerImpl applicationsManager;
+  
+  private ContainerTokenSecretManager containerTokenSecretManager =
+      new ContainerTokenSecretManager();
+
+  private ApplicationTokenSecretManager appTokenSecretManager =
+      new ApplicationTokenSecretManager();
+
+  
+  private ResourceScheduler scheduler;
+  private RMResourceTrackerImpl rmResourceTracker;
+  private ClientRMService clientRM;
+  private ApplicationMasterService masterService;
+  private Boolean shutdown = false;
+  private WebApp webApp;
+  private final ASMContext asmContext;
+  
+  public ResourceManager() {
+    super("ResourceManager");
+    this.asmContext = new ASMContextImpl();
+  }
+  
+  
+  public interface ASMContext {
+    public SyncDispatcher getDispatcher();
+  }
+  
+  public static class ASMContextImpl implements ASMContext {
+    private final SyncDispatcher asmEventDispatcher;
+   
+    public ASMContextImpl() {
+      this.asmEventDispatcher = new SyncDispatcher();
+    }
+    
+    @Override
+    public SyncDispatcher getDispatcher() {
+      return this.asmEventDispatcher;
+    }
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    // Initialize the config
+    this.conf = new YarnConfiguration(conf);
+    // Initialize the scheduler
+    this.scheduler = 
+      ReflectionUtils.newInstance(
+          conf.getClass(YarnConfiguration.RESOURCE_SCHEDULER, 
+              FifoScheduler.class, ResourceScheduler.class), 
+          this.conf);
+    this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager);
+    
+    //TODO change this to be random
+    this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
+        .createSecretKey("Dummy".getBytes()));
+
+    applicationsManager = createApplicationsManagerImpl();
+    addService(applicationsManager);
+    
+    rmResourceTracker = createRMResourceTracker();
+    rmResourceTracker.register(this.scheduler);
+    addService(rmResourceTracker);
+    
+    clientRM = createClientRMService();
+    addService(clientRM);
+    
+    masterService = createApplicationMasterService();
+    addService(masterService) ;
+    super.init(conf);
+  }
+  
+  @Override
+  public void start() { 
+
+    try {
+      doSecureLogin();
+    } catch(IOException ie) {
+      throw new AvroRuntimeException("Failed to login", ie);
+    }
+    super.start();
+
+    webApp = WebApps.$for("yarn", masterService).at(
+      conf.get(YarnConfiguration.RM_WEBAPP_BIND_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_WEBAPP_BIND_ADDRESS)).
+    start(new RMWebApp(this));
+  }
+
+  public void run() {
+    synchronized(shutdown) {
+      try {
+        shutdown.wait();
+      } catch(InterruptedException ie) {
+        LOG.info("Interrupted while waiting", ie);
+      }
+    }
+  }
+  
+  protected void doSecureLogin() throws IOException {
+    SecurityUtil.login(conf, RMConfig.RM_KEYTAB,
+        YarnConfiguration.RM_SERVER_PRINCIPAL_KEY);
+  }
+
+  @Override
+  public void stop() {
+    if (webApp != null) {
+      webApp.stop();
+    }
+  
+    synchronized(shutdown) {
+      shutdown = true;
+      shutdown.notify();
+    }
+    super.stop();
+  }
+  
+  protected RMResourceTrackerImpl createRMResourceTracker() {
+    return new RMResourceTrackerImpl(this.containerTokenSecretManager);
+  }
+  
+  protected ApplicationsManagerImpl createApplicationsManagerImpl() {
+    return new ApplicationsManagerImpl(
+        this.appTokenSecretManager, this.scheduler, this.asmContext);
+  }
+
+  protected ClientRMService createClientRMService() {
+    return new ClientRMService(applicationsManager, rmResourceTracker);
+  }
+
+  protected ApplicationMasterService createApplicationMasterService() {
+    return new ApplicationMasterService(
+      this.appTokenSecretManager, applicationsManager, scheduler, this.asmContext);
+  }
+  
+  /**
+   * return applications manager.
+   * @return
+   */
+  public ApplicationsManager getApplicationsManager() {
+    return applicationsManager;
+  }
+  
+  /**
+   * return the scheduler.
+   * @return
+   */
+  public ResourceScheduler getResourceScheduler() {
+    return this.scheduler;
+  }
+  
+  /**
+   * return the resource tracking component.
+   * @return
+   */
+  public RMResourceTrackerImpl getResourceTracker() {
+    return this.rmResourceTracker;
+  }
+  
+  
+  public static void main(String argv[]) {
+    ResourceManager resourceManager = null;
+    try {
+      Configuration conf = new YarnConfiguration();
+      resourceManager = new ResourceManager();
+      resourceManager.init(conf);
+      resourceManager.start();
+      resourceManager.run();
+    } catch (Exception e) {
+      LOG.error("Error starting RM", e);
+    } finally {
+      if (resourceManager != null) {
+        resourceManager.stop();
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,261 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+
+public class AMLauncher implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(AMLauncher.class);
+
+  private ContainerManager containerMgrProxy;
+
+  private final AppContext master;
+  private final Configuration conf;
+  private ApplicationTokenSecretManager applicationTokenSecretManager;
+  private ClientToAMSecretManager clientToAMSecretManager;
+  private AMLauncherEventType event;
+  
+  @SuppressWarnings("rawtypes")
+  private EventHandler handler;
+  
+  @SuppressWarnings("unchecked")
+  public AMLauncher(ASMContext asmContext, AppContext master,
+      AMLauncherEventType event,ApplicationTokenSecretManager applicationTokenSecretManager,
+      ClientToAMSecretManager clientToAMSecretManager, Configuration conf) {
+    this.master = master;
+    this.conf = new Configuration(conf); // Just not to touch the sec-info class
+    this.applicationTokenSecretManager = applicationTokenSecretManager;
+    this.clientToAMSecretManager = clientToAMSecretManager;
+    this.conf.setClass(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ContainerManagerSecurityInfo.class, SecurityInfo.class);
+    this.event = event;
+    this.handler = asmContext.getDispatcher().getEventHandler();
+  }
+  
+  private void connect() throws IOException {
+    ContainerID masterContainerID = master.getMasterContainer().id;
+    containerMgrProxy =
+        getContainerMgrProxy(masterContainerID.appID);
+  }
+  
+  private void launch() throws IOException {
+    connect();
+    ContainerID masterContainerID = master.getMasterContainer().id;
+    ApplicationSubmissionContext applicationContext =
+      master.getSubmissionContext();
+    LOG.info("Setting up container " + master.getMasterContainer() 
+        + " for AM " + master.getMaster());  
+    ContainerLaunchContext launchContext =
+        getLaunchSpec(applicationContext, masterContainerID);
+    containerMgrProxy.startContainer(launchContext);
+    LOG.info("Done launching container " + master.getMasterContainer() 
+        + " for AM " + master.getMaster());
+  }
+  
+  private void cleanup() throws IOException {
+    connect();
+    ContainerID containerId = master.getMasterContainer().id;
+    containerMgrProxy.stopContainer(containerId);
+    containerMgrProxy.cleanupContainer(containerId);
+  }
+
+  private ContainerManager getContainerMgrProxy(
+      final ApplicationID applicationID) throws IOException {
+
+    Container container = master.getMasterContainer();
+
+    final String containerManagerBindAddress = container.hostName.toString();
+
+    final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
+
+    UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser("TODO"); // TODO
+    if (UserGroupInformation.isSecurityEnabled()) {
+      ContainerToken containerToken = container.containerToken;
+      Token<ContainerTokenIdentifier> token =
+          new Token<ContainerTokenIdentifier>(
+              containerToken.identifier.array(),
+              containerToken.password.array(), new Text(
+                  containerToken.kind.toString()), new Text(
+                  containerToken.service.toString()));
+      currentUser.addToken(token);
+    }
+    return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
+      @Override
+      public ContainerManager run() {
+        return (ContainerManager) rpc.getProxy(ContainerManager.class,
+            NetUtils.createSocketAddr(containerManagerBindAddress), conf);
+      }
+    });
+  }
+
+  private ContainerLaunchContext getLaunchSpec(
+      ApplicationSubmissionContext applicationMasterContext,
+      ContainerID containerID) throws IOException {
+
+    // Construct the actual Container
+    ContainerLaunchContext container = new ContainerLaunchContext();
+    container.command = applicationMasterContext.command;
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : container.command) {
+      mergedCommand.append(str).append(" ");
+    }
+    LOG.info("Command to launch container " + 
+        containerID + " : " + mergedCommand);
+    container.env = applicationMasterContext.environment;
+
+    container.env.putAll(setupTokensInEnv(applicationMasterContext));
+
+    // Construct the actual Container
+    container.id = containerID;
+    container.user = applicationMasterContext.user;
+    container.resource = applicationMasterContext.masterCapability;
+    container.resources = applicationMasterContext.resources_todo;
+    container.containerTokens = applicationMasterContext.fsTokens_todo;
+    return container;
+  }
+
+  private Map<CharSequence, CharSequence> setupTokensInEnv(
+      ApplicationSubmissionContext asc)
+      throws IOException {
+    Map<CharSequence, CharSequence> env =
+      new HashMap<CharSequence, CharSequence>();
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // TODO: Security enabled/disabled info should come from RM.
+
+      Credentials credentials = new Credentials();
+
+      DataInputByteBuffer dibb = new DataInputByteBuffer();
+      if (asc.fsTokens_todo != null) {
+        // TODO: Don't do this kind of checks everywhere.
+        dibb.reset(asc.fsTokens_todo);
+        credentials.readTokenStorageStream(dibb);
+      }
+
+      ApplicationTokenIdentifier id =
+          new ApplicationTokenIdentifier(master.getMasterContainer().id.appID);
+      Token<ApplicationTokenIdentifier> token =
+          new Token<ApplicationTokenIdentifier>(id,
+              this.applicationTokenSecretManager);
+      String schedulerAddressStr =
+          this.conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
+              YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+      InetSocketAddress unresolvedAddr =
+          NetUtils.createSocketAddr(schedulerAddressStr);
+      String resolvedAddr =
+          unresolvedAddr.getAddress().getHostAddress() + ":"
+              + unresolvedAddr.getPort();
+      token.setService(new Text(resolvedAddr));
+      String appMasterTokenEncoded = token.encodeToUrlString();
+      LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
+      env.put(YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME,
+          appMasterTokenEncoded);
+
+      // Add the RM token
+      credentials.addToken(new Text(resolvedAddr), token);
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      asc.fsTokens_todo = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+      ApplicationTokenIdentifier identifier =
+          new ApplicationTokenIdentifier(
+              this.master.getMaster().applicationId);
+      SecretKey clientSecretKey =
+          this.clientToAMSecretManager.getMasterKey(identifier);
+      String encoded =
+          Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
+      LOG.debug("The encoded client secret-key to be put in env : " + encoded);
+      env.put(YarnConfiguration.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded);
+    }
+    return env;
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void run() {
+    switch (event) {
+    case LAUNCH:
+      try {
+        LOG.info("Launching master" + master.getMaster());
+        launch();
+        } catch(IOException ie) {
+        LOG.info("Error launching ", ie);
+        handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.FAILED, master));
+      }
+      handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,  master));
+      break;
+    case CLEANUP:
+      try {
+        LOG.info("Cleaning master " + master.getMaster());
+        cleanup();
+      } catch(IOException ie) {
+        LOG.info("Error cleaning master ", ie);
+      }
+      handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH, master));
+      break;
+    default:
+      break;
+    }
+  }
+
+  public AppContext getApplicationContext() {
+   return master;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,351 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * This class tracks the application masters that are running. It tracks
+ * heartbeats from application master to see if it needs to expire some application
+ * master.
+ */
+@Evolving
+@Private
+public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
+    <ApplicationEventType>> {
+  private static final Log LOG = LogFactory.getLog(AMTracker.class);
+  private HeartBeatThread heartBeatThread;
+  private long amExpiryInterval; 
+  @SuppressWarnings("rawtypes")
+  private EventHandler handler;
+
+  private final ASMContext asmContext;
+  
+  private final Map<ApplicationID, ApplicationMasterInfo> applications = 
+    new ConcurrentHashMap<ApplicationID, ApplicationMasterInfo>();
+
+  private TreeSet<ApplicationStatus> amExpiryQueue =
+    new TreeSet<ApplicationStatus>(
+        new Comparator<ApplicationStatus>() {
+          public int compare(ApplicationStatus p1, ApplicationStatus p2) {
+            if (p1.lastSeen < p2.lastSeen) {
+              return -1;
+            } else if (p1.lastSeen > p2.lastSeen) {
+              return 1;
+            } else {
+              return (p1.applicationId.id -
+                  p2.applicationId.id);
+            }
+          }
+        }
+    );
+  
+  public AMTracker(ASMContext asmContext) {
+    super(AMTracker.class.getName());
+    this.heartBeatThread = new HeartBeatThread();
+    this.asmContext = asmContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    this.handler = asmContext.getDispatcher().getEventHandler();
+    this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 
+    YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
+    this.asmContext.getDispatcher().register(ApplicationEventType.class, this);
+  }
+
+  @Override
+  public void start() {   
+    super.start();
+    heartBeatThread.start();
+  }
+
+  /**
+   * This class runs continuosly to track the application masters
+   * that might be dead.
+   */
+  private class HeartBeatThread extends Thread {
+    private volatile boolean stop = false;
+
+    public HeartBeatThread() {
+      super("ApplicationsManager:" + HeartBeatThread.class.getName());
+    }
+
+    @Override
+    public void run() {
+      /* the expiry queue does not need to be in sync with applications,
+       * if an applications in the expiry queue cannot be found in applications
+       * its alright. We do not want to hold a hold on applications while going
+       * through the expiry queue.
+       */
+      List<ApplicationID> expired = new ArrayList<ApplicationID>();
+      while (!stop) {
+        ApplicationStatus leastRecent;
+        long now = System.currentTimeMillis();
+        expired.clear();
+        synchronized(amExpiryQueue) {
+          while ((amExpiryQueue.size() > 0) &&
+              (leastRecent = amExpiryQueue.first()) != null &&
+              ((now - leastRecent.lastSeen) > 
+              amExpiryInterval)) {
+            amExpiryQueue.remove(leastRecent);
+            ApplicationMasterInfo info;
+            synchronized(applications) {
+              info = applications.get(leastRecent.applicationId);
+            }
+            if (info == null) {
+              continue;
+            }
+            ApplicationStatus status = info.getStatus();
+            if ((now - status.lastSeen) > amExpiryInterval) {
+              expired.add(status.applicationId);
+            } else {
+              amExpiryQueue.add(status);
+            }
+          }
+        }
+        expireAMs(expired);
+      }
+    }
+
+    public void shutdown() {
+      stop = true;
+    }
+  }
+
+  protected void expireAMs(List<ApplicationID> toExpire) {
+    for (ApplicationID app: toExpire) {
+      ApplicationMasterInfo am = null;
+      synchronized (applications) {
+        am = applications.get(app);
+      }
+     
+      handler.handle(new ASMEvent<ApplicationEventType>
+          (ApplicationEventType.EXPIRE, am));
+      }
+    }
+
+  @Override
+  public void stop() {
+    heartBeatThread.interrupt();
+    heartBeatThread.shutdown();
+    try {
+      heartBeatThread.join(1000);
+    } catch (InterruptedException ie) {
+      LOG.info(heartBeatThread.getName() + " interrupted during join ", 
+          ie);    }
+    super.stop();
+  }
+
+  public void addMaster(String user,  ApplicationSubmissionContext 
+      submissionContext, String clientToken) {
+    ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(handler, 
+      user, submissionContext, clientToken);
+    synchronized(applications) {
+      applications.put(applicationMaster.getApplicationID(), applicationMaster);
+    }
+    /* initiate the launching cycle for the AM */
+    handler.handle(new ASMEvent<ApplicationEventType>(
+        ApplicationEventType.ALLOCATE, applicationMaster));
+  }
+  
+  public void finish(ApplicationID application) {
+    ApplicationMasterInfo masterInfo = null;
+    synchronized(applications) {
+      masterInfo = applications.get(application);
+    }
+    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH,
+        masterInfo));
+  }
+
+  public ApplicationMasterInfo get(ApplicationID applicationId) {
+    ApplicationMasterInfo masterInfo = null;
+    synchronized (applications) {
+      masterInfo = applications.get(applicationId);
+    }
+    return masterInfo;
+  }
+
+  public void remove(ApplicationID applicationId) {
+    synchronized (applications) {
+      applications.remove(applicationId);
+    }
+  }
+
+  public synchronized List<AppContext> getAllApplications() {
+    List<AppContext> allAMs = new ArrayList<AppContext>();
+    synchronized (applications) {
+      for ( ApplicationMasterInfo val: applications.values()) {
+        allAMs.add(val);
+      }
+    }
+    return allAMs;
+  } 
+
+  private void addForTracking(AppContext master) {
+    LOG.info("Adding application master for tracking " + master.getMaster());
+    synchronized (amExpiryQueue) {
+      amExpiryQueue.add(master.getStatus());
+    }
+  }
+
+  public void kill(ApplicationID applicationID) {
+    ApplicationMasterInfo masterInfo = null;
+    
+    synchronized(applications) {
+      masterInfo = applications.get(applicationID);
+    }
+    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.KILL, 
+    masterInfo));
+  }
+  
+  /*
+   * this class is used for passing status context to the application state
+   * machine.
+   */
+  private static class TrackerAppContext implements AppContext {
+    private final ApplicationID appID;
+    private final ApplicationMaster master;
+    private final UnsupportedOperationException notimplemented;
+  
+    public TrackerAppContext(
+         ApplicationID appId, ApplicationMaster master) {
+      this.appID = appId;
+      this.master = master;
+      this.notimplemented = new NotImplementedException();
+    }
+    
+    @Override
+    public ApplicationSubmissionContext getSubmissionContext() {
+      throw notimplemented;
+    }
+    @Override
+    public Resource getResource() {
+      throw notimplemented;
+    }
+    @Override
+    public ApplicationID getApplicationID() {
+      return appID;
+    }
+    @Override
+    public ApplicationStatus getStatus() {
+      return master.status;
+    }
+    @Override
+    public ApplicationMaster getMaster() {
+      return master;
+    }
+    @Override
+    public Container getMasterContainer() {
+      throw notimplemented;
+    }
+    @Override
+    public String getUser() {   
+      throw notimplemented;
+    }
+    @Override
+    public long getLastSeen() {
+      return master.status.lastSeen;
+    }
+    @Override
+    public String getName() {
+     throw notimplemented;
+    }
+    @Override
+    public String getQueue() {
+      throw notimplemented;
+    }
+  }
+  
+  public void heartBeat(ApplicationStatus status) {
+    ApplicationMaster master = new ApplicationMaster();
+    master.status = status;
+    master.applicationId = status.applicationId;
+    TrackerAppContext context = new TrackerAppContext(status.applicationId, master);
+    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.STATUSUPDATE, 
+        context));
+  }
+  
+  public void registerMaster(ApplicationMaster applicationMaster) {
+    applicationMaster.status.lastSeen = System.currentTimeMillis();
+    ApplicationMasterInfo master = null;
+    synchronized(applications) {
+      master = applications.get(applicationMaster.applicationId);
+    }
+    LOG.info("AM registration " + master.getMaster());
+    TrackerAppContext registrationContext = new TrackerAppContext(
+        master.getApplicationID(), applicationMaster);
+    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
+      REGISTERED,  registrationContext));
+  }
+  
+  @Override
+  public void handle(ASMEvent<ApplicationEventType> event) {
+    ApplicationID appID = event.getAppContext().getApplicationID();
+    ApplicationMasterInfo masterInfo = null;
+    synchronized(applications) {
+      masterInfo = applications.get(appID);
+    }
+    try {
+      masterInfo.handle(event);
+    } catch(Throwable t) {
+      LOG.error("Error in handling event type " + event.getType() + " for application " 
+          + event.getAppContext().getApplicationID());
+    }
+    /* we need to launch the applicaiton master on allocated transition */
+    if (masterInfo.getState() == ApplicationState.ALLOCATED) {
+      handler.handle(new ASMEvent<ApplicationEventType>(
+        ApplicationEventType.LAUNCH, masterInfo));
+    }
+    if (masterInfo.getState() == ApplicationState.LAUNCHED) {
+      /* the application move to a launched state start tracking */
+      synchronized (amExpiryQueue) {
+        LOG.info("DEBUG -- adding to  expiry " + masterInfo.getStatus() + 
+        " currenttime " + System.currentTimeMillis());
+        amExpiryQueue.add(masterInfo.getStatus());
+      }
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,94 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Resource;
+
+/** 
+ * The context of an application. 
+ *
+ */
+public interface AppContext {
+  
+  /**
+   * the application submission context for this application.
+   * @return the {@link ApplicationSubmissionContext} for the submitted
+   * application.
+   */
+  public ApplicationSubmissionContext getSubmissionContext();
+  
+  /**
+   *  get the resource required for the application master.
+   * @return the resource requirements of the application master
+   */
+  public Resource getResource();
+  
+  /**
+   * get the application ID for this application
+   * @return the application id for this application
+   */
+  public ApplicationID getApplicationID(); 
+  
+  /**
+   * get the status of the application
+   * @return the {@link ApplicationStatus} of this application
+   */
+  public ApplicationStatus getStatus();
+  
+  /**
+   * the application master for this application.
+   * @return the {@link ApplicationMaster} for this application
+   */
+  public ApplicationMaster getMaster();
+  
+  /**
+   * the container on which the application master is running.
+   * @return the container for running the application master.
+   */
+  public Container getMasterContainer();
+  
+  /**
+   * the user for this application
+   * @return the user for this application
+   */
+  public String getUser();
+  
+  /**
+   * The last time the RM heard from this application
+   * @return the last time RM heard from this application.
+   */
+  public long getLastSeen();  
+  
+  /**
+   * the name for this application
+   * @return the application name.
+   */
+  public String getName();
+  
+  /**
+   * The queue of this application.
+   * @return the queue for this application
+   */
+  public String getQueue();
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+
+/**
+ * Interface the application master use for application master status
+ * and registeration.
+ */
+@Private
+@Evolving
+public interface ApplicationMasterHandler  {
+  void registerApplicationMaster(ApplicationMaster applicationMaster)
+  throws IOException ;
+
+  void applicationHeartbeat(ApplicationStatus status) throws IOException;
+
+  void finishApplicationMaster(ApplicationMaster applicationMaster)
+  throws IOException;
+}
\ No newline at end of file



Mime
View raw message