Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0AFA3E841 for ; Fri, 4 Jan 2013 20:52:41 +0000 (UTC) Received: (qmail 58862 invoked by uid 500); 4 Jan 2013 20:52:40 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 58781 invoked by uid 500); 4 Jan 2013 20:52:40 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 58194 invoked by uid 99); 4 Jan 2013 20:52:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jan 2013 20:52:39 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 8B0EF823163; Fri, 4 Jan 2013 20:52:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nitay@apache.org To: commits@giraph.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [9/23] GIRAPH-409: Refactor / cleanups (nitay) Message-Id: <20130104205239.8B0EF823163@tyr.zones.apache.org> Date: Fri, 4 Jan 2013 20:52:39 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java new file mode 100644 index 0000000..4483385 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -0,0 +1,1895 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.master; + +import org.apache.commons.io.FilenameUtils; +import org.apache.giraph.bsp.ApplicationState; +import org.apache.giraph.bsp.BspInputFormat; +import org.apache.giraph.bsp.CentralizedServiceMaster; +import org.apache.giraph.bsp.SuperstepState; +import org.apache.giraph.comm.MasterClient; +import org.apache.giraph.comm.MasterServer; +import org.apache.giraph.comm.netty.NettyMasterClient; +import org.apache.giraph.comm.netty.NettyMasterServer; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.counters.GiraphStats; +import org.apache.giraph.graph.AddressesAndPartitionsWritable; +import org.apache.giraph.bsp.BspService; +import org.apache.giraph.graph.MapFunctions; +import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.GiraphInputFormat; +import org.apache.giraph.graph.GlobalStats; +import org.apache.giraph.graph.GraphMapper; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.InputSplitEvents; +import org.apache.giraph.graph.InputSplitPaths; +import org.apache.giraph.io.VertexInputFormat; +import org.apache.giraph.partition.MasterGraphPartitioner; +import 
org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.partition.PartitionStats; +import org.apache.giraph.partition.PartitionUtils; +import org.apache.giraph.metrics.AggregatedMetrics; +import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.GiraphTimer; +import org.apache.giraph.metrics.GiraphTimerContext; +import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; +import org.apache.giraph.metrics.SuperstepMetricsRegistry; +import org.apache.giraph.metrics.WorkerSuperstepMetrics; +import org.apache.giraph.utils.ProgressableUtils; +import org.apache.giraph.time.SystemTime; +import org.apache.giraph.time.Time; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.giraph.zk.BspEvent; +import org.apache.giraph.zk.PredicateLock; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.ZooDefs.Ids; +import org.json.JSONException; +import org.json.JSONObject; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import net.iharder.Base64; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; 
+import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * ZooKeeper-based implementation of {@link CentralizedServiceMaster}. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + */ +@SuppressWarnings("rawtypes") +public class BspServiceMaster + extends BspService + implements CentralizedServiceMaster, + ResetSuperstepMetricsObserver { + /** Print worker names only if there are 10 workers left */ + public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10; + /** How many threads to use when writing input splits to zookeeper*/ + public static final String INPUT_SPLIT_THREAD_COUNT = + "giraph.inputSplitThreadCount"; + /** Default number of threads to use when writing input splits to zookeeper */ + public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1; + /** Time instance to use for timing */ + private static final Time TIME = SystemTime.get(); + /** Class logger */ + private static final Logger LOG = Logger.getLogger(BspServiceMaster.class); + /** Am I the master? 
*/ + private boolean isMaster = false; + /** Max number of workers */ + private final int maxWorkers; + /** Min number of workers */ + private final int minWorkers; + /** Min % responded workers */ + private final float minPercentResponded; + /** Msecs to wait for an event */ + private final int eventWaitMsecs; + /** Max msecs to wait for a superstep to get enough workers */ + private final int maxSuperstepWaitMsecs; + /** Min number of long tails before printing */ + private final int partitionLongTailMinPrint; + /** Last finalized checkpoint */ + private long lastCheckpointedSuperstep = -1; + /** Worker wrote checkpoint */ + private final BspEvent workerWroteCheckpoint; + /** State of the superstep changed */ + private final BspEvent superstepStateChanged; + /** Master graph partitioner */ + private final MasterGraphPartitioner masterGraphPartitioner; + /** All the partition stats from the last superstep */ + private final List allPartitionStatsList = + new ArrayList(); + /** Handler for aggregators */ + private MasterAggregatorHandler aggregatorHandler; + /** Master class */ + private MasterCompute masterCompute; + /** IPC Client */ + private MasterClient masterClient; + /** IPC Server */ + private MasterServer masterServer; + /** Master info */ + private MasterInfo masterInfo; + /** List of workers in current superstep */ + private List chosenWorkerInfoList = Lists.newArrayList(); + /** Limit locality information added to each InputSplit znode */ + private final int localityLimit = 5; + /** Observers over master lifecycle. */ + private final MasterObserver[] observers; + + // Per-Superstep Metrics + /** MasterCompute time */ + private GiraphTimer masterComputeTimer; + + /** + * Constructor for setting up the master. 
+ * + * @param serverPortList ZooKeeper server port list + * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper + * @param context Mapper context + * @param graphMapper Graph mapper + */ + public BspServiceMaster( + String serverPortList, + int sessionMsecTimeout, + Mapper.Context context, + GraphMapper graphMapper) { + super(serverPortList, sessionMsecTimeout, context, graphMapper); + workerWroteCheckpoint = new PredicateLock(context); + registerBspEvent(workerWroteCheckpoint); + superstepStateChanged = new PredicateLock(context); + registerBspEvent(superstepStateChanged); + + ImmutableClassesGiraphConfiguration conf = getConfiguration(); + + maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, -1); + minWorkers = conf.getInt(GiraphConstants.MIN_WORKERS, -1); + minPercentResponded = conf.getFloat( + GiraphConstants.MIN_PERCENT_RESPONDED, 100.0f); + eventWaitMsecs = conf.getEventWaitMsecs(); + maxSuperstepWaitMsecs = conf.getMaxMasterSuperstepWaitMsecs(); + partitionLongTailMinPrint = conf.getInt( + GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT, + GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT); + masterGraphPartitioner = + getGraphPartitionerFactory().createMasterGraphPartitioner(); + observers = getConfiguration().createMasterObservers(); + + GiraphMetrics.get().addSuperstepResetObserver(this); + GiraphStats.init(context); + } + + @Override + public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) { + masterComputeTimer = new GiraphTimer(superstepMetrics, + "master-compute-call", TimeUnit.MILLISECONDS); + } + + @Override + public void setJobState(ApplicationState state, + long applicationAttempt, + long desiredSuperstep) { + JSONObject jobState = new JSONObject(); + try { + jobState.put(JSONOBJ_STATE_KEY, state.toString()); + jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt); + jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep); + } catch (JSONException e) { + throw new RuntimeException("setJobState: Couldn't 
put " + + state.toString()); + } + if (LOG.isInfoEnabled()) { + LOG.info("setJobState: " + jobState.toString() + " on superstep " + + getSuperstep()); + } + try { + getZkExt().createExt(masterJobStatePath + "/jobState", + jobState.toString().getBytes(), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL, + true); + } catch (KeeperException.NodeExistsException e) { + throw new IllegalStateException( + "setJobState: Imposible that " + + masterJobStatePath + " already exists!", e); + } catch (KeeperException e) { + throw new IllegalStateException( + "setJobState: Unknown KeeperException for " + + masterJobStatePath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "setJobState: Unknown InterruptedException for " + + masterJobStatePath, e); + } + + if (state == ApplicationState.FAILED) { + failJob(); + } + } + + /** + * Common method for generating vertex/edge input splits. + * + * @param inputFormat The vertex/edge input format + * @param numWorkers Number of available workers + * @param inputSplitType Type of input splits (for logging purposes) + * @return List of input splits for the given format + */ + private List generateInputSplits(GiraphInputFormat inputFormat, + int numWorkers, + String inputSplitType) { + String logPrefix = "generate" + inputSplitType + "InputSplits"; + List splits; + try { + splits = inputFormat.getSplits(getContext(), numWorkers); + } catch (IOException e) { + throw new IllegalStateException(logPrefix + ": Got IOException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + logPrefix + ": Got InterruptedException", e); + } + float samplePercent = + getConfiguration().getFloat( + GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT, + GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT); + if (samplePercent != + GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) { + int lastIndex = (int) (samplePercent * splits.size() / 100f); + List sampleSplits = splits.subList(0, lastIndex); + 
LOG.warn(logPrefix + ": Using sampling - Processing only " + + sampleSplits.size() + " instead of " + splits.size() + + " expected splits."); + return sampleSplits; + } else { + if (LOG.isInfoEnabled()) { + LOG.info(logPrefix + ": Got " + splits.size() + + " input splits for " + numWorkers + " workers"); + } + return splits; + } + } + + /** + * When there is no salvaging this job, fail it. + */ + private void failJob() { + LOG.fatal("failJob: Killing job " + getJobId()); + try { + @SuppressWarnings("deprecation") + org.apache.hadoop.mapred.JobClient jobClient = + new org.apache.hadoop.mapred.JobClient( + (org.apache.hadoop.mapred.JobConf) + getContext().getConfiguration()); + @SuppressWarnings("deprecation") + JobID jobId = JobID.forName(getJobId()); + RunningJob job = jobClient.getJob(jobId); + failureCleanup(null); + job.killJob(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Parse the {@link WorkerInfo} objects from a ZooKeeper path + * (and children). + * + * @param workerInfosPath Path where all the workers are children + * @param watch Watch or not? 
+ * @return List of workers in that path + */ + private List getWorkerInfosFromPath(String workerInfosPath, + boolean watch) { + List workerInfoList = new ArrayList(); + List workerInfoPathList; + try { + workerInfoPathList = + getZkExt().getChildrenExt(workerInfosPath, watch, false, true); + } catch (KeeperException e) { + throw new IllegalStateException( + "getWorkers: Got KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getWorkers: Got InterruptedStateException", e); + } + for (String workerInfoPath : workerInfoPathList) { + WorkerInfo workerInfo = new WorkerInfo(); + WritableUtils.readFieldsFromZnode( + getZkExt(), workerInfoPath, true, null, workerInfo); + workerInfoList.add(workerInfo); + } + return workerInfoList; + } + + /** + * Get the healthy and unhealthy {@link WorkerInfo} objects for + * a superstep + * + * @param superstep superstep to check + * @param healthyWorkerInfoList filled in with current data + * @param unhealthyWorkerInfoList filled in with current data + */ + private void getAllWorkerInfos( + long superstep, + List healthyWorkerInfoList, + List unhealthyWorkerInfoList) { + String healthyWorkerInfoPath = + getWorkerInfoHealthyPath(getApplicationAttempt(), superstep); + String unhealthyWorkerInfoPath = + getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep); + + try { + getZkExt().createOnceExt(healthyWorkerInfoPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException e) { + throw new IllegalStateException("getWorkers: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException("getWorkers: IllegalStateException", e); + } + + try { + getZkExt().createOnceExt(unhealthyWorkerInfoPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException e) { + throw new IllegalStateException("getWorkers: KeeperException", e); + } catch (InterruptedException e) { + throw new 
IllegalStateException("getWorkers: IllegalStateException", e); + } + + List currentHealthyWorkerInfoList = + getWorkerInfosFromPath(healthyWorkerInfoPath, true); + List currentUnhealthyWorkerInfoList = + getWorkerInfosFromPath(unhealthyWorkerInfoPath, false); + + healthyWorkerInfoList.clear(); + if (currentHealthyWorkerInfoList != null) { + for (WorkerInfo healthyWorkerInfo : + currentHealthyWorkerInfoList) { + healthyWorkerInfoList.add(healthyWorkerInfo); + } + } + + unhealthyWorkerInfoList.clear(); + if (currentUnhealthyWorkerInfoList != null) { + for (WorkerInfo unhealthyWorkerInfo : + currentUnhealthyWorkerInfoList) { + unhealthyWorkerInfoList.add(unhealthyWorkerInfo); + } + } + } + + /** + * Check all the {@link WorkerInfo} objects to ensure that a minimum + * number of good workers exists out of the total that have reported. + * + * @return List of of healthy workers such that the minimum has been + * met, otherwise null + */ + private List checkWorkers() { + boolean failJob = true; + long failWorkerCheckMsecs = + SystemTime.get().getMilliseconds() + maxSuperstepWaitMsecs; + List healthyWorkerInfoList = new ArrayList(); + List unhealthyWorkerInfoList = new ArrayList(); + int totalResponses = -1; + while (SystemTime.get().getMilliseconds() < failWorkerCheckMsecs) { + getContext().progress(); + getAllWorkerInfos( + getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList); + totalResponses = healthyWorkerInfoList.size() + + unhealthyWorkerInfoList.size(); + if ((totalResponses * 100.0f / maxWorkers) >= + minPercentResponded) { + failJob = false; + break; + } + getContext().setStatus(getGraphMapper().getMapFunctions() + " " + + "checkWorkers: Only found " + + totalResponses + + " responses of " + maxWorkers + + " needed to start superstep " + + getSuperstep()); + if (getWorkerHealthRegistrationChangedEvent().waitMsecs( + eventWaitMsecs)) { + if (LOG.isDebugEnabled()) { + LOG.debug("checkWorkers: Got event that health " + + "registration changed, not using 
poll attempt"); + } + getWorkerHealthRegistrationChangedEvent().reset(); + continue; + } + if (LOG.isInfoEnabled()) { + LOG.info("checkWorkers: Only found " + totalResponses + + " responses of " + maxWorkers + + " needed to start superstep " + + getSuperstep() + ". Reporting every" + + eventWaitMsecs + " msecs, " + + (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) + + " more msecs left before giving up."); + // Find the missing workers if there are only a few + if ((maxWorkers - totalResponses) <= + partitionLongTailMinPrint) { + logMissingWorkersOnSuperstep(healthyWorkerInfoList, + unhealthyWorkerInfoList); + } + } + } + if (failJob) { + LOG.error("checkWorkers: Did not receive enough processes in " + + "time (only " + totalResponses + " of " + + minWorkers + " required) after waiting " + maxSuperstepWaitMsecs + + "msecs). This occurs if you do not have enough map tasks " + + "available simultaneously on your Hadoop instance to fulfill " + + "the number of requested workers."); + return null; + } + + if (healthyWorkerInfoList.size() < minWorkers) { + LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() + + " available when " + minWorkers + " are required."); + logMissingWorkersOnSuperstep(healthyWorkerInfoList, + unhealthyWorkerInfoList); + return null; + } + + getContext().setStatus(getGraphMapper().getMapFunctions() + " " + + "checkWorkers: Done - Found " + totalResponses + + " responses of " + maxWorkers + " needed to start superstep " + + getSuperstep()); + + return healthyWorkerInfoList; + } + + /** + * Log info level of the missing workers on the superstep + * + * @param healthyWorkerInfoList Healthy worker list + * @param unhealthyWorkerInfoList Unhealthy worker list + */ + private void logMissingWorkersOnSuperstep( + List healthyWorkerInfoList, + List unhealthyWorkerInfoList) { + if (LOG.isInfoEnabled()) { + Set partitionSet = new TreeSet(); + for (WorkerInfo workerInfo : healthyWorkerInfoList) { + 
partitionSet.add(workerInfo.getTaskId()); + } + for (WorkerInfo workerInfo : unhealthyWorkerInfoList) { + partitionSet.add(workerInfo.getTaskId()); + } + for (int i = 1; i <= maxWorkers; ++i) { + if (partitionSet.contains(Integer.valueOf(i))) { + continue; + } else if (i == getTaskPartition()) { + continue; + } else { + LOG.info("logMissingWorkersOnSuperstep: No response from " + + "partition " + i + " (could be master)"); + } + } + } + } + + /** + * Common method for creating vertex/edge input splits. + * + * @param inputFormat The vertex/edge input format + * @param inputSplitPaths ZooKeeper input split paths + * @param inputSplitType Type of input split (for logging purposes) + * @return Number of splits. Returns -1 on failure to create + * valid input splits. + */ + private int createInputSplits(GiraphInputFormat inputFormat, + InputSplitPaths inputSplitPaths, + String inputSplitType) { + String logPrefix = "create" + inputSplitType + "InputSplits"; + // Only the 'master' should be doing this. Wait until the number of + // processes that have reported health exceeds the minimum percentage. + // If the minimum percentage is not met, fail the job. Otherwise + // generate the input splits + String inputSplitsPath = inputSplitPaths.getPath(); + try { + if (getZkExt().exists(inputSplitsPath, false) != null) { + LOG.info(inputSplitsPath + " already exists, no need to create"); + return Integer.parseInt( + new String(getZkExt().getData(inputSplitsPath, false, null))); + } + } catch (KeeperException.NoNodeException e) { + if (LOG.isInfoEnabled()) { + LOG.info(logPrefix + ": Need to create the input splits at " + + inputSplitsPath); + } + } catch (KeeperException e) { + throw new IllegalStateException(logPrefix + ": KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException(logPrefix + ": InterrtupedException", e); + } + + // When creating znodes, in case the master has already run, resume + // where it left off. 
+ List healthyWorkerInfoList = checkWorkers(); + if (healthyWorkerInfoList == null) { + setJobState(ApplicationState.FAILED, -1, -1); + return -1; + } + + // Note that the input splits may only be a sample if + // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100 + List splitList = generateInputSplits(inputFormat, + healthyWorkerInfoList.size(), inputSplitType); + + if (splitList.isEmpty()) { + LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " + + "check input of " + inputFormat.getClass().getName() + "!"); + getContext().setStatus("Failing job due to 0 input splits, " + + "check input of " + inputFormat.getClass().getName() + "!"); + failJob(); + } + if (healthyWorkerInfoList.size() > splitList.size()) { + LOG.warn(logPrefix + ": Number of inputSplits=" + + splitList.size() + " < " + + healthyWorkerInfoList.size() + + "=number of healthy processes, " + + "some workers will be not used"); + } + + // Write input splits to zookeeper in parallel + int inputSplitThreadCount = getConfiguration().getInt( + INPUT_SPLIT_THREAD_COUNT, + DEFAULT_INPUT_SPLIT_THREAD_COUNT); + if (LOG.isInfoEnabled()) { + LOG.info(logPrefix + ": Starting to write input split data " + + "to zookeeper with " + inputSplitThreadCount + " threads"); + } + ExecutorService taskExecutor = + Executors.newFixedThreadPool(inputSplitThreadCount); + for (int i = 0; i < splitList.size(); ++i) { + InputSplit inputSplit = splitList.get(i); + taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i)); + } + taskExecutor.shutdown(); + ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext()); + if (LOG.isInfoEnabled()) { + LOG.info(logPrefix + ": Done writing input split data to zookeeper"); + } + + // Let workers know they can start trying to load the input splits + try { + getZkExt().createExt(inputSplitPaths.getAllReadyPath(), + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + false); + } catch (KeeperException.NodeExistsException e) { + 
LOG.info(logPrefix + ": Node " + + inputSplitPaths.getAllReadyPath() + " already exists."); + } catch (KeeperException e) { + throw new IllegalStateException(logPrefix + ": KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException(logPrefix + ": IllegalStateException", e); + } + + return splitList.size(); + } + + @Override + public int createVertexInputSplits() { + // Short-circuit if there is no vertex input format + if (!getConfiguration().hasVertexInputFormat()) { + return 0; + } + VertexInputFormat vertexInputFormat = + getConfiguration().createVertexInputFormat(); + return createInputSplits(vertexInputFormat, vertexInputSplitsPaths, + "Vertex"); + } + + @Override + public int createEdgeInputSplits() { + // Short-circuit if there is no edge input format + if (!getConfiguration().hasEdgeInputFormat()) { + return 0; + } + EdgeInputFormat edgeInputFormat = + getConfiguration().createEdgeInputFormat(); + return createInputSplits(edgeInputFormat, edgeInputSplitsPaths, + "Edge"); + } + + @Override + public List getWorkerInfoList() { + return chosenWorkerInfoList; + } + + @Override + public MasterAggregatorHandler getAggregatorHandler() { + return aggregatorHandler; + } + + /** + * Read the finalized checkpoint file and associated metadata files for the + * checkpoint. Modifies the {@link PartitionOwner} objects to get the + * checkpoint prefixes. It is an optimization to prevent all workers from + * searching all the files. Also read in the aggregator data from the + * finalized checkpoint file and setting it. + * + * @param superstep Checkpoint set to examine. 
+ * @param partitionOwners Partition owners to modify with checkpoint + * prefixes + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + private void prepareCheckpointRestart( + long superstep, + Collection partitionOwners) + throws IOException, KeeperException, InterruptedException { + FileSystem fs = getFs(); + List validMetadataPathList = new ArrayList(); + String finalizedCheckpointPath = + getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX; + DataInputStream finalizedStream = + fs.open(new Path(finalizedCheckpointPath)); + GlobalStats globalStats = new GlobalStats(); + globalStats.readFields(finalizedStream); + updateCounters(globalStats); + int prefixFileCount = finalizedStream.readInt(); + for (int i = 0; i < prefixFileCount; ++i) { + String metadataFilePath = + finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX; + validMetadataPathList.add(new Path(metadataFilePath)); + } + + aggregatorHandler.readFields(finalizedStream); + masterCompute.readFields(finalizedStream); + finalizedStream.close(); + + Map idOwnerMap = + new HashMap(); + for (PartitionOwner partitionOwner : partitionOwners) { + if (idOwnerMap.put(partitionOwner.getPartitionId(), + partitionOwner) != null) { + throw new IllegalStateException( + "prepareCheckpointRestart: Duplicate partition " + + partitionOwner); + } + } + // Reading the metadata files. Simply assign each partition owner + // the correct file prefix based on the partition id. 
+ for (Path metadataPath : validMetadataPathList) { + String checkpointFilePrefix = metadataPath.toString(); + checkpointFilePrefix = + checkpointFilePrefix.substring( + 0, + checkpointFilePrefix.length() - + CHECKPOINT_METADATA_POSTFIX.length()); + DataInputStream metadataStream = fs.open(metadataPath); + long partitions = metadataStream.readInt(); + for (long i = 0; i < partitions; ++i) { + long dataPos = metadataStream.readLong(); + int partitionId = metadataStream.readInt(); + PartitionOwner partitionOwner = idOwnerMap.get(partitionId); + if (LOG.isInfoEnabled()) { + LOG.info("prepareSuperstepRestart: File " + metadataPath + + " with position " + dataPos + + ", partition id = " + partitionId + + " assigned to " + partitionOwner); + } + partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix); + } + metadataStream.close(); + } + } + + @Override + public void setup() { + // Might have to manually load a checkpoint. + // In that case, the input splits are not set, they will be faked by + // the checkpoint files. Each checkpoint file will be an input split + // and the input split + + if (getRestartedSuperstep() != UNSET_SUPERSTEP) { + GiraphStats.getInstance().getSuperstepCounter(). + setValue(getRestartedSuperstep()); + } + for (MasterObserver observer : observers) { + observer.preApplication(); + getContext().progress(); + } + } + + @Override + public boolean becomeMaster() { + // Create my bid to become the master, then try to become the worker + // or return false. 
+ String myBid = null; + try { + myBid = + getZkExt().createExt(masterElectionPath + + "/" + getHostnamePartitionId(), + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL, + true); + } catch (KeeperException e) { + throw new IllegalStateException( + "becomeMaster: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "becomeMaster: IllegalStateException", e); + } + while (true) { + JSONObject jobState = getJobState(); + try { + if ((jobState != null) && + ApplicationState.valueOf( + jobState.getString(JSONOBJ_STATE_KEY)) == + ApplicationState.FINISHED) { + LOG.info("becomeMaster: Job is finished, " + + "give up trying to be the master!"); + isMaster = false; + return isMaster; + } + } catch (JSONException e) { + throw new IllegalStateException( + "becomeMaster: Couldn't get state from " + jobState, e); + } + try { + List masterChildArr = + getZkExt().getChildrenExt( + masterElectionPath, true, true, true); + if (LOG.isInfoEnabled()) { + LOG.info("becomeMaster: First child is '" + + masterChildArr.get(0) + "' and my bid is '" + + myBid + "'"); + } + if (masterChildArr.get(0).equals(myBid)) { + GiraphStats.getInstance().getCurrentMasterTaskPartition(). 
+ setValue(getTaskPartition()); + masterCompute = getConfiguration().createMasterCompute(); + aggregatorHandler = new MasterAggregatorHandler(getConfiguration(), + getContext()); + aggregatorHandler.initialize(this); + + masterInfo = new MasterInfo(); + masterServer = + new NettyMasterServer(getConfiguration(), this, getContext()); + masterInfo.setInetSocketAddress(masterServer.getMyAddress()); + masterInfo.setTaskId(getTaskPartition()); + masterClient = + new NettyMasterClient(getContext(), getConfiguration(), this); + + if (LOG.isInfoEnabled()) { + LOG.info("becomeMaster: I am now the master!"); + } + isMaster = true; + return isMaster; + } + LOG.info("becomeMaster: Waiting to become the master..."); + getMasterElectionChildrenChangedEvent().waitForever(); + getMasterElectionChildrenChangedEvent().reset(); + } catch (KeeperException e) { + throw new IllegalStateException( + "becomeMaster: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "becomeMaster: IllegalStateException", e); + } + } + } + + @Override + public MasterInfo getMasterInfo() { + return masterInfo; + } + + /** + * Collect and aggregate the worker statistics for a particular superstep. 
+ * + * @param superstep Superstep to aggregate on + * @return Global statistics aggregated on all worker statistics + */ + private GlobalStats aggregateWorkerStats(long superstep) { + ImmutableClassesGiraphConfiguration conf = getConfiguration(); + + Class partitionStatsClass = + masterGraphPartitioner.createPartitionStats().getClass(); + GlobalStats globalStats = new GlobalStats(); + // Get the stats from the all the worker selected nodes + String workerFinishedPath = + getWorkerFinishedPath(getApplicationAttempt(), superstep); + List workerFinishedPathList = null; + try { + workerFinishedPathList = + getZkExt().getChildrenExt( + workerFinishedPath, false, false, true); + } catch (KeeperException e) { + throw new IllegalStateException( + "aggregateWorkerStats: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "aggregateWorkerStats: InterruptedException", e); + } + + AggregatedMetrics aggregatedMetrics = new AggregatedMetrics(); + + allPartitionStatsList.clear(); + for (String finishedPath : workerFinishedPathList) { + String hostnamePartitionId = FilenameUtils.getName(finishedPath); + JSONObject workerFinishedInfoObj = null; + try { + byte [] zkData = + getZkExt().getData(finishedPath, false, null); + workerFinishedInfoObj = new JSONObject(new String(zkData)); + List statsList = + WritableUtils.readListFieldsFromByteArray( + Base64.decode(workerFinishedInfoObj.getString( + JSONOBJ_PARTITION_STATS_KEY)), + partitionStatsClass, + conf); + for (PartitionStats partitionStats : statsList) { + globalStats.addPartitionStats(partitionStats); + allPartitionStatsList.add(partitionStats); + } + globalStats.addMessageCount( + workerFinishedInfoObj.getLong( + JSONOBJ_NUM_MESSAGES_KEY)); + if (conf.metricsEnabled() && + workerFinishedInfoObj.has(JSONOBJ_METRICS_KEY)) { + WorkerSuperstepMetrics workerMetrics = new WorkerSuperstepMetrics(); + WritableUtils.readFieldsFromByteArray( + Base64.decode( + workerFinishedInfoObj.getString( + 
JSONOBJ_METRICS_KEY)), + workerMetrics); + aggregatedMetrics.add(workerMetrics, hostnamePartitionId); + } + } catch (JSONException e) { + throw new IllegalStateException( + "aggregateWorkerStats: JSONException", e); + } catch (KeeperException e) { + throw new IllegalStateException( + "aggregateWorkerStats: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "aggregateWorkerStats: InterruptedException", e); + } catch (IOException e) { + throw new IllegalStateException( + "aggregateWorkerStats: IOException", e); + } + } + + if (conf.metricsEnabled()) { + aggregatedMetrics.print(superstep); + } + + if (LOG.isInfoEnabled()) { + LOG.info("aggregateWorkerStats: Aggregation found " + globalStats + + " on superstep = " + getSuperstep()); + } + return globalStats; + } + + /** + * Finalize the checkpoint file prefixes by taking the chosen workers and + * writing them to a finalized file. Also write out the master + * aggregated aggregator array from the previous superstep. + * + * @param superstep superstep to finalize + * @param chosenWorkerInfoList list of chosen workers that will be finalized + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + private void finalizeCheckpoint(long superstep, + List chosenWorkerInfoList) + throws IOException, KeeperException, InterruptedException { + Path finalizedCheckpointPath = + new Path(getCheckpointBasePath(superstep) + + CHECKPOINT_FINALIZED_POSTFIX); + try { + getFs().delete(finalizedCheckpointPath, false); + } catch (IOException e) { + LOG.warn("finalizedValidCheckpointPrefixes: Removed old file " + + finalizedCheckpointPath); + } + + // Format: + // + // + // ... 
+ // + // + FSDataOutputStream finalizedOutputStream = + getFs().create(finalizedCheckpointPath); + + String superstepFinishedNode = + getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1); + finalizedOutputStream.write( + getZkExt().getData(superstepFinishedNode, false, null)); + + finalizedOutputStream.writeInt(chosenWorkerInfoList.size()); + for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { + String chosenWorkerInfoPrefix = + getCheckpointBasePath(superstep) + "." + + chosenWorkerInfo.getHostnameId(); + finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix); + } + aggregatorHandler.write(finalizedOutputStream); + masterCompute.write(finalizedOutputStream); + finalizedOutputStream.close(); + lastCheckpointedSuperstep = superstep; + GiraphStats.getInstance(). + getLastCheckpointedSuperstep().setValue(superstep); + } + + /** + * Assign the partitions for this superstep. If there are changes, + * the workers will know how to do the exchange. If this was a restarted + * superstep, then make sure to provide information on where to find the + * checkpoint file. 
+ * + * @param allPartitionStatsList All partition stats + * @param chosenWorkerInfoList All the chosen worker infos + * @param masterGraphPartitioner Master graph partitioner + */ + private void assignPartitionOwners( + List allPartitionStatsList, + List chosenWorkerInfoList, + MasterGraphPartitioner masterGraphPartitioner) { + Collection partitionOwners; + if (getSuperstep() == INPUT_SUPERSTEP || + getSuperstep() == getRestartedSuperstep()) { + partitionOwners = + masterGraphPartitioner.createInitialPartitionOwners( + chosenWorkerInfoList, maxWorkers); + if (partitionOwners.isEmpty()) { + throw new IllegalStateException( + "assignAndExchangePartitions: No partition owners set"); + } + } else { + partitionOwners = + masterGraphPartitioner.generateChangedPartitionOwners( + allPartitionStatsList, + chosenWorkerInfoList, + maxWorkers, + getSuperstep()); + + PartitionUtils.analyzePartitionStats(partitionOwners, + allPartitionStatsList); + } + checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners()); + + // If restarted, prepare the checkpoint restart + if (getRestartedSuperstep() == getSuperstep()) { + try { + prepareCheckpointRestart(getSuperstep(), partitionOwners); + } catch (IOException e) { + throw new IllegalStateException( + "assignPartitionOwners: IOException on preparing", e); + } catch (KeeperException e) { + throw new IllegalStateException( + "assignPartitionOwners: KeeperException on preparing", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "assignPartitionOwners: InteruptedException on preparing", + e); + } + } + + // There will be some exchange of partitions + if (!partitionOwners.isEmpty()) { + String vertexExchangePath = + getPartitionExchangePath(getApplicationAttempt(), + getSuperstep()); + try { + getZkExt().createOnceExt(vertexExchangePath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException e) { + throw new IllegalStateException( + "assignPartitionOwners: 
KeeperException creating " + + vertexExchangePath); + } catch (InterruptedException e) { + throw new IllegalStateException( + "assignPartitionOwners: InterruptedException creating " + + vertexExchangePath); + } + } + + // Workers are waiting for these assignments + AddressesAndPartitionsWritable addressesAndPartitions = + new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList, + partitionOwners); + String addressesAndPartitionsPath = + getAddressesAndPartitionsPath(getApplicationAttempt(), + getSuperstep()); + WritableUtils.writeToZnode( + getZkExt(), + addressesAndPartitionsPath, + -1, + addressesAndPartitions); + } + + /** + * Check if partition ids are valid + * + * @param partitionOwners List of partition ids for current superstep + */ + private void checkPartitions(Collection partitionOwners) { + for (PartitionOwner partitionOwner : partitionOwners) { + int partitionId = partitionOwner.getPartitionId(); + if (partitionId < 0 || partitionId >= partitionOwners.size()) { + throw new IllegalStateException("checkPartitions: " + + "Invalid partition id " + partitionId + + " - partition ids must be values from 0 to (numPartitions - 1)"); + } + } + } + + /** + * Check whether the workers chosen for this superstep are still alive + * + * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper + * @param chosenWorkerInfoList List of the healthy workers + * @return true if they are all alive, false otherwise. 
+ * @throws InterruptedException + * @throws KeeperException + */ + private boolean superstepChosenWorkerAlive( + String chosenWorkerInfoHealthPath, + List chosenWorkerInfoList) + throws KeeperException, InterruptedException { + List chosenWorkerInfoHealthyList = + getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false); + Set chosenWorkerInfoHealthySet = + new HashSet(chosenWorkerInfoHealthyList); + boolean allChosenWorkersHealthy = true; + for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { + if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) { + allChosenWorkersHealthy = false; + LOG.error("superstepChosenWorkerAlive: Missing chosen " + + "worker " + chosenWorkerInfo + + " on superstep " + getSuperstep()); + } + } + return allChosenWorkersHealthy; + } + + @Override + public void restartFromCheckpoint(long checkpoint) { + // Process: + // 1. Remove all old input split data + // 2. Increase the application attempt and set to the correct checkpoint + // 3. Send command to all workers to restart their tasks + try { + getZkExt().deleteExt(vertexInputSplitsPaths.getPath(), -1, + true); + getZkExt().deleteExt(edgeInputSplitsPaths.getPath(), -1, + true); + } catch (InterruptedException e) { + throw new RuntimeException( + "restartFromCheckpoint: InterruptedException", e); + } catch (KeeperException e) { + throw new RuntimeException( + "restartFromCheckpoint: KeeperException", e); + } + setApplicationAttempt(getApplicationAttempt() + 1); + setCachedSuperstep(checkpoint); + setRestartedSuperstep(checkpoint); + setJobState(ApplicationState.START_SUPERSTEP, + getApplicationAttempt(), + checkpoint); + } + + /** + * Only get the finalized checkpoint files + */ + public static class FinalizedCheckpointPathFilter implements PathFilter { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX); + } + } + + @Override + public long getLastGoodCheckpoint() throws IOException { + // Find the last 
good checkpoint if none have been written to the + // knowledge of this master + if (lastCheckpointedSuperstep == -1) { + try { + FileStatus[] fileStatusArray = + getFs().listStatus(new Path(checkpointBasePath), + new FinalizedCheckpointPathFilter()); + if (fileStatusArray == null) { + return -1; + } + Arrays.sort(fileStatusArray); + lastCheckpointedSuperstep = getCheckpoint( + fileStatusArray[fileStatusArray.length - 1].getPath()); + if (LOG.isInfoEnabled()) { + LOG.info("getLastGoodCheckpoint: Found last good checkpoint " + + lastCheckpointedSuperstep + " from " + + fileStatusArray[fileStatusArray.length - 1]. + getPath().toString()); + } + } catch (IOException e) { + LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " + + "found, killing the job.", e); + failJob(); + } + } + + return lastCheckpointedSuperstep; + } + + /** + * Wait for a set of workers to signal that they are done with the + * barrier. + * + * @param finishedWorkerPath Path to where the workers will register their + * hostname and id + * @param workerInfoList List of the workers to wait for + * @param event Event to wait on for a chance to be done. 
+ * @return True if barrier was successful, false if there was a worker + * failure + */ + private boolean barrierOnWorkerList(String finishedWorkerPath, + List workerInfoList, + BspEvent event) { + try { + getZkExt().createOnceExt(finishedWorkerPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException e) { + throw new IllegalStateException( + "barrierOnWorkerList: KeeperException - Couldn't create " + + finishedWorkerPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "barrierOnWorkerList: InterruptedException - Couldn't create " + + finishedWorkerPath, e); + } + List hostnameIdList = + new ArrayList(workerInfoList.size()); + for (WorkerInfo workerInfo : workerInfoList) { + hostnameIdList.add(workerInfo.getHostnameId()); + } + String workerInfoHealthyPath = + getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep()); + List finishedHostnameIdList; + long nextInfoMillis = System.currentTimeMillis(); + final int defaultTaskTimeoutMsec = 10 * 60 * 1000; // from TaskTracker + final int taskTimeoutMsec = getContext().getConfiguration().getInt( + "mapred.task.timeout", defaultTaskTimeoutMsec); + while (true) { + try { + finishedHostnameIdList = + getZkExt().getChildrenExt(finishedWorkerPath, + true, + false, + false); + } catch (KeeperException e) { + throw new IllegalStateException( + "barrierOnWorkerList: KeeperException - Couldn't get " + + "children of " + finishedWorkerPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "barrierOnWorkerList: IllegalException - Couldn't get " + + "children of " + finishedWorkerPath, e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("barrierOnWorkerList: Got finished worker list = " + + finishedHostnameIdList + ", size = " + + finishedHostnameIdList.size() + + ", worker list = " + + workerInfoList + ", size = " + + workerInfoList.size() + + " from " + finishedWorkerPath); + } + + if (LOG.isInfoEnabled() && + 
(System.currentTimeMillis() > nextInfoMillis)) { + nextInfoMillis = System.currentTimeMillis() + 30000; + LOG.info("barrierOnWorkerList: " + + finishedHostnameIdList.size() + + " out of " + workerInfoList.size() + + " workers finished on superstep " + + getSuperstep() + " on path " + finishedWorkerPath); + if (workerInfoList.size() - finishedHostnameIdList.size() < + MAX_PRINTABLE_REMAINING_WORKERS) { + Set remainingWorkers = Sets.newHashSet(hostnameIdList); + remainingWorkers.removeAll(finishedHostnameIdList); + LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers); + } + } + getContext().setStatus(getGraphMapper().getMapFunctions() + " - " + + finishedHostnameIdList.size() + + " finished out of " + + workerInfoList.size() + + " on superstep " + getSuperstep()); + if (finishedHostnameIdList.containsAll(hostnameIdList)) { + break; + } + + // Wait for a signal or timeout + event.waitMsecs(taskTimeoutMsec / 2); + event.reset(); + getContext().progress(); + + // Did a worker die? 
+ try { + if ((getSuperstep() > 0) && + !superstepChosenWorkerAlive( + workerInfoHealthyPath, + workerInfoList)) { + return false; + } + } catch (KeeperException e) { + throw new IllegalStateException( + "barrierOnWorkerList: KeeperException - " + + "Couldn't get " + workerInfoHealthyPath, e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "barrierOnWorkerList: InterruptedException - " + + "Couldn't get " + workerInfoHealthyPath, e); + } + } + + return true; + } + + /** + * Clean up old superstep data from Zookeeper + * + * @param removeableSuperstep Supersteo to clean up + * @throws InterruptedException + */ + private void cleanUpOldSuperstep(long removeableSuperstep) throws + InterruptedException { + if (!(getConfiguration().getBoolean( + GiraphConstants.KEEP_ZOOKEEPER_DATA, + GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) && + (removeableSuperstep >= 0)) { + String oldSuperstepPath = + getSuperstepPath(getApplicationAttempt()) + "/" + + removeableSuperstep; + try { + if (LOG.isInfoEnabled()) { + LOG.info("coordinateSuperstep: Cleaning up old Superstep " + + oldSuperstepPath); + } + getZkExt().deleteExt(oldSuperstepPath, + -1, + true); + } catch (KeeperException.NoNodeException e) { + LOG.warn("coordinateBarrier: Already cleaned up " + + oldSuperstepPath); + } catch (KeeperException e) { + throw new IllegalStateException( + "coordinateSuperstep: KeeperException on " + + "finalizing checkpoint", e); + } + } + } + + /** + * Coordinate the exchange of vertex/edge input splits among workers. + * + * @param inputSplitPaths Input split paths + * @param inputSplitEvents Input split events + * @param inputSplitsType Type of input splits (for logging purposes) + */ + private void coordinateInputSplits(InputSplitPaths inputSplitPaths, + InputSplitEvents inputSplitEvents, + String inputSplitsType) { + // Coordinate the workers finishing sending their vertices/edges to the + // correct workers and signal when everything is done. 
+ String logPrefix = "coordinate" + inputSplitsType + "InputSplits"; + if (!barrierOnWorkerList(inputSplitPaths.getDonePath(), + chosenWorkerInfoList, + inputSplitEvents.getDoneStateChanged())) { + throw new IllegalStateException(logPrefix + ": Worker failed during " + + "input split (currently not supported)"); + } + try { + getZkExt().createExt(inputSplitPaths.getAllDonePath(), + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + false); + } catch (KeeperException.NodeExistsException e) { + LOG.info("coordinateInputSplits: Node " + + inputSplitPaths.getAllDonePath() + " already exists."); + } catch (KeeperException e) { + throw new IllegalStateException(logPrefix + ": KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException(logPrefix + ": IllegalStateException", e); + } + } + + @Override + public SuperstepState coordinateSuperstep() throws + KeeperException, InterruptedException { + // 1. Get chosen workers and set up watches on them. + // 2. Assign partitions to the workers + // (possibly reloading from a superstep) + // 3. Wait for all workers to complete + // 4. Collect and process aggregators + // 5. Create superstep finished node + // 6. 
    //    If the checkpoint frequency is met, finalize the checkpoint

    // Give master observers a chance to run before the superstep starts.
    for (MasterObserver observer : observers) {
      observer.preSuperstep();
      getContext().progress();
    }

    chosenWorkerInfoList = checkWorkers();
    if (chosenWorkerInfoList == null) {
      LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
          "superstep " + getSuperstep());
      setJobState(ApplicationState.FAILED, -1, -1);
      // NOTE(review): execution continues past this point with
      // chosenWorkerInfoList still null -- presumably setJobState(FAILED)
      // terminates the job before the list is dereferenced below; confirm.
    } else {
      // Re-verify each chosen worker's healthy znode and install a watch.
      for (WorkerInfo workerInfo : chosenWorkerInfoList) {
        String workerInfoHealthyPath =
            getWorkerInfoHealthyPath(getApplicationAttempt(),
                getSuperstep()) + "/" +
                workerInfo.getHostnameId();
        if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
          LOG.warn("coordinateSuperstep: Chosen worker " +
              workerInfoHealthyPath +
              " is no longer valid, failing superstep");
        }
      }
    }

    masterClient.openConnections();

    GiraphStats.getInstance().
        getCurrentWorkers().setValue(chosenWorkerInfoList.size());
    assignPartitionOwners(allPartitionStatsList,
        chosenWorkerInfoList,
        masterGraphPartitioner);

    // We need to finalize aggregators from previous superstep (send them to
    // worker owners) after new worker assignments
    if (getSuperstep() >= 0) {
      aggregatorHandler.finishSuperstep(masterClient);
    }

    // Finalize the valid checkpoint file prefixes and possibly
    // the aggregators.
    if (checkpointFrequencyMet(getSuperstep())) {
      String workerWroteCheckpointPath =
          getWorkerWroteCheckpointPath(getApplicationAttempt(),
              getSuperstep());
      // first wait for all the workers to write their checkpoint data
      if (!barrierOnWorkerList(workerWroteCheckpointPath,
          chosenWorkerInfoList,
          getWorkerWroteCheckpointEvent())) {
        return SuperstepState.WORKER_FAILURE;
      }
      try {
        finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
      } catch (IOException e) {
        throw new IllegalStateException(
            "coordinateSuperstep: IOException on finalizing checkpoint",
            e);
      }
    }

    // On the input superstep, drive the vertex/edge input split exchange.
    if (getSuperstep() == INPUT_SUPERSTEP) {
      if (getConfiguration().hasVertexInputFormat()) {
        coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
            "Vertex");
      }
      if (getConfiguration().hasEdgeInputFormat()) {
        coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
            "Edge");
      }
    }

    // Wait for every worker to finish the superstep's compute phase.
    String finishedWorkerPath =
        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
    if (!barrierOnWorkerList(finishedWorkerPath,
        chosenWorkerInfoList,
        getSuperstepStateChangedEvent())) {
      return SuperstepState.WORKER_FAILURE;
    }

    // Collect aggregator values, then run the master.compute() and
    // finally save the aggregator values
    aggregatorHandler.prepareSuperstep(masterClient);
    runMasterCompute(getSuperstep());

    // If the master is halted or all the vertices voted to halt and there
    // are no more messages in the system, stop the computation
    GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
    if (masterCompute.isHalted() ||
        (globalStats.getFinishedVertexCount() ==
        globalStats.getVertexCount() &&
        globalStats.getMessageCount() == 0)) {
      globalStats.setHaltComputation(true);
    }

    // Let everyone know the aggregated application state through the
    // superstep finishing znode.
    String superstepFinishedNode =
        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
    WritableUtils.writeToZnode(
        getZkExt(), superstepFinishedNode, -1, globalStats);
    updateCounters(globalStats);

    cleanUpOldSuperstep(getSuperstep() - 1);
    incrCachedSuperstep();
    // Counter starts at zero, so no need to increment
    if (getSuperstep() > 0) {
      GiraphStats.getInstance().getSuperstepCounter().increment();
    }
    SuperstepState superstepState;
    if (globalStats.getHaltComputation()) {
      superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
    } else {
      superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
    }
    aggregatorHandler.writeAggregators(getSuperstep(), superstepState);

    return superstepState;
  }

  /**
   * Run the master.compute() class
   *
   * @param superstep superstep for which to run the master.compute()
   */
  private void runMasterCompute(long superstep) {
    // The master.compute() should run logically before the workers, so
    // increase the superstep counter it uses by one
    GraphState graphState =
        new GraphState(superstep + 1,
            GiraphStats.getInstance().getVertices().getValue(),
            GiraphStats.getInstance().getEdges().getValue(),
            getContext(), getGraphMapper(), null, null);
    masterCompute.setGraphState(graphState);
    if (superstep == INPUT_SUPERSTEP) {
      // Initialize the user's MasterCompute exactly once, before the first
      // real superstep.
      try {
        masterCompute.initialize();
      } catch (InstantiationException e) {
        LOG.fatal("runMasterCompute: Failed in instantiation", e);
        throw new RuntimeException(
            "runMasterCompute: Failed in instantiation", e);
      } catch (IllegalAccessException e) {
        LOG.fatal("runMasterCompute: Failed in access", e);
        throw new RuntimeException(
            "runMasterCompute: Failed in access", e);
      }
    }
    GiraphTimerContext timerContext = masterComputeTimer.time();
    masterCompute.compute();
    timerContext.stop();
  }

  /**
   * Need to clean up ZooKeeper nicely.  Make sure all the masters and workers
   * have reported ending their ZooKeeper connections.
+ */ + private void cleanUpZooKeeper() { + try { + getZkExt().createExt(cleanedUpPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isInfoEnabled()) { + LOG.info("cleanUpZooKeeper: Node " + cleanedUpPath + + " already exists, no need to create."); + } + } catch (KeeperException e) { + throw new IllegalStateException( + "cleanupZooKeeper: Got KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "cleanupZooKeeper: Got IllegalStateException", e); + } + // Need to wait for the number of workers and masters to complete + int maxTasks = BspInputFormat.getMaxTasks(getConfiguration()); + if ((getGraphMapper().getMapFunctions() == MapFunctions.ALL) || + (getGraphMapper().getMapFunctions() == + MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { + maxTasks *= 2; + } + List cleanedUpChildrenList = null; + while (true) { + try { + cleanedUpChildrenList = + getZkExt().getChildrenExt( + cleanedUpPath, true, false, true); + if (LOG.isInfoEnabled()) { + LOG.info("cleanUpZooKeeper: Got " + + cleanedUpChildrenList.size() + " of " + + maxTasks + " desired children from " + + cleanedUpPath); + } + if (cleanedUpChildrenList.size() == maxTasks) { + break; + } + if (LOG.isInfoEnabled()) { + LOG.info("cleanedUpZooKeeper: Waiting for the " + + "children of " + cleanedUpPath + + " to change since only got " + + cleanedUpChildrenList.size() + " nodes."); + } + } catch (KeeperException e) { + // We are in the cleanup phase -- just log the error + LOG.error("cleanUpZooKeeper: Got KeeperException, " + + "but will continue", e); + return; + } catch (InterruptedException e) { + // We are in the cleanup phase -- just log the error + LOG.error("cleanUpZooKeeper: Got InterruptedException, " + + "but will continue", e); + return; + } + + getCleanedUpChildrenChangedEvent().waitForever(); + getCleanedUpChildrenChangedEvent().reset(); + } + + // At this point, all processes have acknowledged 
the cleanup, + // and the master can do any final cleanup if the ZooKeeper service was + // provided (not dynamically started) and we don't want to keep the data + try { + if (getConfiguration().getZookeeperList() != null && + !getConfiguration().getBoolean( + GiraphConstants.KEEP_ZOOKEEPER_DATA, + GiraphConstants.KEEP_ZOOKEEPER_DATA_DEFAULT)) { + if (LOG.isInfoEnabled()) { + LOG.info("cleanupZooKeeper: Removing the following path " + + "and all children - " + basePath + " from ZooKeeper list " + + getConfiguration().getZookeeperList()); + } + getZkExt().deleteExt(basePath, -1, true); + } + } catch (KeeperException e) { + LOG.error("cleanupZooKeeper: Failed to do cleanup of " + + basePath + " due to KeeperException", e); + } catch (InterruptedException e) { + LOG.error("cleanupZooKeeper: Failed to do cleanup of " + + basePath + " due to InterruptedException", e); + } + } + + @Override + public void postApplication() { + for (MasterObserver observer : observers) { + observer.postApplication(); + getContext().progress(); + } + } + + @Override + public void postSuperstep() { + for (MasterObserver observer : observers) { + observer.postSuperstep(); + getContext().progress(); + } + } + + @Override + public void failureCleanup(Exception e) { + for (MasterObserver observer : observers) { + try { + observer.applicationFailed(e); + // CHECKSTYLE: stop IllegalCatchCheck + } catch (RuntimeException re) { + // CHECKSTYLE: resume IllegalCatchCheck + LOG.error(re.getClass().getName() + " from observer " + + observer.getClass().getName(), re); + } + getContext().progress(); + } + } + + @Override + public void cleanup() throws IOException { + // All master processes should denote they are done by adding special + // znode. Once the number of znodes equals the number of partitions + // for workers and masters, the master will clean up the ZooKeeper + // znodes associated with this job. 
    String masterCleanedUpPath = cleanedUpPath + "/" +
        getTaskPartition() + MASTER_SUFFIX;
    try {
      String finalFinishedPath =
          getZkExt().createExt(masterCleanedUpPath,
              null,
              Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT,
              true);
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Notifying master its okay to cleanup with " +
            finalFinishedPath);
      }
    } catch (KeeperException.NodeExistsException e) {
      // The cleanup znode for this task already exists; nothing to do.
      if (LOG.isInfoEnabled()) {
        LOG.info("cleanup: Couldn't create finished node '" +
            masterCleanedUpPath);
      }
    } catch (KeeperException e) {
      // Cleanup phase -- log and continue rather than failing the job.
      LOG.error("cleanup: Got KeeperException, continuing", e);
    } catch (InterruptedException e) {
      // Cleanup phase -- log and continue rather than failing the job.
      LOG.error("cleanup: Got InterruptedException, continuing", e);
    }

    if (isMaster) {
      // Only the elected master waits for everyone and removes shared state.
      cleanUpZooKeeper();
      // If desired, cleanup the checkpoint directory
      if (getConfiguration().getBoolean(
          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
          GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
        boolean success =
            getFs().delete(new Path(checkpointBasePath), true);
        if (LOG.isInfoEnabled()) {
          LOG.info("cleanup: Removed HDFS checkpoint directory (" +
              checkpointBasePath + ") with return = " +
              success + " since the job " + getContext().getJobName() +
              " succeeded ");
        }
      }
      aggregatorHandler.close();

      masterClient.closeConnections();
      masterServer.close();
    }

    try {
      getZkExt().close();
    } catch (InterruptedException e) {
      // cleanup phase -- just log the error
      LOG.error("cleanup: Zookeeper failed to close", e);
    }
  }

  /**
   * Event that the master watches that denotes when a worker wrote checkpoint
   *
   * @return Event that denotes when a worker wrote checkpoint
   */
  public final BspEvent getWorkerWroteCheckpointEvent() {
    return workerWroteCheckpoint;
  }

  /**
   * Event that the master watches that denotes if a worker has done something
   * that changes the state of a superstep (either a worker completed or died)
   *
   * @return Event that denotes a superstep state
   * change
   */
  public final BspEvent getSuperstepStateChangedEvent() {
    return superstepStateChanged;
  }

  /**
   * Should this worker failure cause the current superstep to fail?
   *
   * @param failedWorkerPath Full path to the failed worker
   */
  private void checkHealthyWorkerFailure(String failedWorkerPath) {
    // Ignore failures that belong to an earlier superstep.
    if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
      return;
    }

    Collection partitionOwners =
        masterGraphPartitioner.getCurrentPartitionOwners();
    String hostnameId =
        getHealthyHostnameIdFromPath(failedWorkerPath);
    // Only react when the dead worker owns (or previously owned) at least
    // one partition; otherwise its loss does not affect the superstep.
    for (PartitionOwner partitionOwner : partitionOwners) {
      WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
      WorkerInfo previousWorkerInfo =
          partitionOwner.getPreviousWorkerInfo();
      if (workerInfo.getHostnameId().equals(hostnameId) ||
          ((previousWorkerInfo != null) &&
              previousWorkerInfo.getHostnameId().equals(hostnameId))) {
        LOG.warn("checkHealthyWorkerFailure: " +
            "at least one healthy worker went down " +
            "for superstep " + getSuperstep() + " - " +
            hostnameId + ", will try to restart from " +
            "checkpointed superstep " +
            lastCheckpointedSuperstep);
        superstepStateChanged.signal();
      }
    }
  }

  @Override
  public boolean processEvent(WatchedEvent event) {
    // NOTE(review): event.getPath() can be null for session-type ZooKeeper
    // events -- presumably those never reach this handler; confirm, else
    // the contains() calls below can throw NullPointerException.
    boolean foundEvent = false;
    if (event.getPath().contains(WORKER_HEALTHY_DIR) &&
        (event.getType() == EventType.NodeDeleted)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("processEvent: Healthy worker died (node deleted) " +
            "in " + event.getPath());
      }
      checkHealthyWorkerFailure(event.getPath());
      superstepStateChanged.signal();
      foundEvent = true;
    } else if (event.getPath().contains(WORKER_FINISHED_DIR) &&
        event.getType() == EventType.NodeChildrenChanged) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("processEvent: Worker finished (node change) " +
            "event - superstepStateChanged signaled");
      }
      superstepStateChanged.signal();
      foundEvent = true;
    } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
        event.getType() == EventType.NodeChildrenChanged) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
            "event - workerWroteCheckpoint signaled");
      }
      workerWroteCheckpoint.signal();
      foundEvent = true;
    }

    return foundEvent;
  }

  /**
   * Set values of counters to match the ones from {@link GlobalStats}
   *
   * @param globalStats Global statistics which holds new counter values
   */
  private void updateCounters(GlobalStats globalStats) {
    GiraphStats gs = GiraphStats.getInstance();
    gs.getVertices().setValue(globalStats.getVertexCount());
    gs.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
    gs.getEdges().setValue(globalStats.getEdgeCount());
    gs.getSentMessages().setValue(globalStats.getMessageCount());
  }

  /**
   * Task that writes a given input split to zookeeper.
   * Upon failure call() throws an exception.
   */
  private class WriteInputSplit implements Callable {
    /** Input split which we are going to write */
    private final InputSplit inputSplit;
    /** Input splits path */
    private final String inputSplitsPath;
    /** Index of the input split */
    private final int index;

    /**
     * Constructor
     *
     * @param inputSplit Input split which we are going to write
     * @param inputSplitsPath Input splits path
     * @param index Index of the input split
     */
    public WriteInputSplit(InputSplit inputSplit,
        String inputSplitsPath,
        int index) {
      this.inputSplit = inputSplit;
      this.inputSplitsPath = inputSplitsPath;
      this.index = index;
    }

    @Override
    public Void call() {
      String inputSplitPath = null;
      try {
        ByteArrayOutputStream byteArrayOutputStream =
            new ByteArrayOutputStream();
        DataOutput outputStream =
            new DataOutputStream(byteArrayOutputStream);

        String[] splitLocations = inputSplit.getLocations();
        StringBuilder locations = null;
        if (splitLocations != null) {
          // NOTE(review): the append loop over splitLocations (continues
          // past this view) iterates over ALL locations, so splitListLength
          // only limits how many tab separators are emitted, not how many
          // locations are written -- looks like localityLimit is not
          // actually enforced here; verify intent.
          int splitListLength =
              Math.min(splitLocations.length, localityLimit);
          locations =
new StringBuilder(); + for (String location : splitLocations) { + locations.append(location) + .append(--splitListLength > 0 ? "\t" : ""); + } + } + Text.writeString(outputStream, + locations == null ? "" : locations.toString()); + Text.writeString(outputStream, + inputSplit.getClass().getName()); + ((Writable) inputSplit).write(outputStream); + inputSplitPath = inputSplitsPath + "/" + index; + getZkExt().createExt(inputSplitPath, + byteArrayOutputStream.toByteArray(), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + + if (LOG.isDebugEnabled()) { + LOG.debug("call: Created input split " + + "with index " + index + " serialized as " + + byteArrayOutputStream.toString()); + } + } catch (KeeperException.NodeExistsException e) { + if (LOG.isInfoEnabled()) { + LOG.info("call: Node " + + inputSplitPath + " already exists."); + } + } catch (KeeperException e) { + throw new IllegalStateException( + "call: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "call: IllegalStateException", e); + } catch (IOException e) { + throw new IllegalStateException( + "call: IOException", e); + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterCompute.java new file mode 100644 index 0000000..bfb6f0e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterCompute.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.master; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * A dumb implementation of {@link MasterCompute}. This is the default + * implementation when no MasterCompute is defined by the user. It does + * nothing. + */ + +public class DefaultMasterCompute extends MasterCompute { + + @Override + public void readFields(DataInput in) throws IOException { + } + + @Override + public void write(DataOutput out) throws IOException { + } + + @Override + public void compute() { + } + + @Override + public void initialize() throws InstantiationException, + IllegalAccessException { + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java new file mode 100644 index 0000000..aab1183 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.master; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.bsp.SuperstepState; +import org.apache.giraph.comm.MasterClient; +import org.apache.giraph.comm.aggregators.AggregatorUtils; +import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.aggregators.AggregatorWrapper; +import org.apache.giraph.aggregators.AggregatorWriter; +import org.apache.giraph.bsp.BspService; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Logger; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Map; + +/** Handler for aggregators on master */ +public class MasterAggregatorHandler implements MasterAggregatorUsage, + Writable { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(MasterAggregatorHandler.class); + /** + * Map of aggregators. + * This map is used to store final aggregated values received from worker + * owners, and also to read and write values provided during master.compute. 
+ */ + private final Map> aggregatorMap = + Maps.newHashMap(); + /** Aggregator writer */ + private final AggregatorWriter aggregatorWriter; + /** Progressable used to report progress */ + private final Progressable progressable; + + /** + * Constructor + * + * @param conf Giraph configuration + * @param progressable Progressable used for reporting progress + */ + public MasterAggregatorHandler( + ImmutableClassesGiraphConfiguration conf, + Progressable progressable) { + this.progressable = progressable; + aggregatorWriter = conf.createAggregatorWriter(); + } + + @Override + public A getAggregatedValue(String name) { + AggregatorWrapper aggregator = aggregatorMap.get(name); + if (aggregator == null) { + return null; + } else { + return (A) aggregator.getPreviousAggregatedValue(); + } + } + + @Override + public void setAggregatedValue(String name, A value) { + AggregatorWrapper aggregator = aggregatorMap.get(name); + if (aggregator == null) { + throw new IllegalStateException( + "setAggregatedValue: Tried to set value of aggregator which wasn't" + + " registered " + name); + } + ((AggregatorWrapper) aggregator).setCurrentAggregatedValue(value); + } + + @Override + public boolean registerAggregator(String name, + Class> aggregatorClass) throws + InstantiationException, IllegalAccessException { + checkAggregatorName(name); + return registerAggregator(name, aggregatorClass, false) != null; + } + + @Override + public boolean registerPersistentAggregator(String name, + Class> aggregatorClass) throws + InstantiationException, IllegalAccessException { + checkAggregatorName(name); + return registerAggregator(name, aggregatorClass, true) != null; + } + + /** + * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as + * the name of aggregator. Throw an exception if he tries to use it. + * + * @param name Name of the aggregator to check. 
+ */ + private void checkAggregatorName(String name) { + if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { + throw new IllegalStateException("checkAggregatorName: " + + AggregatorUtils.SPECIAL_COUNT_AGGREGATOR + + " is not allowed for the name of aggregator"); + } + } + + /** + * Helper function for registering aggregators. + * + * @param name Name of the aggregator + * @param aggregatorClass Class of the aggregator + * @param persistent Whether aggregator is persistent or not + * @param Aggregated value type + * @return Newly registered aggregator or aggregator which was previously + * created with selected name, if any + */ + private AggregatorWrapper registerAggregator + (String name, Class> aggregatorClass, + boolean persistent) throws InstantiationException, + IllegalAccessException { + AggregatorWrapper aggregatorWrapper = + (AggregatorWrapper) aggregatorMap.get(name); + if (aggregatorWrapper == null) { + aggregatorWrapper = + new AggregatorWrapper(aggregatorClass, persistent); + aggregatorMap.put(name, (AggregatorWrapper) aggregatorWrapper); + } + return aggregatorWrapper; + } + + /** + * Prepare aggregators for current superstep + * + * @param masterClient IPC client on master + */ + public void prepareSuperstep(MasterClient masterClient) { + if (LOG.isDebugEnabled()) { + LOG.debug("prepareSuperstep: Start preapring aggregators"); + } + // prepare aggregators for master compute + for (AggregatorWrapper aggregator : aggregatorMap.values()) { + if (aggregator.isPersistent()) { + aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue()); + } + aggregator.setPreviousAggregatedValue( + aggregator.getCurrentAggregatedValue()); + aggregator.resetCurrentAggregator(); + progressable.progress(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("prepareSuperstep: Aggregators prepared"); + } + } + + /** + * Finalize aggregators for current superstep and share them with workers + * + * @param masterClient IPC client on master + */ + public void 
finishSuperstep(MasterClient masterClient) { + if (LOG.isDebugEnabled()) { + LOG.debug("finishSuperstep: Start finishing aggregators"); + } + for (AggregatorWrapper aggregator : aggregatorMap.values()) { + if (aggregator.isChanged()) { + // if master compute changed the value, use the one he chose + aggregator.setPreviousAggregatedValue( + aggregator.getCurrentAggregatedValue()); + // reset aggregator for the next superstep + aggregator.resetCurrentAggregator(); + } + progressable.progress(); + } + + // send aggregators to their owners + // TODO: if aggregator owner and it's value didn't change, + // we don't need to resend it + try { + for (Map.Entry> entry : + aggregatorMap.entrySet()) { + masterClient.sendAggregator(entry.getKey(), + entry.getValue().getAggregatorClass(), + entry.getValue().getPreviousAggregatedValue()); + progressable.progress(); + } + masterClient.finishSendingAggregatedValues(); + } catch (IOException e) { + throw new IllegalStateException("finishSuperstep: " + + "IOException occurred while sending aggregators", e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("finishSuperstep: Aggregators finished"); + } + } + + /** + * Accept aggregated values sent by worker. Every aggregator will be sent + * only once, by its owner. + * We don't need to count the number of these requests because global + * superstep barrier will happen after workers ensure all requests of this + * type have been received and processed by master. + * + * @param aggregatedValuesInput Input in which aggregated values are + * written in the following format: + * number_of_aggregators + * name_1 value_1 + * name_2 value_2 + * ... 
+ * @throws IOException + */ + public void acceptAggregatedValues( + DataInput aggregatedValuesInput) throws IOException { + int numAggregators = aggregatedValuesInput.readInt(); + for (int i = 0; i < numAggregators; i++) { + String aggregatorName = aggregatedValuesInput.readUTF(); + AggregatorWrapper aggregator = + aggregatorMap.get(aggregatorName); + if (aggregator == null) { + throw new IllegalStateException( + "acceptAggregatedValues: " + + "Master received aggregator which isn't registered: " + + aggregatorName); + } + Writable aggregatorValue = aggregator.createInitialValue(); + aggregatorValue.readFields(aggregatedValuesInput); + aggregator.setCurrentAggregatedValue(aggregatorValue); + progressable.progress(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("acceptAggregatedValues: Accepted one set with " + + numAggregators + " aggregated values"); + } + } + + /** + * Write aggregators to {@link AggregatorWriter} + * + * @param superstep Superstep which just finished + * @param superstepState State of the superstep which just finished + */ + public void writeAggregators(long superstep, SuperstepState superstepState) { + try { + Iterable> iter = + Iterables.transform( + aggregatorMap.entrySet(), + new Function>, + Map.Entry>() { + @Override + public Map.Entry apply( + Map.Entry> entry) { + progressable.progress(); + return new AbstractMap.SimpleEntry(entry.getKey(), + entry.getValue().getPreviousAggregatedValue()); + } + }); + aggregatorWriter.writeAggregator(iter, + (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ? 
+ AggregatorWriter.LAST_SUPERSTEP : superstep); + } catch (IOException e) { + throw new IllegalStateException( + "coordinateSuperstep: IOException while " + + "writing aggregators data", e); + } + } + + /** + * Initialize {@link AggregatorWriter} + * + * @param service BspService + */ + public void initialize(BspService service) { + try { + aggregatorWriter.initialize(service.getContext(), + service.getApplicationAttempt()); + } catch (IOException e) { + throw new IllegalStateException("initialize: " + + "Couldn't initialize aggregatorWriter", e); + } + } + + /** + * Close {@link AggregatorWriter} + * + * @throws IOException + */ + public void close() throws IOException { + aggregatorWriter.close(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(aggregatorMap.size()); + for (Map.Entry> entry : + aggregatorMap.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue().getAggregatorClass().getName()); + out.writeBoolean(entry.getValue().isPersistent()); + entry.getValue().getPreviousAggregatedValue().write(out); + progressable.progress(); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + aggregatorMap.clear(); + int numAggregators = in.readInt(); + try { + for (int i = 0; i < numAggregators; i++) { + String aggregatorName = in.readUTF(); + String aggregatorClassName = in.readUTF(); + boolean isPersistent = in.readBoolean(); + AggregatorWrapper aggregator = registerAggregator( + aggregatorName, + AggregatorUtils.getAggregatorClass(aggregatorClassName), + isPersistent); + Writable value = aggregator.createInitialValue(); + value.readFields(in); + aggregator.setPreviousAggregatedValue(value); + progressable.progress(); + } + } catch (InstantiationException e) { + throw new IllegalStateException("readFields: " + + "InstantiationException occurred", e); + } catch (IllegalAccessException e) { + throw new IllegalStateException("readFields: " + + "IllegalAccessException occurred", 
e); + } + } +}