From: sewen@apache.org
To: commits@flink.apache.org
Date: Thu, 23 Jul 2015 14:13:20 -0000
Subject: [3/3] flink git commit: [FLINK-2357] [web dashboard] New dashboard backend server supports requests from old web server as well.

[FLINK-2357] [web dashboard] New dashboard backend server supports requests from old web server as well.

Also moves TestRunner to test scope.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c52e753a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c52e753a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c52e753a

Branch: refs/heads/master
Commit: c52e753a8d3fc15ed48df7bfa24a327a90df9a0f
Parents: 4473db6
Author: Stephan Ewen
Authored: Thu Jul 23 12:01:13 2015 +0200
Committer: Stephan Ewen
Committed: Thu Jul 23 16:11:35 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  11 -
 flink-runtime-web/pom.xml                       |  30 +-
 .../flink/runtime/webmonitor/TestRunner.java    | 197 ------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  10 +-
 .../handlers/RequestJobIdsHandler.java          |   2 +-
 .../legacy/JobManagerInfoHandler.java           | 705 +++++++++++++++++++
 .../runtime/webmonitor/legacy/JsonFactory.java  | 112 +++
 .../runtime/webmonitor/runner/TestRunner.java   | 198 ++++++
 .../web-dashboard/app/scripts/index.coffee      |   6 +-
 flink-runtime-web/web-dashboard/server.js       |   2 +-
 flink-runtime-web/web-dashboard/web/js/index.js |   5 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   3 -
 .../minicluster/LocalFlinkMiniCluster.scala     |   9 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   9 +-
 14 files changed, 1058 insertions(+), 241 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 9690f41..c76741b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -303,11 +303,6 @@ public final class ConfigConstants { * The option that specifies whether to use the new web frontend */ public static final String JOB_MANAGER_NEW_WEB_FRONTEND_KEY = "jobmanager.new-web-frontend"; - - /** - * The port for the runtime monitor web-frontend server. - */ - public static final String JOB_MANAGER_NEW_WEB_PORT_KEY = "jobmanager.new-web.port"; /** * The config parameter defining the number of archived jobs for the jobmanager @@ -612,12 +607,6 @@ public final class ConfigConstants { * Setting this value to {@code -1} disables the web frontend. */ public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081; - - /** - * The config key for the port of the JobManager new web frontend. - * Setting this value to {@code -1} disables the web frontend. - */ - public static final int DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT = 8082; /** * The default number of archived jobs for the jobmanager http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index ffe15af..0a05111 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -46,18 +46,6 @@ under the License. <version>${project.version}</version> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - </dependency> - @@ -102,6 +90,24 @@ under the License. <artifactId>guava</artifactId> <version>${guava.version}</version> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java-examples</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java deleted file mode 100644 index eecc81a..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.FunctionAnnotation; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.examples.java.relational.util.WebLogData; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.util.Collector; - -/** - * Simple runner that brings up a local cluster with the web server and executes two - * jobs to expose their data in the archive - */ -@SuppressWarnings("serial") -public class TestRunner { - - public static void main(String[] args) throws Exception { - - // start the cluster with the runtime monitor - Configuration configuration = new Configuration(); - configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true); - configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true); - configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, - "/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor"); - - LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false); - - final int port = cluster.getJobManagerRPCPort(); - runWordCount(port); - runWebLogAnalysisExample(port); - runWordCount(port); - - Object o = new Object(); - synchronized (o) { - o.wait(); - } - - cluster.shutdown(); - } - - private static void runWordCount(int port) throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port); - - DataSet text = env.fromElements(WordCountData.TEXT.split("\n")); - - DataSet> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0) - .sum(1); - - counts.print(); - } - - private static void runWebLogAnalysisExample(int port) throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port); - - // get input data - DataSet> documents = WebLogData.getDocumentDataSet(env); - DataSet> ranks = WebLogData.getRankDataSet(env); - DataSet> visits = WebLogData.getVisitDataSet(env); - - // Retain documents with keywords - DataSet> filterDocs = documents - .filter(new FilterDocByKeyWords()) - .project(0); - - // Filter ranks by minimum rank - DataSet> filterRanks = ranks - .filter(new FilterByRank()); - - // Filter visits by visit date - DataSet> filterVisits = visits - .filter(new FilterVisitsByDate()) - .project(0); - - // Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords - DataSet> joinDocsRanks = - filterDocs.join(filterRanks) - .where(0).equalTo(1) - .projectSecond(0,1,2); - - // Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time - DataSet> result = - joinDocsRanks.coGroup(filterVisits) - .where(1).equalTo(0) - .with(new AntiJoinVisits()); - - result.print(); - } - - // 
************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - public static final class Tokenizer implements FlatMapFunction> { - - @Override - public void flatMap(String value, Collector> out) { - // normalize and split the line - String[] tokens = value.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2(token, 1)); - } - } - } - } - - public static class FilterDocByKeyWords implements FilterFunction> { - - private static final String[] KEYWORDS = { " editors ", " oscillations " }; - - @Override - public boolean filter(Tuple2 value) throws Exception { - // FILTER - // Only collect the document if all keywords are contained - String docText = value.f1; - for (String kw : KEYWORDS) { - if (!docText.contains(kw)) { - return false; - } - } - return true; - } - } - - public static class FilterByRank implements FilterFunction> { - - private static final int RANKFILTER = 40; - - @Override - public boolean filter(Tuple3 value) throws Exception { - return (value.f0 > RANKFILTER); - } - } - - - public static class FilterVisitsByDate implements FilterFunction> { - - private static final int YEARFILTER = 2007; - - @Override - public boolean filter(Tuple2 value) throws Exception { - // Parse date string with the format YYYY-MM-DD and extract the year - String dateString = value.f1; - int year = Integer.parseInt(dateString.substring(0,4)); - return (year == YEARFILTER); - } - } - - - @FunctionAnnotation.ForwardedFieldsFirst("*") - public static class AntiJoinVisits implements CoGroupFunction, Tuple1, Tuple3> { - - @Override - public void coGroup(Iterable> ranks, Iterable> visits, Collector> out) { - // Check if there is a entry in the visits relation - if (!visits.iterator().hasNext()) { - for (Tuple3 next : ranks) { - // Emit all rank pairs - out.collect(next); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 3a8dd83..0aa6b07 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.webmonitor.handlers.RequestConfigHandler; import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; import org.apache.flink.runtime.webmonitor.handlers.RequestJobIdsHandler; import org.apache.flink.runtime.webmonitor.handlers.RequestOverviewHandler; +import org.apache.flink.runtime.webmonitor.legacy.JobManagerInfoHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,8 +112,8 @@ public class WebRuntimeMonitor implements WebMonitor { } // port configuration - this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_NEW_WEB_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT); + this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); if (this.configuredPort < 0) { throw new IllegalArgumentException("Web frontend 
port is invalid: " + this.configuredPort); } @@ -133,7 +134,10 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs))) // .GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs))) - + + // the handler for the legacy requests + .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT)) + // this handler serves all the static contents .GET("/:*", new StaticFileServerHandler(webRootDir)); http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java index a09bc1a..1f28a01 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java @@ -38,7 +38,7 @@ import java.util.Map; * May serve the IDs of current jobs, or past jobs, depending on whether this handler is * given the JobManager or Archive Actor Reference. */ -public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse { +public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse { private final ActorRef target; http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java new file mode 100644 index 0000000..0a1e08c --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java @@ -0,0 +1,705 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.legacy; + +import akka.actor.ActorRef; + +import akka.pattern.Patterns; +import akka.util.Timeout; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.router.KeepAliveWrite; +import io.netty.handler.codec.http.router.Routed; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.ArchiveMessages; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultStringsFound; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound; +import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Tuple3; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +@ChannelHandler.Sharable +public class JobManagerInfoHandler extends SimpleChannelInboundHandler { + + private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoHandler.class); + + private static final Charset ENCODING = Charset.forName("UTF-8"); + + /** Underlying JobManager */ + private final ActorRef jobmanager; + private final ActorRef archive; + private final FiniteDuration timeout; + + + public JobManagerInfoHandler(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) { + this.jobmanager = jobmanager; + this.archive = archive; + this.timeout = timeout; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { + DefaultFullHttpResponse response; + try { + String result = handleRequest(routed); + byte[] bytes = result.getBytes(ENCODING); + + response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json"); + } + catch (Exception e) { + byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING); + response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, + 
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes)); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); + } + + response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + KeepAliveWrite.flush(ctx, routed.request(), response); + } + + + @SuppressWarnings("unchecked") + private String handleRequest(Routed routed) throws Exception { + if ("archive".equals(routed.queryParam("get"))) { + Future response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(), + new Timeout(timeout)); + + Object result = Await.result(response, timeout); + + if(!(result instanceof ArchiveMessages.ArchivedJobs)) { + throw new RuntimeException("RequestArchiveJobs requires a response of type " + + "ArchivedJobs. Instead the response is of type " + result.getClass() +"."); + } + else { + final List archivedJobs = new ArrayList( + ((ArchiveMessages.ArchivedJobs) result).asJavaCollection()); + + return writeJsonForArchive(archivedJobs); + } + } + else if ("jobcounts".equals(routed.queryParam("get"))) { + Future response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(), + new Timeout(timeout)); + + Object result = Await.result(response, timeout); + + if (!(result instanceof Tuple3)) { + throw new RuntimeException("RequestJobCounts requires a response of type " + + "Tuple3. Instead the response is of type " + result.getClass() + + "."); + } + else { + return writeJsonForJobCounts((Tuple3) result); + } + } + else if ("job".equals(routed.queryParam("get"))) { + String jobId = routed.queryParam("job"); + + Future response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)), + new Timeout(timeout)); + + Object result = Await.result(response, timeout); + + if (!(result instanceof JobManagerMessages.JobResponse)){ + throw new RuntimeException("RequestJob requires a response of type JobResponse. " + + "Instead the response is of type " + result.getClass()); + } + else { + final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result; + + if (jobResponse instanceof JobManagerMessages.JobFound){ + ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)result).executionGraph(); + return writeJsonForArchivedJob(archivedJob); + } + else { + throw new Exception("DoGet:job: Could not find job for job ID " + jobId); + } + } + } + else if ("groupvertex".equals(routed.queryParam("get"))) { + String jobId = routed.queryParam("job"); + String groupVertexId = routed.queryParam("groupvertex"); + + // No group vertex specified + if (groupVertexId.equals("null")) { + throw new Exception("Found null groupVertexId"); + } + + Future response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)), + new Timeout(timeout)); + + Object result = Await.result(response, timeout); + + if (!(result instanceof JobManagerMessages.JobResponse)){ + throw new RuntimeException("RequestJob requires a response of type JobResponse. 
" + + "Instead the response is of type " + result.getClass()); + } + else { + final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result; + + if (jobResponse instanceof JobManagerMessages.JobFound) { + ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)jobResponse).executionGraph(); + + return writeJsonForArchivedJobGroupvertex(archivedJob, JobVertexID.fromHexString(groupVertexId)); + } + else { + throw new Exception("DoGet:groupvertex: Could not find job for job ID " + jobId); + } + } + } + else if ("taskmanagers".equals(routed.queryParam("get"))) { + Future response = Patterns.ask(jobmanager, + JobManagerMessages.getRequestNumberRegisteredTaskManager(), + new Timeout(timeout)); + + Object result = Await.result(response, timeout); + + if (!(result instanceof Integer)) { + throw new RuntimeException("RequestNumberRegisteredTaskManager requires a " + + "response of type Integer. Instead the response is of type " + + result.getClass() + "."); + } + else { + final int numberOfTaskManagers = (Integer)result; + + final Future responseRegisteredSlots = Patterns.ask(jobmanager, + JobManagerMessages.getRequestTotalNumberOfSlots(), + new Timeout(timeout)); + + final Object resultRegisteredSlots = Await.result(responseRegisteredSlots, + timeout); + + if (!(resultRegisteredSlots instanceof Integer)) { + throw new RuntimeException("RequestTotalNumberOfSlots requires a response of " + + "type Integer. Instaed the response of type " + + resultRegisteredSlots.getClass() + "."); + } + else { + final int numberOfRegisteredSlots = (Integer) resultRegisteredSlots; + + return "{\"taskmanagers\": " + numberOfTaskManagers + ", " + + "\"slots\": " + numberOfRegisteredSlots + "}"; + } + } + } + else if ("cancel".equals(routed.queryParam("get"))) { + String jobId = routed.queryParam("job"); + + Future response = Patterns.ask(jobmanager, new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)), + new Timeout(timeout)); + + Await.ready(response, timeout); + return "{}"; + } + else if ("updates".equals(routed.queryParam("get"))) { + String jobId = routed.queryParam("job"); + return writeJsonUpdatesForJob(JobID.fromHexString(jobId)); + } + else if ("version".equals(routed.queryParam("get"))) { + return writeJsonForVersion(); + } + else{ + Future response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(), + new Timeout(timeout)); + + Object result = Await.result(response, timeout); + + if(!(result instanceof JobManagerMessages.RunningJobs)){ + throw new RuntimeException("RequestRunningJobs requires a response of type " + + "RunningJobs. 
Instead the response of type " + result.getClass() + "."); + } + else { + final Iterable runningJobs = + ((JobManagerMessages.RunningJobs) result).asJavaIterable(); + + return writeJsonForJobs(runningJobs); + } + } + } + + private String writeJsonForJobs(Iterable graphs) { + StringBuilder bld = new StringBuilder(); + bld.append("["); + + Iterator it = graphs.iterator(); + // Loop Jobs + while(it.hasNext()){ + ExecutionGraph graph = it.next(); + + writeJsonForJob(bld, graph); + + //Write seperator between json objects + if(it.hasNext()) { + bld.append(","); + } + } + bld.append("]"); + + return bld.toString(); + } + + private void writeJsonForJob(StringBuilder bld, ExecutionGraph graph) { + //Serialize job to json + bld.append("{"); + bld.append("\"jobid\": \"").append(graph.getJobID()).append("\","); + bld.append("\"jobname\": \"").append(graph.getJobName()).append("\","); + bld.append("\"status\": \"").append(graph.getState()).append("\","); + bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState())).append(","); + + // Serialize ManagementGraph to json + bld.append("\"groupvertices\": ["); + boolean first = true; + + for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { + //Write seperator between json objects + if (first) { + first = false; + } else { + bld.append(","); + } + bld.append(JsonFactory.toJson(groupVertex)); + } + bld.append("]"); + bld.append("}"); + } + + private String writeJsonForArchive(List graphs) { + StringBuilder bld = new StringBuilder(); + bld.append("["); + + // sort jobs by time + Collections.sort(graphs, new Comparator() { + @Override + public int compare(ExecutionGraph o1, ExecutionGraph o2) { + if (o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) { + return 1; + } else { + return -1; + } + } + + }); + + // Loop Jobs + for (int i = 0; i < graphs.size(); i++) { + ExecutionGraph graph = graphs.get(i); + + //Serialize job to json + bld.append("{"); + bld.append("\"jobid\": \"").append(graph.getJobID()).append("\","); + bld.append("\"jobname\": \"").append(graph.getJobName()).append("\","); + bld.append("\"status\": \"").append(graph.getState()).append("\","); + bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState())); + + bld.append("}"); + + //Write seperator between json objects + if(i != graphs.size() - 1) { + bld.append(","); + } + } + bld.append("]"); + return bld.toString(); + } + + private String writeJsonForJobCounts(Tuple3 jobCounts) { + return "{\"finished\": " + jobCounts._1() + ",\"canceled\": " + jobCounts._2() + ",\"failed\": " + + jobCounts._3() + "}"; + } + + + private String writeJsonForArchivedJob(ExecutionGraph graph) { + StringBuilder bld = new StringBuilder(); + + bld.append("["); + bld.append("{"); + bld.append("\"jobid\": \"").append(graph.getJobID()).append("\","); + bld.append("\"jobname\": \"").append(graph.getJobName()).append("\","); + bld.append("\"status\": \"").append(graph.getState()).append("\","); + bld.append("\"SCHEDULED\": ").append(graph.getStatusTimestamp(JobStatus.CREATED)).append(","); + bld.append("\"RUNNING\": ").append(graph.getStatusTimestamp(JobStatus.RUNNING)).append(","); + bld.append("\"FINISHED\": ").append(graph.getStatusTimestamp(JobStatus.FINISHED)).append(","); + bld.append("\"FAILED\": ").append(graph.getStatusTimestamp(JobStatus.FAILED)).append(","); + bld.append("\"CANCELED\": ").append(graph.getStatusTimestamp(JobStatus.CANCELED)).append(","); + + if (graph.getState() == JobStatus.FAILED) { + bld.append("\"failednodes\": 
["); + boolean first = true; + for (ExecutionVertex vertex : graph.getAllExecutionVertices()) { + if (vertex.getExecutionState() == ExecutionState.FAILED) { + InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation(); + Throwable failureCause = vertex.getFailureCause(); + if (location != null || failureCause != null) { + if (first) { + first = false; + } else { + bld.append(","); + } + bld.append("{"); + bld.append("\"node\": \"").append(location == null ? "(none)" : location.getFQDNHostname()).append("\","); + bld.append("\"message\": \"").append(failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))).append("\""); + bld.append("}"); + } + } + } + bld.append("],"); + } + + // Serialize ManagementGraph to json + bld.append("\"groupvertices\": ["); + boolean first = true; + for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { + //Write seperator between json objects + if (first) { + first = false; + } else { + bld.append(","); + } + + bld.append(JsonFactory.toJson(groupVertex)); + + } + bld.append("],"); + + // write user config + ExecutionConfig ec = graph.getExecutionConfig(); + if(ec != null) { + bld.append("\"executionConfig\": {"); + bld.append("\"Execution Mode\": \"").append(ec.getExecutionMode()).append("\","); + bld.append("\"Number of execution retries\": \"").append(ec.getNumberOfExecutionRetries()).append("\","); + bld.append("\"Job parallelism\": \"").append(ec.getParallelism()).append("\","); + bld.append("\"Object reuse mode\": \"").append(ec.isObjectReuseEnabled()).append("\""); + ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters(); + if(uc != null) { + Map ucVals = uc.toMap(); + if (ucVals != null) { + String ucString = "{"; + int i = 0; + for (Map.Entry ucVal : ucVals.entrySet()) { + ucString += "\"" + ucVal.getKey() + "\":\"" + ucVal.getValue() + "\""; + if (++i < ucVals.size()) { + ucString += ",\n"; + } + } + bld.append(", \"userConfig\": ").append(ucString).append("}"); + } + else { + LOG.debug("GlobalJobParameters.toMap() did not return anything"); + } + } + else { + LOG.debug("No GlobalJobParameters were set in the execution config"); + } + bld.append("},"); + } + else { + LOG.warn("Unable to retrieve execution config from execution graph"); + } + + // write accumulators + final Future response = Patterns.ask(jobmanager, + new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout)); + + Object result; + try { + result = Await.result(response, timeout); + } + catch (Exception ex) { + throw new RuntimeException("Could not retrieve the accumulator results from the job manager.", ex); + } + + if (result instanceof AccumulatorResultStringsFound) { + StringifiedAccumulatorResult[] accumulators = ((AccumulatorResultStringsFound) result).result(); + + bld.append("\n\"accumulators\": ["); + int i = 0; + for (StringifiedAccumulatorResult accumulator : accumulators) { + bld.append("{ \"name\": \"").append(accumulator.getName()).append(" (").append(accumulator.getType()).append(")\",").append(" \"value\": \"").append(accumulator.getValue()).append("\"}\n"); + if (++i < accumulators.length) { + bld.append(","); + } + } + bld.append("],\n"); + } + else if (result instanceof AccumulatorResultsNotFound) { + bld.append("\n\"accumulators\": [],"); + } + else if (result instanceof AccumulatorResultsErroneous) { + LOG.error("Could not obtain accumulators for job " + graph.getJobID(), + ((AccumulatorResultsErroneous) result).cause()); + } + else { + throw new 
RuntimeException("RequestAccumulatorResults requires a response of type " + + "AccumulatorResultStringsFound. Instead the response is of type " + + result.getClass() + "."); + } + + bld.append("\"groupverticetimes\": {"); + first = true; + + for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) { + if (first) { + first = false; + } else { + bld.append(","); + } + + // Calculate start and end time for groupvertex + long started = Long.MAX_VALUE; + long ended = 0; + + // Take earliest running state and latest endstate of groupmembers + for (ExecutionVertex vertex : groupVertex.getTaskVertices()) { + + long running = vertex.getStateTimestamp(ExecutionState.RUNNING); + if (running != 0 && running < started) { + started = running; + } + + long finished = vertex.getStateTimestamp(ExecutionState.FINISHED); + long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED); + long failed = vertex.getStateTimestamp(ExecutionState.FAILED); + + if (finished != 0 && finished > ended) { + ended = finished; + } + + if (canceled != 0 && canceled > ended) { + ended = canceled; + } + + if (failed != 0 && failed > ended) { + ended = failed; + } + + } + + bld.append("\"").append(groupVertex.getJobVertexId()).append("\": {"); + bld.append("\"groupvertexid\": \"").append(groupVertex.getJobVertexId()).append("\","); + bld.append("\"groupvertexname\": \"").append(groupVertex).append("\","); + bld.append("\"STARTED\": ").append(started).append(","); + bld.append("\"ENDED\": ").append(ended); + bld.append("}"); + + } + + bld.append("}"); + bld.append("}"); + bld.append("]"); + + return bld.toString(); + } + + + private String writeJsonUpdatesForJob(JobID jobId) { + final Future responseArchivedJobs = Patterns.ask(jobmanager, + JobManagerMessages.getRequestRunningJobs(), + new Timeout(timeout)); + + Object resultArchivedJobs; + try{ + resultArchivedJobs = Await.result(responseArchivedJobs, timeout); + } + catch (Exception ex) { + throw new RuntimeException("Could not retrieve archived jobs from the job manager.", ex); + } + + if(!(resultArchivedJobs instanceof JobManagerMessages.RunningJobs)){ + throw new RuntimeException("RequestArchivedJobs requires a response of type " + + "RunningJobs. Instead the response is of type " + + resultArchivedJobs.getClass() + "."); + } + else { + final Iterable graphs = ((JobManagerMessages.RunningJobs)resultArchivedJobs). + asJavaIterable(); + + //Serialize job to json + final StringBuilder bld = new StringBuilder(); + + bld.append("{"); + bld.append("\"jobid\": \"").append(jobId).append("\","); + bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\","); + bld.append("\"recentjobs\": ["); + + boolean first = true; + + for (ExecutionGraph g : graphs){ + if (first) { + first = false; + } else { + bld.append(","); + } + + bld.append("\"").append(g.getJobID()).append("\""); + } + bld.append("],"); + + final Future responseJob = Patterns.ask(jobmanager, new JobManagerMessages.RequestJob(jobId), + new Timeout(timeout)); + + Object resultJob; + try{ + resultJob = Await.result(responseJob, timeout); + } + catch (Exception ex){ + throw new RuntimeException("Could not retrieve the job with jobID " + jobId + + "from the job manager.", ex); + } + + if (!(resultJob instanceof JobManagerMessages.JobResponse)) { + throw new RuntimeException("RequestJob requires a response of type JobResponse. 
" + + "Instead the response is of type " + resultJob.getClass() + "."); + } + else { + final JobManagerMessages.JobResponse response = (JobManagerMessages.JobResponse) resultJob; + + if (response instanceof JobManagerMessages.JobFound){ + ExecutionGraph graph = ((JobManagerMessages.JobFound)response).executionGraph(); + + bld.append("\"vertexevents\": ["); + + first = true; + for (ExecutionVertex ev : graph.getAllExecutionVertices()) { + if (first) { + first = false; + } else { + bld.append(","); + } + + bld.append("{"); + bld.append("\"vertexid\": \"").append(ev.getCurrentExecutionAttempt().getAttemptId()).append("\","); + bld.append("\"newstate\": \"").append(ev.getExecutionState()).append("\","); + bld.append("\"timestamp\": \"").append(ev.getStateTimestamp(ev.getExecutionState())).append("\""); + bld.append("}"); + } + + bld.append("],"); + + bld.append("\"jobevents\": ["); + + bld.append("{"); + bld.append("\"newstate\": \"").append(graph.getState()).append("\","); + bld.append("\"timestamp\": \"").append(graph.getStatusTimestamp(graph.getState())).append("\""); + bld.append("}"); + + bld.append("]"); + + bld.append("}"); + } + else { + bld.append("\"vertexevents\": [],"); + bld.append("\"jobevents\": ["); + bld.append("{"); + bld.append("\"newstate\": \"").append(JobStatus.FINISHED.toString()).append("\","); + bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\""); + bld.append("}"); + bld.append("]"); + bld.append("}"); + } + } + + return bld.toString(); + } + } + + private String writeJsonForArchivedJobGroupvertex(ExecutionGraph graph, JobVertexID vertexId) { + ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId); + StringBuilder bld = new StringBuilder(); + + bld.append("{\"groupvertex\": ").append(JsonFactory.toJson(jobVertex)).append(","); + + bld.append("\"verticetimes\": {"); + boolean first = true; + for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) { + + for (ExecutionVertex vertex : groupVertex.getTaskVertices()) { + + Execution exec = vertex.getCurrentExecutionAttempt(); + + if(first) { + first = false; + } else { + bld.append(","); } + + bld.append("\"").append(exec.getAttemptId()).append("\": {"); + bld.append("\"vertexid\": \"").append(exec.getAttemptId()).append("\","); + bld.append("\"vertexname\": \"").append(vertex).append("\","); + bld.append("\"CREATED\": ").append(vertex.getStateTimestamp(ExecutionState.CREATED)).append(","); + bld.append("\"SCHEDULED\": ").append(vertex.getStateTimestamp(ExecutionState.SCHEDULED)).append(","); + bld.append("\"DEPLOYING\": ").append(vertex.getStateTimestamp(ExecutionState.DEPLOYING)).append(","); + bld.append("\"RUNNING\": ").append(vertex.getStateTimestamp(ExecutionState.RUNNING)).append(","); + bld.append("\"FINISHED\": ").append(vertex.getStateTimestamp(ExecutionState.FINISHED)).append(","); + bld.append("\"CANCELING\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELING)).append(","); + bld.append("\"CANCELED\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELED)).append(","); + bld.append("\"FAILED\": ").append(vertex.getStateTimestamp(ExecutionState.FAILED)).append(""); + bld.append("}"); + } + + } + bld.append("}}"); + return bld.toString(); + } + + + private String writeJsonForVersion() { + return "{\"version\": \"" + EnvironmentInformation.getVersion() + "\",\"revision\": \"" + + EnvironmentInformation.getRevisionInformation().commitId + "\"}"; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java new file mode 100644 index 0000000..fe18d3f --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.legacy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.util.StringUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class JsonFactory { + + public static String toJson(ExecutionVertex vertex) { + StringBuilder json = new StringBuilder(""); + json.append("{"); + json.append("\"vertexid\": \"").append(vertex.getCurrentExecutionAttempt().getAttemptId()).append("\","); + json.append("\"vertexname\": \"").append(StringUtils.escapeHtml(vertex.getSimpleName())).append("\","); + json.append("\"vertexstatus\": \"").append(vertex.getExecutionState()).append("\","); + + InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation(); + String instanceName = location == null ? 
"(null)" : location.getFQDNHostname(); + + json.append("\"vertexinstancename\": \"").append(instanceName).append("\""); + json.append("}"); + return json.toString(); + } + + public static String toJson(ExecutionJobVertex jobVertex) { + StringBuilder json = new StringBuilder(""); + + json.append("{"); + json.append("\"groupvertexid\": \"").append(jobVertex.getJobVertexId()).append("\","); + json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\","); + json.append("\"numberofgroupmembers\": ").append(jobVertex.getParallelism()).append(","); + json.append("\"groupmembers\": ["); + + // Count state status of group members + Map stateCounts = new HashMap(); + + // initialize with 0 + for (ExecutionState state : ExecutionState.values()) { + stateCounts.put(state, 0); + } + + ExecutionVertex[] vertices = jobVertex.getTaskVertices(); + + for (int j = 0; j < vertices.length; j++) { + ExecutionVertex vertex = vertices[j]; + + json.append(toJson(vertex)); + + // print delimiter + if (j != vertices.length - 1) { + json.append(","); + } + + // Increment state status count + int count = stateCounts.get(vertex.getExecutionState()) + 1; + stateCounts.put(vertex.getExecutionState(), count); + } + + json.append("],"); + json.append("\"backwardEdges\": ["); + + List inputs = jobVertex.getInputs(); + + for (int inputNumber = 0; inputNumber < inputs.size(); inputNumber++) { + ExecutionJobVertex input = inputs.get(inputNumber).getProducer(); + + json.append("{"); + json.append("\"groupvertexid\": \"").append(input.getJobVertexId()).append("\","); + json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\""); + json.append("}"); + + // print delimiter + if(inputNumber != inputs.size() - 1) { + json.append(","); + } + } + json.append("]"); + + // list number of members for each status + for (Map.Entry stateCount : stateCounts.entrySet()) { + json.append(",\"").append(stateCount.getKey()).append("\": ").append(stateCount.getValue()); + } + + json.append("}"); + + return json.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java new file mode 100644 index 0000000..9a9b6ba --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.runner; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.examples.java.relational.util.WebLogData; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.util.Collector; + +/** + * Simple runner that brings up a local cluster with the web server and executes two + * jobs to expose their data in the archive + */ +@SuppressWarnings("serial") +public class TestRunner { + + public static void main(String[] args) throws Exception { + + // start the cluster with the runtime monitor + Configuration configuration = new Configuration(); + configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true); + configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true); + configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, + "/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor"); + + LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false); + + final int port = cluster.getJobManagerRPCPort(); + runWordCount(port); + runWebLogAnalysisExample(port); + runWordCount(port); + + // block the thread + Object o = new Object(); + synchronized (o) { + o.wait(); + } + + cluster.shutdown(); + } + + private static void runWordCount(int port) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port); + + DataSet text = env.fromElements(WordCountData.TEXT.split("\n")); + + DataSet> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + counts.print(); + } + + private static void runWebLogAnalysisExample(int port) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port); + + // get input data + DataSet> documents = WebLogData.getDocumentDataSet(env); + DataSet> ranks = WebLogData.getRankDataSet(env); + DataSet> visits = WebLogData.getVisitDataSet(env); + + // Retain documents with keywords + DataSet> filterDocs = documents + .filter(new FilterDocByKeyWords()) + .project(0); + + // Filter ranks by minimum rank + DataSet> filterRanks = ranks + .filter(new FilterByRank()); + + // Filter visits by visit date + DataSet> filterVisits = visits + .filter(new FilterVisitsByDate()) + .project(0); + + // Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords + DataSet> joinDocsRanks = + filterDocs.join(filterRanks) + .where(0).equalTo(1) + .projectSecond(0,1,2); + + // Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time + DataSet> result = + joinDocsRanks.coGroup(filterVisits) + .where(1).equalTo(0) + .with(new AntiJoinVisits()); + + 
result.print(); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + public static final class Tokenizer implements FlatMapFunction> { + + @Override + public void flatMap(String value, Collector> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } + } + } + } + + public static class FilterDocByKeyWords implements FilterFunction> { + + private static final String[] KEYWORDS = { " editors ", " oscillations " }; + + @Override + public boolean filter(Tuple2 value) throws Exception { + // FILTER + // Only collect the document if all keywords are contained + String docText = value.f1; + for (String kw : KEYWORDS) { + if (!docText.contains(kw)) { + return false; + } + } + return true; + } + } + + public static class FilterByRank implements FilterFunction> { + + private static final int RANKFILTER = 40; + + @Override + public boolean filter(Tuple3 value) throws Exception { + return (value.f0 > RANKFILTER); + } + } + + + public static class FilterVisitsByDate implements FilterFunction> { + + private static final int YEARFILTER = 2007; + + @Override + public boolean filter(Tuple2 value) throws Exception { + // Parse date string with the format YYYY-MM-DD and extract the year + String dateString = value.f1; + int year = Integer.parseInt(dateString.substring(0,4)); + return (year == YEARFILTER); + } + } + + + @FunctionAnnotation.ForwardedFieldsFirst("*") + public static class AntiJoinVisits implements CoGroupFunction, Tuple1, Tuple3> { + + @Override + public void coGroup(Iterable> ranks, Iterable> visits, Collector> out) { + // Check if there is an entry in the visits relation + if (!visits.iterator().hasNext()) { + for (Tuple3 next : ranks) { + // Emit all rank pairs + out.collect(next); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/web-dashboard/app/scripts/index.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee index b85d7e0..385b3d6 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee @@ -29,11 +29,9 @@ angular.module('flinkApp', ['ui.router', 'angularMoment']) # -------------------------------------- .constant 'flinkConfig', { - webServer: 'http://localhost:8080' jobServer: 'http://localhost:8081' - newServer: 'http://localhost:8082' -# webServer: 'http://localhost:3000/web-server' -# jobServer: 'http://localhost:3000/job-server' + newServer: 'http://localhost:8081' +# jobServer: 'http://localhost:3000/new-server' # newServer: 'http://localhost:3000/new-server' refreshInterval: 10000 } http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/web-dashboard/server.js ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/server.js b/flink-runtime-web/web-dashboard/server.js index 797cfa0..453e7a4 100644 --- a/flink-runtime-web/web-dashboard/server.js +++ b/flink-runtime-web/web-dashboard/server.js @@ -29,7 +29,7 @@ var server = new Hapi.Server(); var remotes = [ { port: 8080, path: 'web-server' }, { port: 8081, path: 'job-server' }, - { port: 
8082, path: 'new-server' } + { port: 8081, path: 'new-server' } ] server.connection({ port: 3000 });
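----------------------------------------------------------------------
Net effect of the configuration changes in this commit: the separate key for the new frontend (jobmanager.new-web.port, default 8082) is removed from ConfigConstants, and WebRuntimeMonitor now resolves JOB_MANAGER_WEB_PORT_KEY with DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT (8081), which is why the dashboard dev proxy above now points both 'job-server' and 'new-server' at port 8081. A small sketch of that resolution, using the constants from the diff; the standalone main class is illustrative:

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class WebPortResolutionSketch {

	public static void main(String[] args) {
		// an empty Configuration stands in for flink-conf.yaml here
		Configuration config = new Configuration();

		// old and new web frontend now share this one key and default
		int port = config.getInteger(
				ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);

		System.out.println("web frontend port: " + port); // 8081 unless overridden

		// Note: WebRuntimeMonitor rejects a negative configured port with an
		// IllegalArgumentException ("Web frontend port is invalid: ...")
	}
}
----------------------------------------------------------------------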