flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #2410: [FLINK-4449] [cluster management] Heartbeat Manage...
Date Wed, 24 Aug 2016 12:33:29 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2410#discussion_r76047217
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatScheduler.java
---
    @@ -0,0 +1,286 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.heartbeat;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.rpc.RpcGateway;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.Serializable;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * This utility class implements the basis of trigger heartbeat from one component to
another component periodically,
    + * for example trigger heartbeat from the ResourceManager to TaskExecutor.
    + *
    + * @param <Gateway> The type of the gateway to connect to.
    + * @param <Payload> The type of the successful heartbeat responses with payload.
    + */
    +public abstract class HeartbeatScheduler<Gateway extends RpcGateway, Payload extends
Serializable> {
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** default max heartbeat interval time in millisecond (which is used in retry heartbeat
case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** default heartbeat attempt delay after an exception has occurred */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** default max heartbeat retry time for one heartbeat */
    +	private static final long MAX_HEARTBEAT_ATTEMPT_MILLIS = 60000;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final long maxAttemptTime;
    +
    +	/** target gateway to receive the heartbeat and give heartbeatResponse */
    +	protected final Gateway targetGateway;
    +
    +	/** the target address */
    +	private final String targetAddress;
    +
    +	/** the target gateway name */
    +	private final String targetName;
    +
    +	private final RpcService rpcService;
    +
    +	private final UUID leaderID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * @param rpcService    rpcService
    +	 * @param leaderID      leader session id of current source end which send heartbeat
    +	 * @param targetGateway target gateway which receive heartbeat and response
    +	 * @param targetAddress target gateway address
    +	 * @param targetName    target name
    +	 * @param log           log
    +	 */
    +	public HeartbeatScheduler(RpcService rpcService, UUID leaderID, Gateway targetGateway,
    +		String targetAddress, String targetName, Logger log) {
    +		this(rpcService, leaderID, targetGateway, targetAddress, targetName, log, INITIAL_HEARTBEAT_INTERVAL_MILLIS,
    +			INITIAL_HEARTBEAT_TIMEOUT_MILLIS, MAX_HEARTBEAT_TIMEOUT_MILLIS, ERROR_HEARTBEAT_DELAY_MILLIS,
MAX_HEARTBEAT_ATTEMPT_MILLIS);
    +	}
    +
    +	/**
    +	 * @param rpcService          rpcService
    +	 * @param leaderID            leader session id of current source end which send heartbeat
    +	 * @param targetGateway       target gateway which receive heartbeat and response
    +	 * @param targetAddress       target gateway address
    +	 * @param targetName          target name
    +	 * @param log                 log
    +	 * @param heartbeatInterval   heartbeat interval time in millisecond
    +	 * @param heartbeatTimeout    heartbeat timeout in millisecond
    +	 * @param maxHeartbeatTimeout max heartbeat interval time in millisecond
    +	 * @param delayOnError        Heartbeat attempt delay after an exception has occurred
    +	 * @param maxAttemptTime      max retry time for one heartbeat
    +	 */
    +	public HeartbeatScheduler(
    +		RpcService rpcService, UUID leaderID, Gateway targetGateway,
    +		String targetAddress, String targetName, Logger log, long heartbeatInterval,
    +		long heartbeatTimeout, long maxHeartbeatTimeout, long delayOnError, long maxAttemptTime)
{
    +		checkArgument(heartbeatInterval > 0, "initial heartbeat interval must be greater
than zero");
    +		checkArgument(heartbeatTimeout > 0, "initial heartbeat timeout must be greater than
zero");
    +		checkArgument(maxHeartbeatTimeout > 0, "maximum heartbeat timeout must be greater
than zero");
    +		checkArgument(delayOnError >= 0, "delay on error must be non-negative");
    +		checkArgument(maxAttemptTime >= 0, "max attempt on error must be non-negative");
    +		this.rpcService = checkNotNull(rpcService);
    +		this.leaderID = checkNotNull(leaderID);
    +		this.targetGateway = checkNotNull(targetGateway);
    +		this.targetAddress = checkNotNull(targetAddress);
    +		this.targetName = checkNotNull(targetName);
    +		this.log = checkNotNull(log);
    +		this.heartbeatInterval = heartbeatInterval;
    +		this.heartbeatTimeout = heartbeatTimeout;
    +		this.maxHeartbeatTimeout = maxHeartbeatTimeout;
    +		this.delayOnError = delayOnError;
    +		this.maxAttemptTime = maxAttemptTime;
    +	}
    +
    +	/**
    +	 * start to schedule heartbeat
    +	 */
    +	public void start() {
    +		checkState(!closed, "The heartbeat connection is already closed");
    +		long currentHeartbeatBeginTime = System.currentTimeMillis();
    +		sendHeartbeatToTaskManager(1, heartbeatTimeout, currentHeartbeatBeginTime);
    +	}
    +
    +	/**
    +	 * Checks if the heartbeat schedule was closed.
    +	 *
    +	 * @return True if the heartbeat schedule was closed, false otherwise.
    +	 */
    +	public boolean isClosed() {
    +		return closed;
    +	}
    +
    +	/**
    +	 * stop to schedule heartbeat
    +	 */
    +	public void close() {
    +		closed = true;
    +	}
    +
    +	/**
    +	 * get the heartbeat interval
    +	 *
    +	 * @return heartbeat interval
    +	 */
    +	public long getHeartbeatInterval() {
    +		return heartbeatInterval;
    +	}
    +
    +	/**
    +	 * trigger heartbeat to target gateway
    +	 *
    +	 * @param leaderID leader session id of current sender
    +	 * @param timeout  timeout for heartbeat response
    +	 * @return HeartbeatResponsePayload wrapped in future
    +	 */
    +	protected abstract Future<Payload> triggerHeartbeat(UUID leaderID, FiniteDuration
timeout);
    +
    +	/**
    +	 * report heartbeat response payload to sender who sending heartbeat
    +	 *
    +	 * @param heartbeatResponsePayload heartbeat response which contains payload
    +	 */
    +	protected abstract void reportHeartbeatPayload(Payload heartbeatResponsePayload);
    +
    +	/**
    +	 * callback method when heartbeat sender lost heartbeat with target
    +	 */
    +	protected abstract void lossHeartbeat();
    --- End diff --
    
    typo: `lossHeartbeat` should be `lostHeartbeat`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message