Return-Path: X-Original-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D0E579E6C for ; Thu, 16 Feb 2012 22:13:46 +0000 (UTC) Received: (qmail 3812 invoked by uid 500); 16 Feb 2012 22:13:46 -0000 Delivered-To: apmail-incubator-giraph-commits-archive@incubator.apache.org Received: (qmail 3781 invoked by uid 500); 16 Feb 2012 22:13:46 -0000 Mailing-List: contact giraph-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: giraph-dev@incubator.apache.org Delivered-To: mailing list giraph-commits@incubator.apache.org Received: (qmail 3774 invoked by uid 99); 16 Feb 2012 22:13:46 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 22:13:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 22:13:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7A75B2388A32; Thu, 16 Feb 2012 22:12:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1245205 [7/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples... 
Date: Thu, 16 Feb 2012 22:12:36 -0000 To: giraph-commits@incubator.apache.org From: aching@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120216221244.7A75B2388A32@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Thu Feb 16 22:12:31 2012 @@ -53,972 +53,1045 @@ import java.util.TreeMap; /** * Zookeeper-based implementation of {@link CentralizedService}. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data */ @SuppressWarnings("rawtypes") -public abstract class BspService < - I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> - implements Watcher, CentralizedService { - /** Private ZooKeeper instance that implements the service */ - private final ZooKeeperExt zk; - /** Has the Connection occurred? 
*/ - private final BspEvent connectedEvent = new PredicateLock(); - /** Has worker registration changed (either healthy or unhealthy) */ - private final BspEvent workerHealthRegistrationChanged = - new PredicateLock(); - /** InputSplits are ready for consumption by workers */ - private final BspEvent inputSplitsAllReadyChanged = - new PredicateLock(); - /** InputSplit reservation or finished notification and synchronization */ - private final BspEvent inputSplitsStateChanged = - new PredicateLock(); - /** InputSplits are done being processed by workers */ - private final BspEvent inputSplitsAllDoneChanged = - new PredicateLock(); - /** InputSplit done by a worker finished notification and synchronization */ - private final BspEvent inputSplitsDoneStateChanged = - new PredicateLock(); - /** Are the partition assignments to workers ready? */ - private final BspEvent partitionAssignmentsReadyChanged = - new PredicateLock(); - - /** Application attempt changed */ - private final BspEvent applicationAttemptChanged = - new PredicateLock(); - /** Superstep finished synchronization */ - private final BspEvent superstepFinished = - new PredicateLock(); - /** Master election changed for any waited on attempt */ - private final BspEvent masterElectionChildrenChanged = - new PredicateLock(); - /** Cleaned up directory children changed*/ - private final BspEvent cleanedUpChildrenChanged = - new PredicateLock(); - /** Registered list of BspEvents */ - private final List registeredBspEvents = - new ArrayList(); - /** Configuration of the job*/ - private final Configuration conf; - /** Job context (mainly for progress) */ - private final Mapper.Context context; - /** Cached superstep (from ZooKeeper) */ - private long cachedSuperstep = UNSET_SUPERSTEP; - /** Restarted from a checkpoint (manual or automatic) */ - private long restartedSuperstep = UNSET_SUPERSTEP; - /** Cached application attempt (from ZooKeeper) */ - private long cachedApplicationAttempt = 
UNSET_APPLICATION_ATTEMPT; - /** Job id, to ensure uniqueness */ - private final String jobId; - /** Task partition, to ensure uniqueness */ - private final int taskPartition; - /** My hostname */ - private final String hostname; - /** Combination of hostname '_' partition (unique id) */ - private final String hostnamePartitionId; - /** Graph partitioner */ - private final GraphPartitionerFactory graphPartitionerFactory; - /** Mapper that will do the graph computation */ - private final GraphMapper graphMapper; - /** Class logger */ - private static final Logger LOG = Logger.getLogger(BspService.class); - /** File system */ - private final FileSystem fs; - /** Checkpoint frequency */ - private int checkpointFrequency = -1; - /** Map of aggregators */ - private Map> aggregatorMap = - new TreeMap>(); - - /** Unset superstep */ - public static final long UNSET_SUPERSTEP = Long.MIN_VALUE; - /** Input superstep (superstep when loading the vertices happens) */ - public static final long INPUT_SUPERSTEP = -1; - /** Unset application attempt */ - public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE; - - private static final String BASE_DIR = "/_hadoopBsp"; - public static final String MASTER_JOB_STATE_NODE = "/_masterJobState"; - public static final String INPUT_SPLIT_DIR = "/_inputSplitDir"; - public static final String INPUT_SPLIT_DONE_DIR = "/_inputSplitDoneDir"; - public static final String INPUT_SPLIT_RESERVED_NODE = - "/_inputSplitReserved"; - public static final String INPUT_SPLIT_FINISHED_NODE = - "/_inputSplitFinished"; - public static final String INPUT_SPLITS_ALL_READY_NODE = - "/_inputSplitsAllReady"; - public static final String INPUT_SPLITS_ALL_DONE_NODE = - "/_inputSplitsAllDone"; - public static final String APPLICATION_ATTEMPTS_DIR = - "/_applicationAttemptsDir"; - public static final String MASTER_ELECTION_DIR = "/_masterElectionDir"; - public static final String SUPERSTEP_DIR = "/_superstepDir"; - public static final String 
MERGED_AGGREGATOR_DIR = - "/_mergedAggregatorDir"; - public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir"; - public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir"; - public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir"; - public static final String PARTITION_ASSIGNMENTS_DIR = - "/_partitionAssignments"; - public static final String PARTITION_EXCHANGE_DIR = - "/_partitionExchangeDir"; - public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; - public static final String CLEANED_UP_DIR = "/_cleanedUpDir"; - - public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY = - "_aggregatorValueArrayKey"; - public static final String JSONOBJ_PARTITION_STATS_KEY = - "_partitionStatsKey"; - public static final String JSONOBJ_FINISHED_VERTICES_KEY = - "_verticesFinishedKey"; - public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey"; - public static final String JSONOBJ_NUM_EDGES_KEY = "_numEdgesKey"; - public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey"; - public static final String JSONOBJ_HOSTNAME_ID_KEY = "_hostnameIdKey"; - public static final String JSONOBJ_MAX_VERTEX_INDEX_KEY = - "_maxVertexIndexKey"; - public static final String JSONOBJ_HOSTNAME_KEY = "_hostnameKey"; - public static final String JSONOBJ_PORT_KEY = "_portKey"; - public static final String JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY = - "_checkpointFilePrefixKey"; - public static final String JSONOBJ_PREVIOUS_HOSTNAME_KEY = - "_previousHostnameKey"; - public static final String JSONOBJ_PREVIOUS_PORT_KEY = "_previousPortKey"; - public static final String JSONOBJ_STATE_KEY = "_stateKey"; - public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY = - "_applicationAttemptKey"; - public static final String JSONOBJ_SUPERSTEP_KEY = - "_superstepKey"; - public static final String AGGREGATOR_NAME_KEY = "_aggregatorNameKey"; - public static final String AGGREGATOR_CLASS_NAME_KEY = - "_aggregatorClassNameKey"; - 
public static final String AGGREGATOR_VALUE_KEY = "_aggregatorValueKey"; - - public static final String WORKER_SUFFIX = "_worker"; - public static final String MASTER_SUFFIX = "_master"; - - /** Path to the job's root */ - public final String BASE_PATH; - /** Path to the job state determined by the master (informative only) */ - public final String MASTER_JOB_STATE_PATH; - /** Path to the input splits written by the master */ - public final String INPUT_SPLIT_PATH; - /** Path to the input splits all ready to be processed by workers */ - public final String INPUT_SPLITS_ALL_READY_PATH; - /** Path to the input splits done */ - public final String INPUT_SPLIT_DONE_PATH; - /** Path to the input splits all done to notify the workers to proceed */ - public final String INPUT_SPLITS_ALL_DONE_PATH; - /** Path to the application attempts) */ - public final String APPLICATION_ATTEMPTS_PATH; - /** Path to the cleaned up notifications */ - public final String CLEANED_UP_PATH; - /** Path to the checkpoint's root (including job id) */ - public final String CHECKPOINT_BASE_PATH; - /** Path to the master election path */ - public final String MASTER_ELECTION_PATH; - - /** - * Get the superstep from a ZooKeeper path - * - * @param path Path to parse for the superstep - */ - public static long getSuperstepFromPath(String path) { - int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR); - if (foundSuperstepStart == -1) { - throw new IllegalArgumentException( - "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR + - "from " + path); - } - foundSuperstepStart += SUPERSTEP_DIR.length() + 1; - int endIndex = foundSuperstepStart + - path.substring(foundSuperstepStart).indexOf("/"); - if (endIndex == -1) { - throw new IllegalArgumentException( - "getSuperstepFromPath: Cannot find end of superstep from " + - path); - } - if (LOG.isDebugEnabled()) { - LOG.debug("getSuperstepFromPath: Got path=" + path + - ", start=" + foundSuperstepStart + ", end=" + endIndex); - } - return 
Long.parseLong(path.substring(foundSuperstepStart, endIndex)); - } - - /** - * Get the hostname and id from a "healthy" worker path - */ - public static String getHealthyHostnameIdFromPath(String path) { - int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR); - if (foundWorkerHealthyStart == -1) { - throw new IllegalArgumentException( - "getHealthyHostnameidFromPath: Couldn't find " + - WORKER_HEALTHY_DIR + " from " + path); - } - foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length(); - return path.substring(foundWorkerHealthyStart); - } - - /** - * Generate the base superstep directory path for a given application - * attempt - * - * @param attempt application attempt number - * @return directory path based on the an attempt - */ - final public String getSuperstepPath(long attempt) { - return APPLICATION_ATTEMPTS_PATH + "/" + attempt + SUPERSTEP_DIR; - } - - /** - * Generate the worker information "healthy" directory path for a - * superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - final public String getWorkerInfoHealthyPath(long attempt, - long superstep) { - return APPLICATION_ATTEMPTS_PATH + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR; - } - - /** - * Generate the worker information "unhealthy" directory path for a - * superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - final public String getWorkerInfoUnhealthyPath(long attempt, - long superstep) { - return APPLICATION_ATTEMPTS_PATH + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR; - } - - /** - * Generate the worker "finished" directory path for a - * superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - final public String 
getWorkerFinishedPath(long attempt, long superstep) { - return APPLICATION_ATTEMPTS_PATH + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR; - } - - /** - * Generate the "partiton assignments" directory path for a superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - final public String getPartitionAssignmentsPath(long attempt, - long superstep) { - return APPLICATION_ATTEMPTS_PATH + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + PARTITION_ASSIGNMENTS_DIR; - } - - /** - * Generate the "partition exchange" directory path for a superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - final public String getPartitionExchangePath(long attempt, - long superstep) { - return APPLICATION_ATTEMPTS_PATH + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR; - } - - final public String getPartitionExchangeWorkerPath(long attempt, - long superstep, - WorkerInfo workerInfo) { - return getPartitionExchangePath(attempt, superstep) + - "/" + workerInfo.getHostnameId(); - } - - /** - * Generate the merged aggregator directory path for a superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - final public String getMergedAggregatorPath(long attempt, long superstep) { - return APPLICATION_ATTEMPTS_PATH + "/" + attempt + - SUPERSTEP_DIR + "/" + superstep + MERGED_AGGREGATOR_DIR; - } - - /** - * Generate the "superstep finished" directory path for a superstep - * - * @param attempt application attempt number - * @param superstep superstep to use - * @return directory path based on the a superstep - */ - final public String getSuperstepFinishedPath(long attempt, long superstep) { - return APPLICATION_ATTEMPTS_PATH + "/" + attempt + - 
SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE; - } - - /** - * Generate the base superstep directory path for a given application - * attempt - * - * @param superstep Superstep to use - * @return Directory path based on the a superstep - */ - final public String getCheckpointBasePath(long superstep) { - return CHECKPOINT_BASE_PATH + "/" + superstep; - } - - /** If at the end of a checkpoint file, indicates metadata */ - public final String CHECKPOINT_METADATA_POSTFIX = ".metadata"; - - /** - * If at the end of a checkpoint file, indicates vertices, edges, - * messages, etc. - */ - public final String CHECKPOINT_VERTICES_POSTFIX = ".vertices"; - - /** - * If at the end of a checkpoint file, indicates metadata and data is valid - * for the same filenames without .valid - */ - public final String CHECKPOINT_VALID_POSTFIX = ".valid"; - - /** - * If at the end of a checkpoint file, indicates the stitched checkpoint - * file prefixes. A checkpoint is not valid if this file does not exist. - */ - public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized"; - - /** - * Get the checkpoint from a finalized checkpoint path - * - * @param finalizedPath Path of the finalized checkpoint - * @return Superstep referring to a checkpoint of the finalized path - */ - public static long getCheckpoint(Path finalizedPath) { - if (!finalizedPath.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX)) { - throw new InvalidParameterException( - "getCheckpoint: " + finalizedPath + "Doesn't end in " + - CHECKPOINT_FINALIZED_POSTFIX); - } - String checkpointString = - finalizedPath.getName().replace(CHECKPOINT_FINALIZED_POSTFIX, ""); - return Long.parseLong(checkpointString); - } - - /** - * Get the ZooKeeperExt instance. - * - * @return ZooKeeperExt instance. 
- */ - final public ZooKeeperExt getZkExt() { - return zk; - } - - @Override - final public long getRestartedSuperstep() { - return restartedSuperstep; - } - - /** - * Set the restarted superstep - * - * @param superstep Set the manually restarted superstep - */ - final public void setRestartedSuperstep(long superstep) { - if (superstep < INPUT_SUPERSTEP) { - throw new IllegalArgumentException( - "setRestartedSuperstep: Bad argument " + superstep); - } - restartedSuperstep = superstep; - } - - /** - * Should checkpoint on this superstep? If checkpointing, always - * checkpoint the first user superstep. If restarting, the first - * checkpoint is after the frequency has been met. - * - * @param superstep Decide if checkpointing no this superstep - * @return True if this superstep should be checkpointed, false otherwise - */ - final public boolean checkpointFrequencyMet(long superstep) { - if (checkpointFrequency == 0) { - return false; - } - long firstCheckpoint = INPUT_SUPERSTEP + 1; - if (getRestartedSuperstep() != UNSET_SUPERSTEP) { - firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; - } - if (superstep < firstCheckpoint) { - return false; - } else if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) { - return true; - } else { - return false; - } - } - - /** - * Get the file system - * - * @return file system - */ - final public FileSystem getFs() { - return fs; - } - - final public Configuration getConfiguration() { - return conf; - } - - final public Mapper.Context getContext() { - return context; - } - - final public String getHostname() { - return hostname; - } - - final public String getHostnamePartitionId() { - return hostnamePartitionId; - } - - final public int getTaskPartition() { - return taskPartition; - } - - final public GraphMapper getGraphMapper() { - return graphMapper; - } - - final public BspEvent getWorkerHealthRegistrationChangedEvent() { - return workerHealthRegistrationChanged; - } - - final public BspEvent 
getInputSplitsAllReadyEvent() { - return inputSplitsAllReadyChanged; - } - - final public BspEvent getInputSplitsStateChangedEvent() { - return inputSplitsStateChanged; - } - - final public BspEvent getInputSplitsAllDoneEvent() { - return inputSplitsAllDoneChanged; - } - - final public BspEvent getInputSplitsDoneStateChangedEvent() { - return inputSplitsDoneStateChanged; - } - - final public BspEvent getPartitionAssignmentsReadyChangedEvent() { - return partitionAssignmentsReadyChanged; - } - - - final public BspEvent getApplicationAttemptChangedEvent() { - return applicationAttemptChanged; - } - - final public BspEvent getSuperstepFinishedEvent() { - return superstepFinished; - } - - - final public BspEvent getMasterElectionChildrenChangedEvent() { - return masterElectionChildrenChanged; - } - - final public BspEvent getCleanedUpChildrenChangedEvent() { - return cleanedUpChildrenChanged; - } - - /** - * Get the master commanded job state as a JSONObject. Also sets the - * watches to see if the master commanded job state changes. - * - * @return Last job state or null if none - */ - final public JSONObject getJobState() { - try { - getZkExt().createExt(MASTER_JOB_STATE_PATH, - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - LOG.info("getJobState: Job state already exists (" + - MASTER_JOB_STATE_PATH + ")"); - } catch (Exception e) { - throw new RuntimeException(e); - } - String jobState = null; - try { - List childList = - getZkExt().getChildrenExt( - MASTER_JOB_STATE_PATH, true, true, true); - if (childList.isEmpty()) { - return null; - } - jobState = - new String(getZkExt().getData( - childList.get(childList.size() - 1), true, null)); - } catch (KeeperException.NoNodeException e) { - LOG.info("getJobState: Job state path is empty! 
- " + - MASTER_JOB_STATE_PATH); - } catch (Exception e) { - throw new RuntimeException(e); - } - try { - return new JSONObject(jobState); - } catch (JSONException e) { - throw new RuntimeException( - "getJobState: Failed to parse job state " + jobState); - } - } - - public BspService(String serverPortList, - int sessionMsecTimeout, - Mapper.Context context, - GraphMapper graphMapper) { - registerBspEvent(connectedEvent); - registerBspEvent(workerHealthRegistrationChanged); - registerBspEvent(inputSplitsAllReadyChanged); - registerBspEvent(inputSplitsStateChanged); - registerBspEvent(partitionAssignmentsReadyChanged); - registerBspEvent(applicationAttemptChanged); - registerBspEvent(superstepFinished); - registerBspEvent(masterElectionChildrenChanged); - registerBspEvent(cleanedUpChildrenChanged); - - this.context = context; - this.graphMapper = graphMapper; - this.conf = context.getConfiguration(); - this.jobId = conf.get("mapred.job.id", "Unknown Job"); - this.taskPartition = conf.getInt("mapred.task.partition", -1); - this.restartedSuperstep = conf.getLong(GiraphJob.RESTART_SUPERSTEP, - UNSET_SUPERSTEP); - this.cachedSuperstep = restartedSuperstep; - if ((restartedSuperstep != UNSET_SUPERSTEP) && - (restartedSuperstep < 0)) { - throw new IllegalArgumentException( - "BspService: Invalid superstep to restart - " + - restartedSuperstep); - } - try { - this.hostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } - this.hostnamePartitionId = hostname + "_" + getTaskPartition(); - this.graphPartitionerFactory = - BspUtils.createGraphPartitioner(conf); - - this.checkpointFrequency = - conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY, - GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT); - - BASE_PATH = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId; - MASTER_JOB_STATE_PATH = BASE_PATH + MASTER_JOB_STATE_NODE; - INPUT_SPLIT_PATH = BASE_PATH + INPUT_SPLIT_DIR; - INPUT_SPLITS_ALL_READY_PATH = BASE_PATH + 
INPUT_SPLITS_ALL_READY_NODE; - INPUT_SPLIT_DONE_PATH = BASE_PATH + INPUT_SPLIT_DONE_DIR; - INPUT_SPLITS_ALL_DONE_PATH = BASE_PATH + INPUT_SPLITS_ALL_DONE_NODE; - APPLICATION_ATTEMPTS_PATH = BASE_PATH + APPLICATION_ATTEMPTS_DIR; - CLEANED_UP_PATH = BASE_PATH + CLEANED_UP_DIR; - CHECKPOINT_BASE_PATH = - getConfiguration().get( - GiraphJob.CHECKPOINT_DIRECTORY, - GiraphJob.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId()); - MASTER_ELECTION_PATH = BASE_PATH + MASTER_ELECTION_DIR; - if (LOG.isInfoEnabled()) { - LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + - ", " + getTaskPartition() + " on " + serverPortList); - } - try { - this.zk = new ZooKeeperExt(serverPortList, sessionMsecTimeout, this); - connectedEvent.waitForever(); - this.fs = FileSystem.get(getConfiguration()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Get the job id - * - * @return job id - */ - final public String getJobId() { - return jobId; - } - - /** - * Get the latest application attempt and cache it. - * - * @return the latest application attempt - */ - final public long getApplicationAttempt() { - if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) { - return cachedApplicationAttempt; - } - try { - getZkExt().createExt(APPLICATION_ATTEMPTS_PATH, - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - LOG.info("getApplicationAttempt: Node " + - APPLICATION_ATTEMPTS_PATH + " already exists!"); - } catch (Exception e) { - throw new RuntimeException(e); - } - try { - List attemptList = - getZkExt().getChildrenExt( - APPLICATION_ATTEMPTS_PATH, true, false, false); - if (attemptList.isEmpty()) { - cachedApplicationAttempt = 0; - } - else { - cachedApplicationAttempt = - Long.parseLong(Collections.max(attemptList)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - - return cachedApplicationAttempt; - } - - /** - * Get the latest superstep and cache it. 
- * - * @return the latest superstep - * @throws InterruptedException - * @throws KeeperException - */ - final public long getSuperstep() { - if (cachedSuperstep != UNSET_SUPERSTEP) { - return cachedSuperstep; - } - String superstepPath = getSuperstepPath(getApplicationAttempt()); - try { - getZkExt().createExt(superstepPath, - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - if (LOG.isInfoEnabled()) { - LOG.info("getApplicationAttempt: Node " + - APPLICATION_ATTEMPTS_PATH + " already exists!"); - } - } catch (KeeperException e) { - throw new IllegalStateException( - "getSuperstep: KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "getSuperstep: InterruptedException", e); - } - - List superstepList; - try { - superstepList = - getZkExt().getChildrenExt(superstepPath, true, false, false); - } catch (KeeperException e) { - throw new IllegalStateException( - "getSuperstep: KeeperException", e); - } catch (InterruptedException e) { - throw new IllegalStateException( - "getSuperstep: InterruptedException", e); - } - if (superstepList.isEmpty()) { - cachedSuperstep = INPUT_SUPERSTEP; - } - else { - cachedSuperstep = - Long.parseLong(Collections.max(superstepList)); - } - - return cachedSuperstep; - } - - /** - * Increment the cached superstep. Shouldn't be the initial value anymore. - */ - final public void incrCachedSuperstep() { - if (cachedSuperstep == UNSET_SUPERSTEP) { - throw new IllegalStateException( - "incrSuperstep: Invalid unset cached superstep " + - UNSET_SUPERSTEP); - } - ++cachedSuperstep; - } - - /** - * Set the cached superstep (should only be used for loading checkpoints - * or recovering from failure). 
- * - * @param superstep will be used as the next superstep iteration - */ - final public void setCachedSuperstep(long superstep) { - cachedSuperstep = superstep; - } - - /** - * Set the cached application attempt (should only be used for restart from - * failure by the master) - * - * @param applicationAttempt Will denote the new application attempt - */ - final public void setApplicationAttempt(long applicationAttempt) { - cachedApplicationAttempt = applicationAttempt; - String superstepPath = getSuperstepPath(cachedApplicationAttempt); - try { - getZkExt().createExt(superstepPath, - null, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - } catch (KeeperException.NodeExistsException e) { - throw new IllegalArgumentException( - "setApplicationAttempt: Attempt already exists! - " + - superstepPath, e); - } catch (KeeperException e) { - throw new RuntimeException( - "setApplicationAttempt: KeeperException - " + - superstepPath, e); - } catch (InterruptedException e) { - throw new RuntimeException( - "setApplicationAttempt: InterruptedException - " + - superstepPath, e); - } - } - - /** - * Register an aggregator with name. 
- * - * @param name Name of the aggregator - * @param aggregatorClass Class of the aggregator - * @return Aggregator - * @throws IllegalAccessException - * @throws InstantiationException - */ - public final Aggregator registerAggregator( - String name, - Class> aggregatorClass) - throws InstantiationException, IllegalAccessException { - if (aggregatorMap.get(name) != null) { - return null; - } - Aggregator aggregator = - (Aggregator) aggregatorClass.newInstance(); - @SuppressWarnings("unchecked") - Aggregator writableAggregator = - (Aggregator) aggregator; - aggregatorMap.put(name, writableAggregator); +public abstract class BspService + implements Watcher, CentralizedService { + /** Unset superstep */ + public static final long UNSET_SUPERSTEP = Long.MIN_VALUE; + /** Input superstep (superstep when loading the vertices happens) */ + public static final long INPUT_SUPERSTEP = -1; + /** Unset application attempt */ + public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE; + /** Base ZooKeeper directory */ + public static final String BASE_DIR = "/_hadoopBsp"; + /** Master job state znode above base dir */ + public static final String MASTER_JOB_STATE_NODE = "/_masterJobState"; + /** Input split directory about base dir */ + public static final String INPUT_SPLIT_DIR = "/_inputSplitDir"; + /** Input split done directory about base dir */ + public static final String INPUT_SPLIT_DONE_DIR = "/_inputSplitDoneDir"; + /** Denotes a reserved input split */ + public static final String INPUT_SPLIT_RESERVED_NODE = + "/_inputSplitReserved"; + /** Denotes a finished input split */ + public static final String INPUT_SPLIT_FINISHED_NODE = + "/_inputSplitFinished"; + /** Denotes that all the input splits are are ready for consumption */ + public static final String INPUT_SPLITS_ALL_READY_NODE = + "/_inputSplitsAllReady"; + /** Denotes that all the input splits are done. 
*/ + public static final String INPUT_SPLITS_ALL_DONE_NODE = + "/_inputSplitsAllDone"; + /** Directory of attempts of this application */ + public static final String APPLICATION_ATTEMPTS_DIR = + "/_applicationAttemptsDir"; + /** Where the master election happens */ + public static final String MASTER_ELECTION_DIR = "/_masterElectionDir"; + /** Superstep scope */ + public static final String SUPERSTEP_DIR = "/_superstepDir"; + /** Where the merged aggregators are located */ + public static final String MERGED_AGGREGATOR_DIR = + "/_mergedAggregatorDir"; + /** Healthy workers register here. */ + public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir"; + /** Unhealthy workers register here. */ + public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir"; + /** Finished workers notify here */ + public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir"; + /** Where the partition assignments are set */ + public static final String PARTITION_ASSIGNMENTS_DIR = + "/_partitionAssignments"; + /** Helps coordinate the partition exchnages */ + public static final String PARTITION_EXCHANGE_DIR = + "/_partitionExchangeDir"; + /** Denotes that the superstep is done */ + public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; + /** Denotes which workers have been cleaned up */ + public static final String CLEANED_UP_DIR = "/_cleanedUpDir"; + /** JSON aggregator value array key */ + public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY = + "_aggregatorValueArrayKey"; + /** JSON partition stats key */ + public static final String JSONOBJ_PARTITION_STATS_KEY = + "_partitionStatsKey"; + /** JSON finished vertices key */ + public static final String JSONOBJ_FINISHED_VERTICES_KEY = + "_verticesFinishedKey"; + /** JSON vertex count key */ + public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey"; + /** JSON edge count key */ + public static final String JSONOBJ_NUM_EDGES_KEY = "_numEdgesKey"; + /** JSON 
message count key */ + public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey"; + /** JSON hostname id key */ + public static final String JSONOBJ_HOSTNAME_ID_KEY = "_hostnameIdKey"; + /** JSON max vertex index key */ + public static final String JSONOBJ_MAX_VERTEX_INDEX_KEY = + "_maxVertexIndexKey"; + /** JSON hostname key */ + public static final String JSONOBJ_HOSTNAME_KEY = "_hostnameKey"; + /** JSON port key */ + public static final String JSONOBJ_PORT_KEY = "_portKey"; + /** JSON checkpoint file prefix key */ + public static final String JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY = + "_checkpointFilePrefixKey"; + /** JSON previous hostname key */ + public static final String JSONOBJ_PREVIOUS_HOSTNAME_KEY = + "_previousHostnameKey"; + /** JSON previous port key */ + public static final String JSONOBJ_PREVIOUS_PORT_KEY = "_previousPortKey"; + /** JSON state key */ + public static final String JSONOBJ_STATE_KEY = "_stateKey"; + /** JSON application attempt key */ + public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY = + "_applicationAttemptKey"; + /** JSON superstep key */ + public static final String JSONOBJ_SUPERSTEP_KEY = + "_superstepKey"; + /** Aggregator name key */ + public static final String AGGREGATOR_NAME_KEY = "_aggregatorNameKey"; + /** Aggregator class name key */ + public static final String AGGREGATOR_CLASS_NAME_KEY = + "_aggregatorClassNameKey"; + /** Aggregator value key */ + public static final String AGGREGATOR_VALUE_KEY = "_aggregatorValueKey"; + /** Suffix denotes a worker */ + public static final String WORKER_SUFFIX = "_worker"; + /** Suffix denotes a master */ + public static final String MASTER_SUFFIX = "_master"; + /** If at the end of a checkpoint file, indicates metadata */ + public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata"; + /** + * If at the end of a checkpoint file, indicates vertices, edges, + * messages, etc. 
+ */ + public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices"; + /** + * If at the end of a checkpoint file, indicates metadata and data is valid + * for the same filenames without .valid + */ + public static final String CHECKPOINT_VALID_POSTFIX = ".valid"; + /** + * If at the end of a checkpoint file, indicates the stitched checkpoint + * file prefixes. A checkpoint is not valid if this file does not exist. + */ + public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized"; + /** Class logger */ + private static final Logger LOG = Logger.getLogger(BspService.class); + /** Path to the job's root */ + protected final String basePath; + /** Path to the job state determined by the master (informative only) */ + protected final String masterJobStatePath; + /** Path to the input splits written by the master */ + protected final String inputSplitsPath; + /** Path to the input splits all ready to be processed by workers */ + protected final String inputSplitsAllReadyPath; + /** Path to the input splits done */ + protected final String inputSplitsDonePath; + /** Path to the input splits all done to notify the workers to proceed */ + protected final String inputSplitsAllDonePath; + /** Path to the application attempts */ + protected final String applicationAttemptsPath; + /** Path to the cleaned up notifications */ + protected final String cleanedUpPath; + /** Path to the checkpoint's root (including job id) */ + protected final String checkpointBasePath; + /** Path to the master election path */ + protected final String masterElectionPath; + /** Private ZooKeeper instance that implements the service */ + private final ZooKeeperExt zk; + /** Has the Connection occurred? 
*/ + private final BspEvent connectedEvent = new PredicateLock(); + /** Has worker registration changed (either healthy or unhealthy) */ + private final BspEvent workerHealthRegistrationChanged = + new PredicateLock(); + /** InputSplits are ready for consumption by workers */ + private final BspEvent inputSplitsAllReadyChanged = + new PredicateLock(); + /** InputSplit reservation or finished notification and synchronization */ + private final BspEvent inputSplitsStateChanged = + new PredicateLock(); + /** InputSplits are done being processed by workers */ + private final BspEvent inputSplitsAllDoneChanged = + new PredicateLock(); + /** InputSplit done by a worker finished notification and synchronization */ + private final BspEvent inputSplitsDoneStateChanged = + new PredicateLock(); + /** Are the partition assignments to workers ready? */ + private final BspEvent partitionAssignmentsReadyChanged = + new PredicateLock(); + /** Application attempt changed */ + private final BspEvent applicationAttemptChanged = + new PredicateLock(); + /** Superstep finished synchronization */ + private final BspEvent superstepFinished = + new PredicateLock(); + /** Master election changed for any waited on attempt */ + private final BspEvent masterElectionChildrenChanged = + new PredicateLock(); + /** Cleaned up directory children changed*/ + private final BspEvent cleanedUpChildrenChanged = + new PredicateLock(); + /** Registered list of BspEvents */ + private final List registeredBspEvents = + new ArrayList(); + /** Configuration of the job*/ + private final Configuration conf; + /** Job context (mainly for progress) */ + private final Mapper.Context context; + /** Cached superstep (from ZooKeeper) */ + private long cachedSuperstep = UNSET_SUPERSTEP; + /** Restarted from a checkpoint (manual or automatic) */ + private long restartedSuperstep = UNSET_SUPERSTEP; + /** Cached application attempt (from ZooKeeper) */ + private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT; 
+ /** Job id, to ensure uniqueness */ + private final String jobId; + /** Task partition, to ensure uniqueness */ + private final int taskPartition; + /** My hostname */ + private final String hostname; + /** Combination of hostname '_' partition (unique id) */ + private final String hostnamePartitionId; + /** Graph partitioner */ + private final GraphPartitionerFactory graphPartitionerFactory; + /** Mapper that will do the graph computation */ + private final GraphMapper graphMapper; + /** File system */ + private final FileSystem fs; + /** Checkpoint frequency */ + private int checkpointFrequency = -1; + /** Map of aggregators */ + private Map> aggregatorMap = + new TreeMap>(); + + /** + * Constructor. + * + * @param serverPortList ZooKeeper server port list + * @param sessionMsecTimeout ZooKeeper session timeout in milliseconds + * @param context Mapper context + * @param graphMapper Graph mapper reference + */ + public BspService(String serverPortList, + int sessionMsecTimeout, + Mapper.Context context, + GraphMapper graphMapper) { + registerBspEvent(connectedEvent); + registerBspEvent(workerHealthRegistrationChanged); + registerBspEvent(inputSplitsAllReadyChanged); + registerBspEvent(inputSplitsStateChanged); + registerBspEvent(partitionAssignmentsReadyChanged); + registerBspEvent(applicationAttemptChanged); + registerBspEvent(superstepFinished); + registerBspEvent(masterElectionChildrenChanged); + registerBspEvent(cleanedUpChildrenChanged); + + this.context = context; + this.graphMapper = graphMapper; + this.conf = context.getConfiguration(); + this.jobId = conf.get("mapred.job.id", "Unknown Job"); + this.taskPartition = conf.getInt("mapred.task.partition", -1); + this.restartedSuperstep = conf.getLong(GiraphJob.RESTART_SUPERSTEP, + UNSET_SUPERSTEP); + this.cachedSuperstep = restartedSuperstep; + if ((restartedSuperstep != UNSET_SUPERSTEP) && + (restartedSuperstep < 0)) { + throw new IllegalArgumentException( + "BspService: Invalid superstep to restart - " + 
+ restartedSuperstep); + } + try { + this.hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + this.hostnamePartitionId = hostname + "_" + getTaskPartition(); + this.graphPartitionerFactory = + BspUtils.createGraphPartitioner(conf); + + this.checkpointFrequency = + conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY, + GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT); + + basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId; + masterJobStatePath = basePath + MASTER_JOB_STATE_NODE; + inputSplitsPath = basePath + INPUT_SPLIT_DIR; + inputSplitsAllReadyPath = basePath + INPUT_SPLITS_ALL_READY_NODE; + inputSplitsDonePath = basePath + INPUT_SPLIT_DONE_DIR; + inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE; + applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR; + cleanedUpPath = basePath + CLEANED_UP_DIR; + checkpointBasePath = + getConfiguration().get( + GiraphJob.CHECKPOINT_DIRECTORY, + GiraphJob.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId()); + masterElectionPath = basePath + MASTER_ELECTION_DIR; + if (LOG.isInfoEnabled()) { + LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + + ", " + getTaskPartition() + " on " + serverPortList); + } + try { + this.zk = new ZooKeeperExt(serverPortList, sessionMsecTimeout, this); + connectedEvent.waitForever(); + this.fs = FileSystem.get(getConfiguration()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + /** + * Get the superstep from a ZooKeeper path + * + * @param path Path to parse for the superstep + * @return Superstep from the path. 
+ */ + public static long getSuperstepFromPath(String path) { + int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR); + if (foundSuperstepStart == -1) { + throw new IllegalArgumentException( + "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR + + "from " + path); + } + foundSuperstepStart += SUPERSTEP_DIR.length() + 1; + int endIndex = foundSuperstepStart + + path.substring(foundSuperstepStart).indexOf("/"); + if (endIndex == -1) { + throw new IllegalArgumentException( + "getSuperstepFromPath: Cannot find end of superstep from " + + path); + } + if (LOG.isDebugEnabled()) { + LOG.debug("getSuperstepFromPath: Got path=" + path + + ", start=" + foundSuperstepStart + ", end=" + endIndex); + } + return Long.parseLong(path.substring(foundSuperstepStart, endIndex)); + } + + /** + * Get the hostname and id from a "healthy" worker path + * + * @param path Path to check + * @return Hostname and id from path + */ + public static String getHealthyHostnameIdFromPath(String path) { + int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR); + if (foundWorkerHealthyStart == -1) { + throw new IllegalArgumentException( + "getHealthyHostnameidFromPath: Couldn't find " + + WORKER_HEALTHY_DIR + " from " + path); + } + foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length(); + return path.substring(foundWorkerHealthyStart); + } + + /** + * Generate the base superstep directory path for a given application + * attempt + * + * @param attempt application attempt number + * @return directory path based on the attempt + */ + public final String getSuperstepPath(long attempt) { + return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR; + } + + /** + * Generate the worker information "healthy" directory path for a + * superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + public final String getWorkerInfoHealthyPath(long attempt, + long superstep) { + return 
applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR; + } + + /** + * Generate the worker information "unhealthy" directory path for a + * superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the superstep + */ + public final String getWorkerInfoUnhealthyPath(long attempt, + long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR; + } + + /** + * Generate the worker "finished" directory path for a + * superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the superstep + */ + public final String getWorkerFinishedPath(long attempt, long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR; + } + + /** + * Generate the "partition assignments" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the superstep + */ + public final String getPartitionAssignmentsPath(long attempt, + long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + PARTITION_ASSIGNMENTS_DIR; + } + + /** + * Generate the "partition exchange" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the superstep + */ + public final String getPartitionExchangePath(long attempt, + long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR; + } + + /** + * Based on the superstep, worker info, and attempt, get the appropriate + * worker path for the exchange. 
+ * + * @param attempt Application attempt + * @param superstep Superstep + * @param workerInfo Worker info of the exchange. + * @return Path of the desired worker + */ + public final String getPartitionExchangeWorkerPath(long attempt, + long superstep, + WorkerInfo workerInfo) { + return getPartitionExchangePath(attempt, superstep) + + "/" + workerInfo.getHostnameId(); + } + + /** + * Generate the merged aggregator directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the superstep + */ + public final String getMergedAggregatorPath(long attempt, long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + MERGED_AGGREGATOR_DIR; + } + + /** + * Generate the "superstep finished" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the superstep + */ + public final String getSuperstepFinishedPath(long attempt, long superstep) { + return applicationAttemptsPath + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE; + } + + /** + * Generate the checkpoint base directory path for a given + * superstep + * + * @param superstep Superstep to use + * @return Directory path based on the superstep + */ + public final String getCheckpointBasePath(long superstep) { + return checkpointBasePath + "/" + superstep; + } + + /** + * Get the checkpoint from a finalized checkpoint path + * + * @param finalizedPath Path of the finalized checkpoint + * @return Superstep referring to a checkpoint of the finalized path + */ + public static long getCheckpoint(Path finalizedPath) { + if (!finalizedPath.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX)) { + throw new InvalidParameterException( + "getCheckpoint: " + finalizedPath + "Doesn't end in " + + CHECKPOINT_FINALIZED_POSTFIX); + } + String checkpointString 
= + finalizedPath.getName().replace(CHECKPOINT_FINALIZED_POSTFIX, ""); + return Long.parseLong(checkpointString); + } + + /** + * Get the ZooKeeperExt instance. + * + * @return ZooKeeperExt instance. + */ + public final ZooKeeperExt getZkExt() { + return zk; + } + + @Override + public final long getRestartedSuperstep() { + return restartedSuperstep; + } + + /** + * Set the restarted superstep + * + * @param superstep Set the manually restarted superstep + */ + public final void setRestartedSuperstep(long superstep) { + if (superstep < INPUT_SUPERSTEP) { + throw new IllegalArgumentException( + "setRestartedSuperstep: Bad argument " + superstep); + } + restartedSuperstep = superstep; + } + + /** + * Should checkpoint on this superstep? If checkpointing, always + * checkpoint the first user superstep. If restarting, the first + * checkpoint is after the frequency has been met. + * + * @param superstep Decide if checkpointing on this superstep + * @return True if this superstep should be checkpointed, false otherwise + */ + public final boolean checkpointFrequencyMet(long superstep) { + if (checkpointFrequency == 0) { + return false; + } + long firstCheckpoint = INPUT_SUPERSTEP + 1; + if (getRestartedSuperstep() != UNSET_SUPERSTEP) { + firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; + } + if (superstep < firstCheckpoint) { + return false; + } else if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) { + return true; + } + return false; + } + + /** + * Get the file system + * + * @return file system + */ + public final FileSystem getFs() { + return fs; + } + + public final Configuration getConfiguration() { + return conf; + } + + public final Mapper.Context getContext() { + return context; + } + + public final String getHostname() { + return hostname; + } + + public final String getHostnamePartitionId() { + return hostnamePartitionId; + } + + public final int getTaskPartition() { + return taskPartition; + } + + public final GraphMapper 
getGraphMapper() { + return graphMapper; + } + + public final BspEvent getWorkerHealthRegistrationChangedEvent() { + return workerHealthRegistrationChanged; + } + + public final BspEvent getInputSplitsAllReadyEvent() { + return inputSplitsAllReadyChanged; + } + + public final BspEvent getInputSplitsStateChangedEvent() { + return inputSplitsStateChanged; + } + + public final BspEvent getInputSplitsAllDoneEvent() { + return inputSplitsAllDoneChanged; + } + + public final BspEvent getInputSplitsDoneStateChangedEvent() { + return inputSplitsDoneStateChanged; + } + + public final BspEvent getPartitionAssignmentsReadyChangedEvent() { + return partitionAssignmentsReadyChanged; + } + + + public final BspEvent getApplicationAttemptChangedEvent() { + return applicationAttemptChanged; + } + + public final BspEvent getSuperstepFinishedEvent() { + return superstepFinished; + } + + + public final BspEvent getMasterElectionChildrenChangedEvent() { + return masterElectionChildrenChanged; + } + + public final BspEvent getCleanedUpChildrenChangedEvent() { + return cleanedUpChildrenChanged; + } + + /** + * Get the master commanded job state as a JSONObject. Also sets the + * watches to see if the master commanded job state changes. 
+ * + * @return Last job state or null if none + * @throws InterruptedException + * @throws KeeperException + */ + public final JSONObject getJobState() { + try { + getZkExt().createExt(masterJobStatePath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + LOG.info("getJobState: Job state already exists (" + + masterJobStatePath + ")"); + } catch (KeeperException e) { + throw new IllegalStateException("Failed to create job state path " + + "due to KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Failed to create job state path " + + "due to InterruptedException", e); + } + String jobState = null; + try { + List childList = + getZkExt().getChildrenExt( + masterJobStatePath, true, true, true); + if (childList.isEmpty()) { + return null; + } + jobState = + new String(getZkExt().getData( + childList.get(childList.size() - 1), true, null)); + } catch (KeeperException.NoNodeException e) { + LOG.info("getJobState: Job state path is empty! - " + + masterJobStatePath); + } catch (KeeperException e) { + throw new IllegalStateException("Failed to get job state path " + + "children due to KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Failed to get job state path " + + "children due to InterruptedException", e); + } + try { + return new JSONObject(jobState); + } catch (JSONException e) { + throw new RuntimeException( + "getJobState: Failed to parse job state " + jobState); + } + } + + /** + * Get the job id + * + * @return job id + */ + public final String getJobId() { + return jobId; + } + + /** + * Get the latest application attempt and cache it. 
+ * + * @return the latest application attempt + */ + public final long getApplicationAttempt() { + if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) { + return cachedApplicationAttempt; + } + try { + getZkExt().createExt(applicationAttemptsPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + LOG.info("getApplicationAttempt: Node " + + applicationAttemptsPath + " already exists!"); + } catch (KeeperException e) { + throw new IllegalStateException("Couldn't create application " + + "attempts path due to KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Couldn't create application " + + "attempts path due to InterruptedException", e); + } + try { + List attemptList = + getZkExt().getChildrenExt( + applicationAttemptsPath, true, false, false); + if (attemptList.isEmpty()) { + cachedApplicationAttempt = 0; + } else { + cachedApplicationAttempt = + Long.parseLong(Collections.max(attemptList)); + } + } catch (KeeperException e) { + throw new IllegalStateException("Couldn't get application " + + "attempts to KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Couldn't get application " + + "attempts to InterruptedException", e); + } + + return cachedApplicationAttempt; + } + + /** + * Get the latest superstep and cache it. 
+ * + * @return the latest superstep + * @throws InterruptedException + * @throws KeeperException + */ + public final long getSuperstep() { + if (cachedSuperstep != UNSET_SUPERSTEP) { + return cachedSuperstep; + } + String superstepPath = getSuperstepPath(getApplicationAttempt()); + try { + getZkExt().createExt(superstepPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isInfoEnabled()) { + LOG.info("getApplicationAttempt: Node " + + applicationAttemptsPath + " already exists!"); + } + } catch (KeeperException e) { + throw new IllegalStateException( + "getSuperstep: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getSuperstep: InterruptedException", e); + } + + List superstepList; + try { + superstepList = + getZkExt().getChildrenExt(superstepPath, true, false, false); + } catch (KeeperException e) { + throw new IllegalStateException( + "getSuperstep: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getSuperstep: InterruptedException", e); + } + if (superstepList.isEmpty()) { + cachedSuperstep = INPUT_SUPERSTEP; + } else { + cachedSuperstep = + Long.parseLong(Collections.max(superstepList)); + } + + return cachedSuperstep; + } + + /** + * Increment the cached superstep. Shouldn't be the initial value anymore. + */ + public final void incrCachedSuperstep() { + if (cachedSuperstep == UNSET_SUPERSTEP) { + throw new IllegalStateException( + "incrSuperstep: Invalid unset cached superstep " + + UNSET_SUPERSTEP); + } + ++cachedSuperstep; + } + + /** + * Set the cached superstep (should only be used for loading checkpoints + * or recovering from failure). 
+ * + * @param superstep will be used as the next superstep iteration + */ + public final void setCachedSuperstep(long superstep) { + cachedSuperstep = superstep; + } + + /** + * Set the cached application attempt (should only be used for restart from + * failure by the master) + * + * @param applicationAttempt Will denote the new application attempt + */ + public final void setApplicationAttempt(long applicationAttempt) { + cachedApplicationAttempt = applicationAttempt; + String superstepPath = getSuperstepPath(cachedApplicationAttempt); + try { + getZkExt().createExt(superstepPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + throw new IllegalArgumentException( + "setApplicationAttempt: Attempt already exists! - " + + superstepPath, e); + } catch (KeeperException e) { + throw new RuntimeException( + "setApplicationAttempt: KeeperException - " + + superstepPath, e); + } catch (InterruptedException e) { + throw new RuntimeException( + "setApplicationAttempt: InterruptedException - " + + superstepPath, e); + } + } + + /** + * Register an aggregator with name. + * + * @param Aggregator type + * @param name Name of the aggregator + * @param aggregatorClass Class of the aggregator + * @return Aggregator + * @throws IllegalAccessException + * @throws InstantiationException + */ + public final Aggregator registerAggregator( + String name, + Class> aggregatorClass) + throws InstantiationException, IllegalAccessException { + if (aggregatorMap.get(name) != null) { + return null; + } + Aggregator aggregator = + (Aggregator) aggregatorClass.newInstance(); + @SuppressWarnings("unchecked") + Aggregator writableAggregator = + (Aggregator) aggregator; + aggregatorMap.put(name, writableAggregator); + if (LOG.isInfoEnabled()) { + LOG.info("registerAggregator: registered " + name); + } + return aggregator; + } + + /** + * Get aggregator by name. 
+ * + * @param name Name of aggregator + * @return Aggregator or null when not registered + */ + public final Aggregator getAggregator(String name) { + return aggregatorMap.get(name); + } + + /** + * Get the aggregator map. + * + * @return Map of aggregator names to aggregator + */ + public Map> getAggregatorMap() { + return aggregatorMap; + } + + /** + * Register a BspEvent. Ensure that it will be signaled + * by catastrophic failure so that threads waiting on an event signal + * will be unblocked. + * + * @param event Event to be registered. + */ + public void registerBspEvent(BspEvent event) { + registeredBspEvents.add(event); + } + + /** + * Subclasses can use this to instantiate their respective partitioners + * + * @return Instantiated graph partitioner factory + */ + protected GraphPartitionerFactory getGraphPartitionerFactory() { + return graphPartitionerFactory; + } + + /** + * Derived classes that want additional ZooKeeper events to take action + * should override this. + * + * @param event Event that occurred + * @return true if the event was processed here, false otherwise + */ + protected boolean processEvent(WatchedEvent event) { + return false; + } + + @Override + public final void process(WatchedEvent event) { + // 1. Process all shared events + // 2. 
Process specific derived class events + if (LOG.isDebugEnabled()) { + LOG.debug("process: Got a new event, path = " + event.getPath() + + ", type = " + event.getType() + ", state = " + + event.getState()); + } + + if ((event.getPath() == null) && (event.getType() == EventType.None)) { + if (event.getState() == KeeperState.Disconnected) { + // No way to recover from a disconnect event, signal all BspEvents + for (BspEvent bspEvent : registeredBspEvents) { + bspEvent.signal(); + } + throw new RuntimeException( + "process: Disconnected from ZooKeeper, cannot recover - " + + event); + } else if (event.getState() == KeeperState.SyncConnected) { if (LOG.isInfoEnabled()) { - LOG.info("registerAggregator: registered " + name); - } - return aggregator; - } - - /** - * Get aggregator by name. - * - * @param name - * @return Aggregator (null when not registered) - */ - public final Aggregator getAggregator(String name) { - return aggregatorMap.get(name); - } - - /** - * Get the aggregator map. - */ - public Map> getAggregatorMap() { - return aggregatorMap; - } - - /** - * Register a BspEvent. Ensure that it will be signaled - * by catastrophic failure so that threads waiting on an event signal - * will be unblocked. - */ - public void registerBspEvent(BspEvent event) { - registeredBspEvents.add(event); - } - - /** - * Subclasses can use this to instantiate their respective partitioners - * - * @return Instantiated graph partitioner factory - */ - protected GraphPartitionerFactory getGraphPartitionerFactory() { - return graphPartitionerFactory; - } - - /** - * Derived classes that want additional ZooKeeper events to take action - * should override this. - * - * @param event Event that occurred - * @return true if the event was processed here, false otherwise - */ - protected boolean processEvent(WatchedEvent event) { - return false; - } - - @Override - final public void process(WatchedEvent event) { - // 1. Process all shared events - // 2. 
Process specific derived class events - - if (LOG.isDebugEnabled()) { - LOG.debug("process: Got a new event, path = " + event.getPath() + - ", type = " + event.getType() + ", state = " + - event.getState()); - } - - if ((event.getPath() == null) && (event.getType() == EventType.None)) { - if (event.getState() == KeeperState.Disconnected) { - // No way to recover from a disconnect event, signal all BspEvents - for (BspEvent bspEvent : registeredBspEvents) { - bspEvent.signal(); - } - throw new RuntimeException( - "process: Disconnected from ZooKeeper, cannot recover - " + - event); - } else if (event.getState() == KeeperState.SyncConnected) { - if (LOG.isInfoEnabled()) { - LOG.info("process: Asynchronous connection complete."); - } - connectedEvent.signal(); - } else { - LOG.warn("process: Got unknown null path event " + event); - } - return; - } - - boolean eventProcessed = false; - if (event.getPath().startsWith(MASTER_JOB_STATE_PATH)) { - // This will cause all becomeMaster() MasterThreads to notice the - // change in job state and quit trying to become the master. 
- masterElectionChildrenChanged.signal(); - eventProcessed = true; - } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) || - event.getPath().contains(WORKER_UNHEALTHY_DIR)) && - (event.getType() == EventType.NodeChildrenChanged)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: workerHealthRegistrationChanged " + - "(worker health reported - healthy/unhealthy )"); - } - workerHealthRegistrationChanged.signal(); - eventProcessed = true; - } else if (event.getPath().equals(INPUT_SPLITS_ALL_READY_PATH) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: inputSplitsReadyChanged " + - "(input splits ready)"); - } - inputSplitsAllReadyChanged.signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: inputSplitsStateChanged "+ - "(made a reservation)"); - } - inputSplitsStateChanged.signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) && - (event.getType() == EventType.NodeDeleted)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: inputSplitsStateChanged "+ - "(lost a reservation)"); - } - inputSplitsStateChanged.signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(INPUT_SPLIT_FINISHED_NODE) && - (event.getType() == EventType.NodeCreated)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: inputSplitsStateChanged " + - "(finished inputsplit)"); - } - inputSplitsStateChanged.signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(INPUT_SPLIT_DONE_DIR) && - (event.getType() == EventType.NodeChildrenChanged)) { - if (LOG.isDebugEnabled()) { - LOG.debug("process: inputSplitsDoneStateChanged " + - "(worker finished sending)"); - } - inputSplitsDoneStateChanged.signal(); - eventProcessed = true; - } else if (event.getPath().equals(INPUT_SPLITS_ALL_DONE_PATH) && - (event.getType() == 
EventType.NodeCreated)) { - if (LOG.isInfoEnabled()) { - LOG.info("process: inputSplitsAllDoneChanged " + - "(all vertices sent from input splits)"); - } - inputSplitsAllDoneChanged.signal(); - eventProcessed = true; - } else if (event.getPath().contains(PARTITION_ASSIGNMENTS_DIR) && - event.getType() == EventType.NodeCreated) { - if (LOG.isInfoEnabled()) { - LOG.info("process: partitionAssignmentsReadyChanged " + - "(partitions are assigned)"); - } - partitionAssignmentsReadyChanged.signal(); - eventProcessed = true; - } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) && - event.getType() == EventType.NodeCreated) { - if (LOG.isInfoEnabled()) { - LOG.info("process: superstepFinished signaled"); - } - superstepFinished.signal(); - eventProcessed = true; - } else if (event.getPath().endsWith(APPLICATION_ATTEMPTS_PATH) && - event.getType() == EventType.NodeChildrenChanged) { - if (LOG.isInfoEnabled()) { - LOG.info("process: applicationAttemptChanged signaled"); - } - applicationAttemptChanged.signal(); - eventProcessed = true; - } else if (event.getPath().contains(MASTER_ELECTION_DIR) && - event.getType() == EventType.NodeChildrenChanged) { - if (LOG.isInfoEnabled()) { - LOG.info("process: masterElectionChildrenChanged signaled"); - } - masterElectionChildrenChanged.signal(); - eventProcessed = true; - } else if (event.getPath().equals(CLEANED_UP_PATH) && - event.getType() == EventType.NodeChildrenChanged) { - if (LOG.isInfoEnabled()) { - LOG.info("process: cleanedUpChildrenChanged signaled"); - } - cleanedUpChildrenChanged.signal(); - eventProcessed = true; - } - - if ((processEvent(event) == false) && (eventProcessed == false)) { - LOG.warn("process: Unknown and unprocessed event (path=" + - event.getPath() + ", type=" + event.getType() + - ", state=" + event.getState() + ")"); + LOG.info("process: Asynchronous connection complete."); } + connectedEvent.signal(); + } else { + LOG.warn("process: Got unknown null path event " + event); + } + return; + } + 
+ boolean eventProcessed = false; + if (event.getPath().startsWith(masterJobStatePath)) { + // This will cause all becomeMaster() MasterThreads to notice the + // change in job state and quit trying to become the master. + masterElectionChildrenChanged.signal(); + eventProcessed = true; + } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) || + event.getPath().contains(WORKER_UNHEALTHY_DIR)) && + (event.getType() == EventType.NodeChildrenChanged)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: workerHealthRegistrationChanged " + + "(worker health reported - healthy/unhealthy )"); + } + workerHealthRegistrationChanged.signal(); + eventProcessed = true; + } else if (event.getPath().equals(inputSplitsAllReadyPath) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: inputSplitsReadyChanged " + + "(input splits ready)"); + } + inputSplitsAllReadyChanged.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: inputSplitsStateChanged " + + "(made a reservation)"); + } + inputSplitsStateChanged.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) && + (event.getType() == EventType.NodeDeleted)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: inputSplitsStateChanged " + + "(lost a reservation)"); + } + inputSplitsStateChanged.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(INPUT_SPLIT_FINISHED_NODE) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: inputSplitsStateChanged " + + "(finished inputsplit)"); + } + inputSplitsStateChanged.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(INPUT_SPLIT_DONE_DIR) && + (event.getType() == EventType.NodeChildrenChanged)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: 
inputSplitsDoneStateChanged " + + "(worker finished sending)"); + } + inputSplitsDoneStateChanged.signal(); + eventProcessed = true; + } else if (event.getPath().equals(inputSplitsAllDonePath) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: inputSplitsAllDoneChanged " + + "(all vertices sent from input splits)"); + } + inputSplitsAllDoneChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(PARTITION_ASSIGNMENTS_DIR) && + event.getType() == EventType.NodeCreated) { + if (LOG.isInfoEnabled()) { + LOG.info("process: partitionAssignmentsReadyChanged " + + "(partitions are assigned)"); + } + partitionAssignmentsReadyChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) && + event.getType() == EventType.NodeCreated) { + if (LOG.isInfoEnabled()) { + LOG.info("process: superstepFinished signaled"); + } + superstepFinished.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(applicationAttemptsPath) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: applicationAttemptChanged signaled"); + } + applicationAttemptChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(MASTER_ELECTION_DIR) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: masterElectionChildrenChanged signaled"); + } + masterElectionChildrenChanged.signal(); + eventProcessed = true; + } else if (event.getPath().equals(cleanedUpPath) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: cleanedUpChildrenChanged signaled"); + } + cleanedUpChildrenChanged.signal(); + eventProcessed = true; + } + + if (!(processEvent(event)) && (!eventProcessed)) { + LOG.warn("process: Unknown and unprocessed event (path=" + + event.getPath() + ", type=" + event.getType() + + ", state=" + 
event.getState() + ")"); } + } }