flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetzger <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Date Fri, 11 Mar 2016 11:02:00 GMT
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r55816596
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
---
    @@ -0,0 +1,344 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.Address;
    +import com.typesafe.config.Config;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.runtime.webmonitor.WebMonitor;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.slf4j.Logger;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.FileWriter;
    +import java.io.IOException;
    +import java.io.PrintWriter;
    +import java.net.BindException;
    +import java.net.ServerSocket;
    +import java.util.Iterator;
    +
    +/**
    + * Tools for starting JobManager and TaskManager processes, including the
    + * Actor Systems used to run the JobManager and TaskManager actors.
    + */
    +public class BootstrapTools {
    +
    +	/**
    +	 * Starts an ActorSystem with the given configuration listening at the address/ports.
    +	 * @param configuration The Flink configuration
    +	 * @param listeningAddress The address to listen at.
    +	 * @param portRangeDefinition The port range to choose a port from.
    +	 * @param logger The logger to output log information.
    +	 * @return The ActorSystem which has been started
    +	 * @throws Exception
    +	 */
    +	public static ActorSystem startActorSystem(Configuration configuration,
    +										String listeningAddress,
    +										String portRangeDefinition,
    +										Logger logger) throws Exception {
    +
    +		// parse port range definition and create port iterator
    +		Iterator<Integer> portsIterator;
    +		try {
    +			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
    +		} catch (Exception e) {
    +			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
    +		}
    +
    +		while (portsIterator.hasNext()) {
    +			// first, we check if the port is available by opening a socket
    +			// if the actor system fails to start on the port, we try further
    +			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
    +				portsIterator,
    +				new NetUtils.SocketFactory() {
    +					@Override
    +					public ServerSocket createSocket(int port) throws IOException {
    +						return new ServerSocket(port);
    +					}
    +				});
    +
    +			int port;
    +			if (availableSocket == null) {
    +				throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
    +			} else {
    +				port = availableSocket.getLocalPort();
    +				try {
    +					availableSocket.close();
    +				} catch (IOException ignored) {}
    +			}
    +
    +			try {
    +				return startActorSystem(configuration, listeningAddress, port, logger);
    +			}
    +			catch (Exception e) {
    +				// we can continue to try if this contains a netty channel exception
    +				Throwable cause = e.getCause();
    +				if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
    +						cause instanceof java.net.BindException)) {
    +					throw e;
    +				} // else fall through the loop and try the next port
    +			}
    +		}
    +
    +		// if we come here, we have exhausted the port range
    +		throw new BindException("Could not start actor system on any port in port range "
    +			+ portRangeDefinition);
    +	}
    +
    +	/**
    +	 * Starts an Actor System at a specific port.
    +	 * @param configuration The Flink configuration.
    +	 * @param listeningAddress The address to listen at.
    +	 * @param listeningPort The port to listen at.
    +	 * @param logger the logger to output log information.
    +	 * @return The ActorSystem which has been started.
    +	 * @throws Exception
    +	 */
    +	public static ActorSystem startActorSystem(
    +						Configuration configuration,
    +						String listeningAddress,
    +						int listeningPort,
    +						Logger logger) throws Exception {
    +
    +		String hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort);
    +		logger.info("Trying to start actor system at " + hostPortUrl);
    +
    +		try {
    +			Config akkaConfig = AkkaUtils.getAkkaConfig(
    +				configuration,
    +				new scala.Some<>(new scala.Tuple2<String, Object>(listeningAddress, listeningPort))
    +			);
    +			if (logger.isDebugEnabled()) {
    +				logger.debug("Using akka configuration\n " + akkaConfig);
    +			}
    +
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(akkaConfig);
    +			logger.info("Actor system started at " + hostPortUrl);
    --- End diff --
    
    we usually use info("some log string {}", value); to avoid string concat when logging
is disabled.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message