From: chetan@apache.org
To: commits@apex.incubator.apache.org
Reply-To: dev@apex.incubator.apache.org
Date: Wed, 09 Sep 2015 22:13:05 -0000
X-Mailer: ASF-Git Admin Mailer
Subject: [34/50] incubator-apex-core git commit: APEX-37 Added APPLICATION_ATTEMPT_ID attribute and change operator and container history file so that different application attempts write to separate HDFS files

APEX-37 Added the APPLICATION_ATTEMPT_ID attribute and changed the operator and container history files so that different application attempts write to separate HDFS files.

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b8c0b4cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b8c0b4cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b8c0b4cc

Branch: refs/heads/master
Commit: b8c0b4cc06b1d3b4a94c9027da0cb44da83f4c28
Parents: 9d08532
Author: David Yan
Authored: Thu Aug 6 18:35:32 2015 -0700
Committer: David Yan
Committed: Tue Aug 18 13:45:14 2015 -0700

----------------------------------------------------------------------
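For illustration only (not part of this commit): the change makes each application attempt write its own history files under the application path. The sketch below shows how such per-attempt paths resolve, using the "containers_%d.json" / "operators_%d.json" formats added to StreamingContainerManager; the appPath and attemptId parameters are hypothetical stand-ins for the application path and the LogicalPlan.APPLICATION_ATTEMPT_ID value.

    // Sketch only: per-attempt history file names as introduced by APEX-37.
    // attempt 1 -> <appPath>/containers_1.json, attempt 2 -> <appPath>/containers_2.json, ...
    import org.apache.hadoop.fs.Path;

    public class AttemptHistoryFiles
    {
      public static Path containersFile(String appPath, int attemptId)
      {
        return new Path(appPath, String.format("containers_%d.json", attemptId));
      }

      public static Path operatorsFile(String appPath, int attemptId)
      {
        return new Path(appPath, String.format("operators_%d.json", attemptId));
      }
    }
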
 .../stram/StreamingAppMasterService.java       |  2 +-
 .../stram/StreamingContainerManager.java       | 66 +++++++++++---------
 .../stram/plan/logical/LogicalPlan.java        |  5 ++
 .../datatorrent/stram/util/FSJsonLineFile.java | 24 ++-----
 .../datatorrent/stram/StramRecoveryTest.java   |  2 +-
 5 files changed, 46 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 5246c9e..98c78de 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -511,7 +511,7 @@ public class StreamingAppMasterService extends CompositeService
     if (dag.isDebug()) {
       dumpOutDebugInfo();
     }
-
+    dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
     FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), conf);
     this.dnmgr = StreamingContainerManager.getInstance(recoveryHandler, dag, true);
     dag = this.dnmgr.getLogicalPlan();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 64850f5..6e0f3f5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -127,6 +127,8 @@ public class StreamingContainerManager implements PlanContext
   private final static Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
   public final static String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
   public final static String BUILTIN_APPDATA_URL = "builtin";
+  public final static String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
+  public final static String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
   public final static String APP_META_FILENAME = "meta.json";
   public final static String APP_META_KEY_ATTRIBUTES = "attributes";
   public final static String APP_META_KEY_METRICS = "metrics";
@@ -205,7 +207,7 @@ public class StreamingContainerManager implements PlanContext
   };

   private FSJsonLineFile containerFile;
-  private final ConcurrentMap<Integer, FSJsonLineFile> operatorFiles = Maps.newConcurrentMap();
+  private FSJsonLineFile operatorFile;

   private final long startTime = System.currentTimeMillis();
@@ -359,8 +361,11 @@ public class StreamingContainerManager implements PlanContext
       Configuration config = new YarnConfiguration();
       fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
       saveMetaInfo();
-      this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), FsPermission.getDefault());
+      String fileName = String.format(CONTAINERS_INFO_FILENAME_FORMAT, plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
+      this.containerFile = new FSJsonLineFile(fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
       this.containerFile.append(getAppMasterContainerInfo());
+      fileName = String.format(OPERATORS_INFO_FILENAME_FORMAT, plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID));
+      this.operatorFile = new FSJsonLineFile(fileContext, new Path(this.vars.appPath, fileName), FsPermission.getDefault());
     } catch (IOException ex) {
       throw DTThrowable.wrapIfChecked(ex);
     }
@@ -490,9 +495,7 @@ public class StreamingContainerManager implements PlanContext
     }

     IOUtils.closeQuietly(containerFile);
-    for (FSJsonLineFile operatorFile : operatorFiles.values()) {
-      IOUtils.closeQuietly(operatorFile);
-    }
+    IOUtils.closeQuietly(operatorFile);
     if(poolExecutor != null) {
       poolExecutor.shutdown();
     }
@@ -854,8 +857,7 @@ public class StreamingContainerManager implements PlanContext
   private void saveMetaInfo() throws IOException
   {
     Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime());
-    try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) {
-      JSONObject top = new JSONObject();
+    try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) { JSONObject top = new JSONObject();
       JSONObject attributes = new JSONObject();
       for (Map.Entry<Attribute<?>, Object> entry : this.plan.getLogicalPlan().getAttributes().entrySet()) {
         attributes.put(entry.getKey().getSimpleName(), entry.getValue());
@@ -1379,7 +1381,7 @@ public class StreamingContainerManager implements PlanContext
   {
     long currentTimeMillis = clock.getTime();

-    StreamingContainerAgent sca = this.containers.get(heartbeat.getContainerId());
+    final StreamingContainerAgent sca = this.containers.get(heartbeat.getContainerId());
     if (sca == null || sca.container.getState() == PTContainer.State.KILLED) {
       // could be orphaned container that was replaced and needs to terminate
       LOG.error("Unknown container {}", heartbeat.getContainerId());
@@ -1395,34 +1397,35 @@ public class StreamingContainerManager implements PlanContext
        sca.container.bufferServerAddress = InetSocketAddress.createUnresolved(heartbeat.bufferServerHost, heartbeat.bufferServerPort);
        LOG.info("Container {} buffer server: {}", sca.container.getExternalId(), sca.container.bufferServerAddress);
       }
-      long containerStartTime = System.currentTimeMillis();
+      final long containerStartTime = System.currentTimeMillis();
       sca.container.setState(PTContainer.State.ACTIVE);
       sca.container.setStartedTime(containerStartTime);
       sca.container.setFinishedTime(-1);
       sca.jvmName = heartbeat.jvmName;
-      try {
-        containerFile.append(sca.getContainerInfo());
-      }
-      catch (IOException ex) {
-        LOG.warn("Cannot write to container file");
-      }
-      for (PTOperator ptOp : sca.container.getOperators()) {
-        try {
-          FSJsonLineFile operatorFile = operatorFiles.get(ptOp.getId());
-          if (operatorFile == null) {
-            operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), FsPermission.getDefault()));
-            operatorFile = operatorFiles.get(ptOp.getId());
+      poolExecutor.submit(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            containerFile.append(sca.getContainerInfo());
+          } catch (IOException ex) {
+            LOG.warn("Cannot write to container file");
+          }
+          for (PTOperator ptOp : sca.container.getOperators()) {
+            try {
+              JSONObject operatorInfo = new JSONObject();
+              operatorInfo.put("name", ptOp.getName());
+              operatorInfo.put("id", ptOp.getId());
+              operatorInfo.put("container", sca.container.getExternalId());
+              operatorInfo.put("startTime", containerStartTime);
+              operatorFile.append(operatorInfo);
+            } catch (IOException | JSONException ex) {
+              LOG.warn("Cannot write to operator file: ", ex);
+            }
+          }
-          }
-          JSONObject operatorInfo = new JSONObject();
-          operatorInfo.put("name", ptOp.getName());
-          operatorInfo.put("container", sca.container.getExternalId());
-          operatorInfo.put("startTime", containerStartTime);
-          operatorFile.append(operatorInfo);
-        }
-        catch (Exception ex) {
-          LOG.warn("Cannot write to operator file: ", ex);
-        }
-      }
+        }
+      });
     }

     if (heartbeat.restartRequested) {
@@ -2823,9 +2826,10 @@ public class StreamingContainerManager implements PlanContext
       scm = new StreamingContainerManager(dag, enableEventRecording, new SystemClock());
     }
     else {
-      scm = new StreamingContainerManager(checkpointedState, enableEventRecording);
       // find better way to support final transient members
       PhysicalPlan plan = checkpointedState.physicalPlan;
+      plan.getLogicalPlan().setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, dag.getAttributes().get(LogicalPlan.APPLICATION_ATTEMPT_ID));
+      scm = new StreamingContainerManager(checkpointedState, enableEventRecording);
       for (Field f : plan.getClass().getDeclaredFields()) {
         if (f.getType() == PlanContext.class) {
           f.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index d140d17..2d088b8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -130,6 +130,11 @@ public class LogicalPlan implements Serializable, DAG
    */
   public static Attribute<Integer> CONTAINERS_MAX_COUNT = new Attribute<Integer>(Integer.MAX_VALUE);

+  /**
+   * The application attempt ID from YARN
+   */
+  public static Attribute<Integer> APPLICATION_ATTEMPT_ID = new Attribute<>(1);
+
   static {
     Attribute.AttributeMap.AttributeInitializer.initialize(LogicalPlan.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java b/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
index 7935ce4..3b5a31e 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/FSJsonLineFile.java
@@ -17,7 +17,8 @@ package com.datatorrent.stram.util;

 import java.io.Closeable;
 import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
+import java.util.EnumSet;
+
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -32,29 +33,13 @@ import org.slf4j.LoggerFactory;
  */
 public class FSJsonLineFile implements Closeable
 {
-  private final FileSystem fs;
   private final ObjectMapper objectMapper;
   private final FSDataOutputStream os;
   private static final Logger LOG = LoggerFactory.getLogger(FSJsonLineFile.class);

-  public FSJsonLineFile(Path path, FsPermission permission) throws IOException
+  public FSJsonLineFile(FileContext fileContext, Path path, FsPermission permission) throws IOException
   {
-    fs = FileSystem.newInstance(path.toUri(), new Configuration());
-    FSDataOutputStream myos;
-    if (fs.exists(path)) {
-      try {
-        // happens if not the first application attempt
-        myos = fs.append(path);
-      }
-      catch (IOException ex) {
-        LOG.warn("Caught exception (OK during unit test): {}", ex.getMessage());
-        myos = FileSystem.create(fs, path, permission);
-      }
-    }
-    else {
-      myos = FileSystem.create(fs, path, permission);
-    }
-    os = myos;
+    this.os = fileContext.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), Options.CreateOpts.perms(permission));
     this.objectMapper = (new JSONSerializationProvider()).getContext(null);
   }

@@ -74,7 +59,6 @@ public class FSJsonLineFile implements Closeable
   public void close() throws IOException
   {
     os.close();
-    fs.close();
   }

 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b8c0b4cc/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 6172d8a..ab2092a 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -391,6 +391,7 @@ public class StramRecoveryTest
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1);
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1);
+    dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, 1);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);

     dag.addOperator("o1", StatsListeningOperator.class);
@@ -408,7 +409,6 @@ public class StramRecoveryTest
     PTOperator o1p1 = plan.getOperators(dag.getOperatorMeta("o1")).get(0);
     long[] ids = new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(o1p1.getId());
     Assert.assertArrayEquals(new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
-    Assert.assertNull(o1p1.getContainer().getExternalId());

     // trigger journal write
     o1p1.getContainer().setExternalId("cid1");
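
For illustration only (not part of this commit): the new FSJsonLineFile constructor opens its stream with CreateFlag.CREATE plus CreateFlag.APPEND, so the file is created when it does not exist and appended to when it does, replacing the old exists()/append()/create() fallback. A minimal, self-contained sketch of that pattern follows; the path and payload below are hypothetical.

    import java.util.EnumSet;

    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class CreateOrAppendSketch
    {
      public static void main(String[] args) throws Exception
      {
        // Default (local) FileContext; a real deployment would point at the application's HDFS path.
        FileContext fc = FileContext.getFileContext();
        Path path = new Path("/tmp/apex-demo/containers_1.json"); // hypothetical path
        try (FSDataOutputStream os = fc.create(path,
            EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
            Options.CreateOpts.perms(FsPermission.getDefault()),
            Options.CreateOpts.createParent())) {
          // One JSON object per line, as FSJsonLineFile does.
          os.write("{\"id\":\"container_000001\"}\n".getBytes("UTF-8"));
        }
      }
    }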