Return-Path: X-Original-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 586089E70 for ; Thu, 16 Feb 2012 22:13:48 +0000 (UTC) Received: (qmail 3889 invoked by uid 500); 16 Feb 2012 22:13:48 -0000 Delivered-To: apmail-incubator-giraph-commits-archive@incubator.apache.org Received: (qmail 3866 invoked by uid 500); 16 Feb 2012 22:13:48 -0000 Mailing-List: contact giraph-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: giraph-dev@incubator.apache.org Delivered-To: mailing list giraph-commits@incubator.apache.org Received: (qmail 3859 invoked by uid 99); 16 Feb 2012 22:13:48 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 22:13:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 22:13:36 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 87C102388ACC; Thu, 16 Feb 2012 22:12:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1245205 [12/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example... Date: Thu, 16 Feb 2012 22:12:36 -0000 To: giraph-commits@incubator.apache.org From: aching@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120216221244.87C102388ACC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Feb 16 22:12:31 2012 @@ -48,598 +48,614 @@ import java.util.List; * This mapper that will execute the BSP graph tasks. Since this mapper will * not be passing data by key-value pairs through the MR framework, the * types are irrelevant. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data */ @SuppressWarnings("rawtypes") public class GraphMapper extends - Mapper { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(GraphMapper.class); - /** Coordination service worker */ - CentralizedServiceWorker serviceWorker; - /** Coordination service master thread */ - Thread masterThread = null; - /** The map should be run exactly once, or else there is a problem. */ - boolean mapAlreadyRun = false; - /** Manages the ZooKeeper servers if necessary (dynamic startup) */ - private ZooKeeperManager zkManager; - /** Configuration */ - private Configuration conf; - /** Already complete? */ - private boolean done = false; - /** What kind of functions is this mapper doing? */ - private MapFunctions mapFunctions = MapFunctions.UNKNOWN; - /** - * Graph state for all vertices that is used for the duration of - * this mapper. - */ - private GraphState graphState = new GraphState(); - - /** What kinds of functions to run on this mapper */ - public enum MapFunctions { - UNKNOWN, - MASTER_ONLY, - MASTER_ZOOKEEPER_ONLY, - WORKER_ONLY, - ALL, - ALL_EXCEPT_ZOOKEEPER - } - - /** - * Get the map function enum - */ - public MapFunctions getMapFunctions() { - return mapFunctions; - } - - /** - * Get the aggregator usage, a subset of the functionality - * - * @return Aggregator usage interface - */ - public final AggregatorUsage getAggregatorUsage() { - return serviceWorker; - } - - public final WorkerContext getWorkerContext() { - return serviceWorker.getWorkerContext(); - } - - public final GraphState getGraphState() { - return graphState; - } - - /** - * Default handler for uncaught exceptions. - */ - class OverrideExceptionHandler - implements Thread.UncaughtExceptionHandler { - public void uncaughtException(Thread t, Throwable e) { - LOG.fatal( - "uncaughtException: OverrideExceptionHandler on thread " + - t.getName() + ", msg = " + e.getMessage() + - ", exiting...", e); - System.exit(1); - } - } - - /** - * Copied from JobConf to get the location of this jar. Workaround for - * things like Oozie map-reduce jobs. - * - * @param my_class Class to search the class loader path for to locate - * the relevant jar file - * @return Location of the jar file containing my_class - */ - private static String findContainingJar(Class my_class) { - ClassLoader loader = my_class.getClassLoader(); - String class_file = - my_class.getName().replaceAll("\\.", "/") + ".class"; - try { - for(Enumeration itr = loader.getResources(class_file); - itr.hasMoreElements();) { - URL url = (URL) itr.nextElement(); - if ("jar".equals(url.getProtocol())) { - String toReturn = url.getPath(); - if (toReturn.startsWith("file:")) { - toReturn = toReturn.substring("file:".length()); - } - toReturn = URLDecoder.decode(toReturn, "UTF-8"); - return toReturn.replaceAll("!.*$", ""); - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return null; - } - - /** - * Make sure that all registered classes have matching types. This - * is a little tricky due to type erasure, cannot simply get them from - * the class type arguments. Also, set the vertex index, vertex value, - * edge value and message value classes. - * - * @param conf Configuration to get the various classes - */ - public void determineClassTypes(Configuration conf) { - Class> vertexClass = - BspUtils.getVertexClass(conf); - List> classList = ReflectionUtils.getTypeArguments( - BasicVertex.class, vertexClass); - Type vertexIndexType = classList.get(0); - Type vertexValueType = classList.get(1); - Type edgeValueType = classList.get(2); - Type messageValueType = classList.get(3); - - Class> vertexInputFormatClass = - BspUtils.getVertexInputFormatClass(conf); - classList = ReflectionUtils.getTypeArguments( - VertexInputFormat.class, vertexInputFormatClass); - if (classList.get(0) == null) { - LOG.warn("Input format vertex index type is not known"); - } else if (!vertexIndexType.equals(classList.get(0))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex index types don't match, " + + E extends Writable, M extends Writable> extends + Mapper { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(GraphMapper.class); + /** Coordination service worker */ + private CentralizedServiceWorker serviceWorker; + /** Coordination service master thread */ + private Thread masterThread = null; + /** The map should be run exactly once, or else there is a problem. */ + private boolean mapAlreadyRun = false; + /** Manages the ZooKeeper servers if necessary (dynamic startup) */ + private ZooKeeperManager zkManager; + /** Configuration */ + private Configuration conf; + /** Already complete? */ + private boolean done = false; + /** What kind of functions is this mapper doing? */ + private MapFunctions mapFunctions = MapFunctions.UNKNOWN; + /** + * Graph state for all vertices that is used for the duration of + * this mapper. + */ + private GraphState graphState = new GraphState(); + + /** What kinds of functions to run on this mapper */ + public enum MapFunctions { + /** Undecided yet */ + UNKNOWN, + /** Only be the master */ + MASTER_ONLY, + /** Only be the master and ZooKeeper */ + MASTER_ZOOKEEPER_ONLY, + /** Only be the worker */ + WORKER_ONLY, + /** Do master, worker, and ZooKeeper */ + ALL, + /** Do master and worker */ + ALL_EXCEPT_ZOOKEEPER + } + + /** + * Get the map function enum. + * + * @return Map functions of this mapper. + */ + public MapFunctions getMapFunctions() { + return mapFunctions; + } + + /** + * Get the aggregator usage, a subset of the functionality + * + * @return Aggregator usage interface + */ + public final AggregatorUsage getAggregatorUsage() { + return serviceWorker; + } + + public final WorkerContext getWorkerContext() { + return serviceWorker.getWorkerContext(); + } + + public final GraphState getGraphState() { + return graphState; + } + + /** + * Default handler for uncaught exceptions. + */ + class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.fatal( + "uncaughtException: OverrideExceptionHandler on thread " + + t.getName() + ", msg = " + e.getMessage() + + ", exiting...", e); + System.exit(1); + } + } + + /** + * Copied from JobConf to get the location of this jar. Workaround for + * things like Oozie map-reduce jobs. + * + * @param myClass Class to search the class loader path for to locate + * the relevant jar file + * @return Location of the jar file containing myClass + */ + private static String findContainingJar(Class myClass) { + ClassLoader loader = myClass.getClassLoader(); + String classFile = + myClass.getName().replaceAll("\\.", "/") + ".class"; + try { + for (Enumeration itr = loader.getResources(classFile); + itr.hasMoreElements();) { + URL url = (URL) itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } + + /** + * Make sure that all registered classes have matching types. This + * is a little tricky due to type erasure, cannot simply get them from + * the class type arguments. Also, set the vertex index, vertex value, + * edge value and message value classes. + * + * @param conf Configuration to get the various classes + */ + public void determineClassTypes(Configuration conf) { + Class> vertexClass = + BspUtils.getVertexClass(conf); + List> classList = ReflectionUtils.getTypeArguments( + BasicVertex.class, vertexClass); + Type vertexIndexType = classList.get(0); + Type vertexValueType = classList.get(1); + Type edgeValueType = classList.get(2); + Type messageValueType = classList.get(3); + + Class> vertexInputFormatClass = + BspUtils.getVertexInputFormatClass(conf); + classList = ReflectionUtils.getTypeArguments( + VertexInputFormat.class, vertexInputFormatClass); + if (classList.get(0) == null) { + LOG.warn("Input format vertex index type is not known"); + } else if (!vertexIndexType.equals(classList.get(0))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex index types don't match, " + + "vertex - " + vertexIndexType + + ", vertex input format - " + classList.get(0)); + } + if (classList.get(1) == null) { + LOG.warn("Input format vertex value type is not known"); + } else if (!vertexValueType.equals(classList.get(1))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex value types don't match, " + + "vertex - " + vertexValueType + + ", vertex input format - " + classList.get(1)); + } + if (classList.get(2) == null) { + LOG.warn("Input format edge value type is not known"); + } else if (!edgeValueType.equals(classList.get(2))) { + throw new IllegalArgumentException( + "checkClassTypes: Edge value types don't match, " + + "vertex - " + edgeValueType + + ", vertex input format - " + classList.get(2)); + } + // If has vertex combiner class, check + Class> vertexCombinerClass = + BspUtils.getVertexCombinerClass(conf); + if (vertexCombinerClass != null) { + classList = ReflectionUtils.getTypeArguments( + VertexCombiner.class, vertexCombinerClass); + if (!vertexIndexType.equals(classList.get(0))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex index types don't match, " + "vertex - " + vertexIndexType + - ", vertex input format - " + classList.get(0)); - } - if (classList.get(1) == null) { - LOG.warn("Input format vertex value type is not known"); - } else if (!vertexValueType.equals(classList.get(1))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex value types don't match, " + + ", vertex combiner - " + classList.get(0)); + } + if (!messageValueType.equals(classList.get(1))) { + throw new IllegalArgumentException( + "checkClassTypes: Message value types don't match, " + "vertex - " + vertexValueType + - ", vertex input format - " + classList.get(1)); - } - if (classList.get(2) == null) { - LOG.warn("Input format edge value type is not known"); - } else if (!edgeValueType.equals(classList.get(2))) { - throw new IllegalArgumentException( - "checkClassTypes: Edge value types don't match, " + - "vertex - " + edgeValueType + - ", vertex input format - " + classList.get(2)); - } - // If has vertex combiner class, check - Class> vertexCombinerClass = - BspUtils.getVertexCombinerClass(conf); - if (vertexCombinerClass != null) { - classList = ReflectionUtils.getTypeArguments( - VertexCombiner.class, vertexCombinerClass); - if (!vertexIndexType.equals(classList.get(0))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex index types don't match, " + - "vertex - " + vertexIndexType + - ", vertex combiner - " + classList.get(0)); - } - if (!messageValueType.equals(classList.get(1))) { - throw new IllegalArgumentException( - "checkClassTypes: Message value types don't match, " + - "vertex - " + vertexValueType + - ", vertex combiner - " + classList.get(1)); - } - } - // If has vertex output format class, check - Class> - vertexOutputFormatClass = - BspUtils.getVertexOutputFormatClass(conf); - if (vertexOutputFormatClass != null) { - classList = - ReflectionUtils.getTypeArguments( - VertexOutputFormat.class, vertexOutputFormatClass); - if (classList.get(0) == null) { - LOG.warn("Output format vertex index type is not known"); - } else if (!vertexIndexType.equals(classList.get(0))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex index types don't match, " + - "vertex - " + vertexIndexType + - ", vertex output format - " + classList.get(0)); - } - if (classList.get(1) == null) { - LOG.warn("Output format vertex value type is not known"); - } else if (!vertexValueType.equals(classList.get(1))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex value types don't match, " + - "vertex - " + vertexValueType + - ", vertex output format - " + classList.get(1)); - } if (classList.get(2) == null) { - LOG.warn("Output format edge value type is not known"); - } else if (!edgeValueType.equals(classList.get(2))) { - throw new IllegalArgumentException( - "checkClassTypes: Edge value types don't match, " + - "vertex - " + vertexIndexType + - ", vertex output format - " + classList.get(2)); - } - } - // Vertex resolver might never select the types - Class> - vertexResolverClass = - BspUtils.getVertexResolverClass(conf); - classList = ReflectionUtils.getTypeArguments( - VertexResolver.class, vertexResolverClass); - if (classList.get(0) != null && - !vertexIndexType.equals(classList.get(0))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex index types don't match, " + + ", vertex combiner - " + classList.get(1)); + } + } + // If has vertex output format class, check + Class> + vertexOutputFormatClass = + BspUtils.getVertexOutputFormatClass(conf); + if (vertexOutputFormatClass != null) { + classList = + ReflectionUtils.getTypeArguments( + VertexOutputFormat.class, vertexOutputFormatClass); + if (classList.get(0) == null) { + LOG.warn("Output format vertex index type is not known"); + } else if (!vertexIndexType.equals(classList.get(0))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex index types don't match, " + "vertex - " + vertexIndexType + - ", vertex resolver - " + classList.get(0)); - } - if (classList.get(1) != null && - !vertexValueType.equals(classList.get(1))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex value types don't match, " + + ", vertex output format - " + classList.get(0)); + } + if (classList.get(1) == null) { + LOG.warn("Output format vertex value type is not known"); + } else if (!vertexValueType.equals(classList.get(1))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex value types don't match, " + "vertex - " + vertexValueType + - ", vertex resolver - " + classList.get(1)); - } - if (classList.get(2) != null && - !edgeValueType.equals(classList.get(2))) { - throw new IllegalArgumentException( - "checkClassTypes: Edge value types don't match, " + - "vertex - " + edgeValueType + - ", vertex resolver - " + classList.get(2)); - } - if (classList.get(3) != null && - !messageValueType.equals(classList.get(3))) { - throw new IllegalArgumentException( - "checkClassTypes: Message value types don't match, " + - "vertex - " + edgeValueType + - ", vertex resolver - " + classList.get(3)); - } - conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, - (Class) vertexIndexType, - WritableComparable.class); - conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, - (Class) vertexValueType, - Writable.class); - conf.setClass(GiraphJob.EDGE_VALUE_CLASS, - (Class) edgeValueType, - Writable.class); - conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS, - (Class) messageValueType, - Writable.class); - } - - /** - * Figure out what functions this mapper should do. Basic logic is as - * follows: - * 1) If not split master, everyone does the everything and/or running - * ZooKeeper. - * 2) If split master/worker, masters also run ZooKeeper (if it's not - * given to us). - * - * @param conf Configuration to use - * @return Functions that this mapper should do. - */ - private static MapFunctions determineMapFunctions( - Configuration conf, - ZooKeeperManager zkManager) { - boolean splitMasterWorker = - conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER, - GiraphJob.SPLIT_MASTER_WORKER_DEFAULT); - int taskPartition = conf.getInt("mapred.task.partition", -1); - boolean zkAlreadyProvided = - conf.get(GiraphJob.ZOOKEEPER_LIST) != null; - MapFunctions functions = MapFunctions.UNKNOWN; - // What functions should this mapper do? - if (!splitMasterWorker) { - if ((zkManager != null) && zkManager.runsZooKeeper()) { - functions = MapFunctions.ALL; - } else { - functions = MapFunctions.ALL_EXCEPT_ZOOKEEPER; - } + ", vertex output format - " + classList.get(1)); + } + if (classList.get(2) == null) { + LOG.warn("Output format edge value type is not known"); + } else if (!edgeValueType.equals(classList.get(2))) { + throw new IllegalArgumentException( + "checkClassTypes: Edge value types don't match, " + + "vertex - " + vertexIndexType + + ", vertex output format - " + classList.get(2)); + } + } + // Vertex resolver might never select the types + Class> + vertexResolverClass = + BspUtils.getVertexResolverClass(conf); + classList = ReflectionUtils.getTypeArguments( + VertexResolver.class, vertexResolverClass); + if (classList.get(0) != null && + !vertexIndexType.equals(classList.get(0))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex index types don't match, " + + "vertex - " + vertexIndexType + + ", vertex resolver - " + classList.get(0)); + } + if (classList.get(1) != null && + !vertexValueType.equals(classList.get(1))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex value types don't match, " + + "vertex - " + vertexValueType + + ", vertex resolver - " + classList.get(1)); + } + if (classList.get(2) != null && + !edgeValueType.equals(classList.get(2))) { + throw new IllegalArgumentException( + "checkClassTypes: Edge value types don't match, " + + "vertex - " + edgeValueType + + ", vertex resolver - " + classList.get(2)); + } + if (classList.get(3) != null && + !messageValueType.equals(classList.get(3))) { + throw new IllegalArgumentException( + "checkClassTypes: Message value types don't match, " + + "vertex - " + edgeValueType + + ", vertex resolver - " + classList.get(3)); + } + conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, + (Class) vertexIndexType, + WritableComparable.class); + conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, + (Class) vertexValueType, + Writable.class); + conf.setClass(GiraphJob.EDGE_VALUE_CLASS, + (Class) edgeValueType, + Writable.class); + conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS, + (Class) messageValueType, + Writable.class); + } + + /** + * Figure out what functions this mapper should do. Basic logic is as + * follows: + * 1) If not split master, everyone does the everything and/or running + * ZooKeeper. + * 2) If split master/worker, masters also run ZooKeeper (if it's not + * given to us). + * + * @param conf Configuration to use + * @param zkManager ZooKeeper manager to help determine whether to run + * ZooKeeper + * @return Functions that this mapper should do. + */ + private static MapFunctions determineMapFunctions( + Configuration conf, + ZooKeeperManager zkManager) { + boolean splitMasterWorker = + conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER, + GiraphJob.SPLIT_MASTER_WORKER_DEFAULT); + int taskPartition = conf.getInt("mapred.task.partition", -1); + boolean zkAlreadyProvided = + conf.get(GiraphJob.ZOOKEEPER_LIST) != null; + MapFunctions functions = MapFunctions.UNKNOWN; + // What functions should this mapper do? + if (!splitMasterWorker) { + if ((zkManager != null) && zkManager.runsZooKeeper()) { + functions = MapFunctions.ALL; + } else { + functions = MapFunctions.ALL_EXCEPT_ZOOKEEPER; + } + } else { + if (zkAlreadyProvided) { + int masterCount = + conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT, + GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT); + if (taskPartition < masterCount) { + functions = MapFunctions.MASTER_ONLY; } else { - if (zkAlreadyProvided) { - int masterCount = - conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT, - GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT); - if (taskPartition < masterCount) { - functions = MapFunctions.MASTER_ONLY; - } else { - functions = MapFunctions.WORKER_ONLY; - } - } else { - if ((zkManager != null) && zkManager.runsZooKeeper()) { - functions = MapFunctions.MASTER_ZOOKEEPER_ONLY; - } else { - functions = MapFunctions.WORKER_ONLY; - } - } + functions = MapFunctions.WORKER_ONLY; } - return functions; - } - - @Override - public void setup(Context context) - throws IOException, InterruptedException { - context.setStatus("setup: Beginning mapper setup."); - graphState.setContext(context); - // Setting the default handler for uncaught exceptions. - Thread.setDefaultUncaughtExceptionHandler( - new OverrideExceptionHandler()); - conf = context.getConfiguration(); - // Hadoop security needs this property to be set - if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { - conf.set("mapreduce.job.credentials.binary", - System.getenv("HADOOP_TOKEN_FILE_LOCATION")); - } - // Ensure the user classes have matching types and figure them out - determineClassTypes(conf); - - // Do some initial setup (possibly starting up a Zookeeper service) - context.setStatus("setup: Initializing Zookeeper services."); - if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE, - GiraphJob.LOCAL_TEST_MODE_DEFAULT)) { - Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf); - String zkClasspath = null; - if(fileClassPaths == null) { - if(LOG.isInfoEnabled()) { - LOG.info("Distributed cache is empty. Assuming fatjar."); - } - String jarFile = context.getJar(); - if (jarFile == null) { - jarFile = findContainingJar(getClass()); - } - zkClasspath = jarFile.replaceFirst("file:", ""); - } else { - StringBuilder sb = new StringBuilder(); - sb.append(fileClassPaths[0]); - - for (int i = 1; i < fileClassPaths.length; i++) { - sb.append(":"); - sb.append(fileClassPaths[i]); - } - zkClasspath = sb.toString(); - } - - if (LOG.isInfoEnabled()) { - LOG.info("setup: classpath @ " + zkClasspath); - } - conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath); - } - String serverPortList = - conf.get(GiraphJob.ZOOKEEPER_LIST, ""); - if (serverPortList == "") { - zkManager = new ZooKeeperManager(context); - context.setStatus("setup: Setting up Zookeeper manager."); - zkManager.setup(); - if (zkManager.computationDone()) { - done = true; - return; - } - zkManager.onlineZooKeeperServers(); - serverPortList = zkManager.getZooKeeperServerPortString(); - } - context.setStatus("setup: Connected to Zookeeper service " + - serverPortList); - this.mapFunctions = determineMapFunctions(conf, zkManager); - - // Sometimes it takes a while to get multiple ZooKeeper servers up - if (conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT, - GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT) > 1) { - Thread.sleep(GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT * - GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME); - } - int sessionMsecTimeout = - conf.getInt(GiraphJob.ZOOKEEPER_SESSION_TIMEOUT, - GiraphJob.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT); - try { - if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) || - (mapFunctions == MapFunctions.MASTER_ONLY) || - (mapFunctions == MapFunctions.ALL) || - (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { - if (LOG.isInfoEnabled()) { - LOG.info("setup: Starting up BspServiceMaster " + - "(master thread)..."); - } - masterThread = - new MasterThread( - new BspServiceMaster(serverPortList, - sessionMsecTimeout, - context, - this), - context); - masterThread.start(); - } - if ((mapFunctions == MapFunctions.WORKER_ONLY) || - (mapFunctions == MapFunctions.ALL) || - (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { - if (LOG.isInfoEnabled()) { - LOG.info("setup: Starting up BspServiceWorker..."); - } - serviceWorker = new BspServiceWorker( - serverPortList, - sessionMsecTimeout, - context, - this, - graphState); - if (LOG.isInfoEnabled()) { - LOG.info("setup: Registering health of this worker..."); - } - serviceWorker.setup(); - } - } catch (Exception e) { - LOG.error("setup: Caught exception just before end of setup", e); - if (zkManager != null ) { - zkManager.offlineZooKeeperServers( - ZooKeeperManager.State.FAILED); - } - throw new RuntimeException( - "setup: Offlining servers due to exception...", e); + } else { + if ((zkManager != null) && zkManager.runsZooKeeper()) { + functions = MapFunctions.MASTER_ZOOKEEPER_ONLY; + } else { + functions = MapFunctions.WORKER_ONLY; } - context.setStatus(getMapFunctions().toString() + " starting..."); + } } + return functions; + } - @Override - public void map(Object key, Object value, Context context) - throws IOException, InterruptedException { - // map() only does computation - // 1) Run checkpoint per frequency policy. - // 2) For every vertex on this mapper, run the compute() function - // 3) Wait until all messaging is done. - // 4) Check if all vertices are done. If not goto 2). - // 5) Dump output. - if (done == true) { - return; - } - if ((serviceWorker != null) && (graphState.getNumVertices() == 0)) { - return; - } - - if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) || - (mapFunctions == MapFunctions.MASTER_ONLY)) { - if (LOG.isInfoEnabled()) { - LOG.info("map: No need to do anything when not a worker"); - } - return; - } - - if (mapAlreadyRun) { - throw new RuntimeException("In BSP, map should have only been" + - " run exactly once, (already run)"); - } - mapAlreadyRun = true; - - graphState.setSuperstep(serviceWorker.getSuperstep()). - setContext(context).setGraphMapper(this); - - try { - serviceWorker.getWorkerContext().preApplication(); - } catch (InstantiationException e) { - LOG.fatal("map: preApplication failed in instantiation", e); - throw new RuntimeException( - "map: preApplication failed in instantiation", e); - } catch (IllegalAccessException e) { - LOG.fatal("map: preApplication failed in access", e); - throw new RuntimeException( - "map: preApplication failed in access",e ); - } - context.progress(); - - List partitionStatsList = - new ArrayList(); - do { - long superstep = serviceWorker.getSuperstep(); - - graphState.setSuperstep(superstep); - - Collection masterAssignedPartitionOwners = - serviceWorker.startSuperstep(); - if (zkManager != null && zkManager.runsZooKeeper()) { - if (LOG.isInfoEnabled()) { - LOG.info("map: Chosen to run ZooKeeper..."); - } - context.setStatus("map: Running Zookeeper Server"); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats()); - } - context.progress(); - - serviceWorker.exchangeVertexPartitions( - masterAssignedPartitionOwners); - context.progress(); - - // Might need to restart from another superstep - // (manually or automatic), or store a checkpoint - if (serviceWorker.getRestartedSuperstep() == superstep) { - if (LOG.isInfoEnabled()) { - LOG.info("map: Loading from checkpoint " + superstep); - } - serviceWorker.loadCheckpoint( - serviceWorker.getRestartedSuperstep()); - } else if (serviceWorker.checkpointFrequencyMet(superstep)) { - serviceWorker.storeCheckpoint(); - } - - serviceWorker.getWorkerContext().setGraphState(graphState); - serviceWorker.getWorkerContext().preSuperstep(); - context.progress(); - - partitionStatsList.clear(); - for (Partition partition : - serviceWorker.getPartitionMap().values()) { - PartitionStats partitionStats = - new PartitionStats(partition.getPartitionId(), 0, 0, 0); - for (BasicVertex basicVertex : - partition.getVertices()) { - // Make sure every vertex has the current - // graphState before computing - basicVertex.setGraphState(graphState); - if (basicVertex.isHalted() - && !Iterables.isEmpty(basicVertex.getMessages())) { - basicVertex.halt = false; - } - if (!basicVertex.isHalted()) { - Iterator vertexMsgIt = - basicVertex.getMessages().iterator(); - context.progress(); - basicVertex.compute(vertexMsgIt); - basicVertex.releaseResources(); - } - if (basicVertex.isHalted()) { - partitionStats.incrFinishedVertexCount(); - } - partitionStats.incrVertexCount(); - partitionStats.addEdgeCount(basicVertex.getNumOutEdges()); - } - partitionStatsList.add(partitionStats); - } - } while (!serviceWorker.finishSuperstep(partitionStatsList)); + @Override + public void setup(Context context) + throws IOException, InterruptedException { + context.setStatus("setup: Beginning mapper setup."); + graphState.setContext(context); + // Setting the default handler for uncaught exceptions. + Thread.setDefaultUncaughtExceptionHandler( + new OverrideExceptionHandler()); + conf = context.getConfiguration(); + // Hadoop security needs this property to be set + if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { + conf.set("mapreduce.job.credentials.binary", + System.getenv("HADOOP_TOKEN_FILE_LOCATION")); + } + // Ensure the user classes have matching types and figure them out + determineClassTypes(conf); + + // Do some initial setup (possibly starting up a Zookeeper service) + context.setStatus("setup: Initializing Zookeeper services."); + if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE, + GiraphJob.LOCAL_TEST_MODE_DEFAULT)) { + Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf); + String zkClasspath = null; + if (fileClassPaths == null) { if (LOG.isInfoEnabled()) { - LOG.info("map: BSP application done " + - "(global vertices marked done)"); + LOG.info("Distributed cache is empty. Assuming fatjar."); } - - serviceWorker.getWorkerContext().postApplication(); - context.progress(); - } - - @Override - public void cleanup(Context context) - throws IOException, InterruptedException { + String jarFile = context.getJar(); + if (jarFile == null) { + jarFile = findContainingJar(getClass()); + } + zkClasspath = jarFile.replaceFirst("file:", ""); + } else { + StringBuilder sb = new StringBuilder(); + sb.append(fileClassPaths[0]); + + for (int i = 1; i < fileClassPaths.length; i++) { + sb.append(":"); + sb.append(fileClassPaths[i]); + } + zkClasspath = sb.toString(); + } + + if (LOG.isInfoEnabled()) { + LOG.info("setup: classpath @ " + zkClasspath); + } + conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath); + } + String serverPortList = + conf.get(GiraphJob.ZOOKEEPER_LIST, ""); + if (serverPortList.isEmpty()) { + zkManager = new ZooKeeperManager(context); + context.setStatus("setup: Setting up Zookeeper manager."); + zkManager.setup(); + if (zkManager.computationDone()) { + done = true; + return; + } + zkManager.onlineZooKeeperServers(); + serverPortList = zkManager.getZooKeeperServerPortString(); + } + context.setStatus("setup: Connected to Zookeeper service " + + serverPortList); + this.mapFunctions = determineMapFunctions(conf, zkManager); + + // Sometimes it takes a while to get multiple ZooKeeper servers up + if (conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT, + GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT) > 1) { + Thread.sleep(GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT * + GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME); + } + int sessionMsecTimeout = + conf.getInt(GiraphJob.ZOOKEEPER_SESSION_TIMEOUT, + GiraphJob.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT); + try { + if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) || + (mapFunctions == MapFunctions.MASTER_ONLY) || + (mapFunctions == MapFunctions.ALL) || + (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { if (LOG.isInfoEnabled()) { - LOG.info("cleanup: Starting for " + getMapFunctions()); + LOG.info("setup: Starting up BspServiceMaster " + + "(master thread)..."); } - if (done) { - return; + masterThread = + new MasterThread( + new BspServiceMaster(serverPortList, + sessionMsecTimeout, + context, + this), + context); + masterThread.start(); + } + if ((mapFunctions == MapFunctions.WORKER_ONLY) || + (mapFunctions == MapFunctions.ALL) || + (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { + if (LOG.isInfoEnabled()) { + LOG.info("setup: Starting up BspServiceWorker..."); } - - if (serviceWorker != null) { - serviceWorker.cleanup(); + serviceWorker = new BspServiceWorker( + serverPortList, + sessionMsecTimeout, + context, + this, + graphState); + if (LOG.isInfoEnabled()) { + LOG.info("setup: Registering health of this worker..."); } - try { - if (masterThread != null) { - masterThread.join(); - } - } catch (InterruptedException e) { - // cleanup phase -- just log the error - LOG.error("cleanup: Master thread couldn't join"); - } - if (zkManager != null) { - zkManager.offlineZooKeeperServers( - ZooKeeperManager.State.FINISHED); + serviceWorker.setup(); + } + } catch (IOException e) { + LOG.error("setup: Caught exception just before end of setup", e); + if (zkManager != null) { + zkManager.offlineZooKeeperServers( + ZooKeeperManager.State.FAILED); + } + throw new RuntimeException( + "setup: Offlining servers due to exception...", e); + } + context.setStatus(getMapFunctions().toString() + " starting..."); + } + + @Override + public void map(Object key, Object value, Context context) + throws IOException, InterruptedException { + // map() only does computation + // 1) Run checkpoint per frequency policy. + // 2) For every vertex on this mapper, run the compute() function + // 3) Wait until all messaging is done. + // 4) Check if all vertices are done. If not goto 2). + // 5) Dump output. + if (done) { + return; + } + if ((serviceWorker != null) && (graphState.getNumVertices() == 0)) { + return; + } + + if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) || + (mapFunctions == MapFunctions.MASTER_ONLY)) { + if (LOG.isInfoEnabled()) { + LOG.info("map: No need to do anything when not a worker"); + } + return; + } + + if (mapAlreadyRun) { + throw new RuntimeException("In BSP, map should have only been" + + " run exactly once, (already run)"); + } + mapAlreadyRun = true; + + graphState.setSuperstep(serviceWorker.getSuperstep()). + setContext(context).setGraphMapper(this); + + try { + serviceWorker.getWorkerContext().preApplication(); + } catch (InstantiationException e) { + LOG.fatal("map: preApplication failed in instantiation", e); + throw new RuntimeException( + "map: preApplication failed in instantiation", e); + } catch (IllegalAccessException e) { + LOG.fatal("map: preApplication failed in access", e); + throw new RuntimeException( + "map: preApplication failed in access", e); + } + context.progress(); + + List partitionStatsList = + new ArrayList(); + do { + long superstep = serviceWorker.getSuperstep(); + + graphState.setSuperstep(superstep); + + Collection masterAssignedPartitionOwners = + serviceWorker.startSuperstep(); + if (zkManager != null && zkManager.runsZooKeeper()) { + if (LOG.isInfoEnabled()) { + LOG.info("map: Chosen to run ZooKeeper..."); } - } + context.setStatus("map: Running Zookeeper Server"); + } - @Override - public void run(Context context) throws IOException, InterruptedException { - // Notify the master quicker if there is worker failure rather than - // waiting for ZooKeeper to timeout and delete the ephemeral znodes - try { - setup(context); - while (context.nextKeyValue()) { - map(context.getCurrentKey(), - context.getCurrentValue(), - context); - } - cleanup(context); - } catch (Exception e) { - if (mapFunctions == MapFunctions.WORKER_ONLY) { - serviceWorker.failureCleanup(); - } - throw new IllegalStateException( - "run: Caught an unrecoverable exception " + e.getMessage(), e); + if (LOG.isDebugEnabled()) { + LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats()); + } + context.progress(); + + serviceWorker.exchangeVertexPartitions( + masterAssignedPartitionOwners); + context.progress(); + + // Might need to restart from another superstep + // (manually or automatic), or store a checkpoint + if (serviceWorker.getRestartedSuperstep() == superstep) { + if (LOG.isInfoEnabled()) { + LOG.info("map: Loading from checkpoint " + superstep); } + serviceWorker.loadCheckpoint( + serviceWorker.getRestartedSuperstep()); + } else if (serviceWorker.checkpointFrequencyMet(superstep)) { + serviceWorker.storeCheckpoint(); + } + + serviceWorker.getWorkerContext().setGraphState(graphState); + serviceWorker.getWorkerContext().preSuperstep(); + context.progress(); + + partitionStatsList.clear(); + for (Partition partition : + serviceWorker.getPartitionMap().values()) { + PartitionStats partitionStats = + new PartitionStats(partition.getPartitionId(), 0, 0, 0); + for (BasicVertex basicVertex : + partition.getVertices()) { + // Make sure every vertex has the current + // graphState before computing + basicVertex.setGraphState(graphState); + if (basicVertex.isHalted() & + !Iterables.isEmpty(basicVertex.getMessages())) { + basicVertex.halt = false; + } + if (!basicVertex.isHalted()) { + Iterator vertexMsgIt = + basicVertex.getMessages().iterator(); + context.progress(); + basicVertex.compute(vertexMsgIt); + basicVertex.releaseResources(); + } + if (basicVertex.isHalted()) { + partitionStats.incrFinishedVertexCount(); + } + partitionStats.incrVertexCount(); + partitionStats.addEdgeCount(basicVertex.getNumOutEdges()); + } + partitionStatsList.add(partitionStats); + } + } while (!serviceWorker.finishSuperstep(partitionStatsList)); + if (LOG.isInfoEnabled()) { + LOG.info("map: BSP application done " + + "(global vertices marked done)"); + } + + serviceWorker.getWorkerContext().postApplication(); + context.progress(); + } + + @Override + public void cleanup(Context context) + throws IOException, InterruptedException { + if (LOG.isInfoEnabled()) { + LOG.info("cleanup: Starting for " + getMapFunctions()); + } + if (done) { + return; + } + + if (serviceWorker != null) { + serviceWorker.cleanup(); + } + try { + if (masterThread != null) { + masterThread.join(); + } + } catch (InterruptedException e) { + // cleanup phase -- just log the error + LOG.error("cleanup: Master thread couldn't join"); + } + if (zkManager != null) { + zkManager.offlineZooKeeperServers( + ZooKeeperManager.State.FINISHED); + } + } + + @Override + public void run(Context context) throws IOException, InterruptedException { + // Notify the master quicker if there is worker failure rather than + // waiting for ZooKeeper to timeout and delete the ephemeral znodes + try { + setup(context); + while (context.nextKeyValue()) { + map(context.getCurrentKey(), + context.getCurrentValue(), + context); + } + cleanup(context); + } catch (IOException e) { + if (mapFunctions == MapFunctions.WORKER_ONLY) { + serviceWorker.failureCleanup(); + } + throw new IllegalStateException( + "run: Caught an unrecoverable exception " + e.getMessage(), e); } + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Thu Feb 16 22:12:31 2012 @@ -22,84 +22,120 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -/* +/** * Global state of the graph. Should be treated as a singleton (but is kept * as a regular bean to facilitate ease of unit testing) * - * @param vertex id - * @param vertex data - * @param edge data - * @param message data + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data */ @SuppressWarnings("rawtypes") public class GraphState { - /** Graph-wide superstep */ - private long superstep = 0; - /** Graph-wide number of vertices */ - private long numVertices = -1; - /** Graph-wide number of edges */ - private long numEdges = -1; - /** Graph-wide map context */ - private Mapper.Context context; - /** Graph-wide BSP Mapper for this Vertex */ - private GraphMapper graphMapper; - /** Graph-wide worker communications */ - private WorkerCommunications workerCommunications; - - public long getSuperstep() { - return superstep; - } - - public GraphState setSuperstep(long superstep) { - this.superstep = superstep; - return this; - } - - public long getNumVertices() { - return numVertices; - } - - public GraphState setNumVertices(long numVertices) { - this.numVertices = numVertices; - return this; - } - - public long getNumEdges() { - return numEdges; - } - - public GraphState setNumEdges(long numEdges) { - this.numEdges = numEdges; - return this; - } - - public Mapper.Context getContext() { - return context; - } - - public GraphState setContext(Mapper.Context context) { - this.context = context; - return this; - } - - public GraphMapper getGraphMapper() { - return graphMapper; - } - - public GraphState setGraphMapper( - GraphMapper graphMapper) { - this.graphMapper = graphMapper; - return this; - } - - public GraphState setWorkerCommunications( - WorkerCommunications workerCommunications) { - this.workerCommunications = workerCommunications; - return this; - } - - public WorkerCommunications getWorkerCommunications() { - return workerCommunications; - } +E extends Writable, M extends Writable> { + /** Graph-wide superstep */ + private long superstep = 0; + /** Graph-wide number of vertices */ + private long numVertices = -1; + /** Graph-wide number of edges */ + private long numEdges = -1; + /** Graph-wide map context */ + private Mapper.Context context; + /** Graph-wide BSP Mapper for this Vertex */ + private GraphMapper graphMapper; + /** Graph-wide worker communications */ + private WorkerCommunications workerCommunications; + + public long getSuperstep() { + return superstep; + } + + /** + * Set the current superstep. + * + * @param superstep Current superstep to use. + * @return Returns this object. + */ + public GraphState setSuperstep(long superstep) { + this.superstep = superstep; + return this; + } + + public long getNumVertices() { + return numVertices; + } + + /** + * Set the current number of vertices. + * + * @param numVertices Current number of vertices. + * @return Returns this object. + */ + public GraphState setNumVertices(long numVertices) { + this.numVertices = numVertices; + return this; + } + + public long getNumEdges() { + return numEdges; + } + + /** + * Set the current number of edges. + * + * @param numEdges Current number of edges. + * @return Returns this object. + */ + public GraphState setNumEdges(long numEdges) { + this.numEdges = numEdges; + return this; + } + + public Mapper.Context getContext() { + return context; + } + + /** + * Set the current context. + * + * @param context Current context. + * @return Returns this object. + */ + public GraphState setContext(Mapper.Context context) { + this.context = context; + return this; + } + + public GraphMapper getGraphMapper() { + return graphMapper; + } + + /** + * Set the current graph mapper. + * + * @param graphMapper Current graph mapper. + * @return Returns this object. + */ + public GraphState setGraphMapper( + GraphMapper graphMapper) { + this.graphMapper = graphMapper; + return this; + } + + /** + * Set the current worker communications. + * + * @param workerCommunications Current worker communications. + * @return Returns this object. + */ + public GraphState setWorkerCommunications( + WorkerCommunications workerCommunications) { + this.workerCommunications = workerCommunications; + return this; + } + + public WorkerCommunications getWorkerCommunications() { + return workerCommunications; + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java Thu Feb 16 22:12:31 2012 @@ -48,196 +48,196 @@ import java.util.Map; */ @SuppressWarnings("rawtypes") public abstract class HashMapVertex - extends MutableVertex { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(HashMapVertex.class); - /** Vertex id */ - private I vertexId = null; - /** Vertex value */ - private V vertexValue = null; - /** Map of destination vertices and their edge values */ - protected final Map> destEdgeMap = - new HashMap>(); - /** List of incoming messages from the previous superstep */ - private final List msgList = Lists.newArrayList(); - - @Override - public void initialize( - I vertexId, V vertexValue, Map edges, Iterable messages) { - if (vertexId != null) { - setVertexId(vertexId); - } - if (vertexValue != null) { - setVertexValue(vertexValue); - } - if (edges != null && !edges.isEmpty()) { - for (Map.Entry entry : edges.entrySet()) { - destEdgeMap.put( - entry.getKey(), - new Edge(entry.getKey(), entry.getValue())); - } - } - if (messages != null) { - Iterables.addAll(msgList, messages); - } - } - - @Override - public final boolean addEdge(I targetVertexId, E edgeValue) { - if (destEdgeMap.put( - targetVertexId, - new Edge(targetVertexId, edgeValue)) != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("addEdge: Vertex=" + vertexId + - ": already added an edge value for dest vertex id " + - targetVertexId); - } - return false; - } else { - return true; - } - } - - @Override - public long getSuperstep() { - return getGraphState().getSuperstep(); - } - - @Override - public final void setVertexId(I vertexId) { - this.vertexId = vertexId; - } - - @Override - public final I getVertexId() { - return vertexId; - } - - @Override - public final V getVertexValue() { - return vertexValue; - } - - @Override - public final void setVertexValue(V vertexValue) { - this.vertexValue = vertexValue; - } - - @Override - public E getEdgeValue(I targetVertexId) { - Edge edge = destEdgeMap.get(targetVertexId); - return edge != null ? edge.getEdgeValue() : null; - } - - @Override - public boolean hasEdge(I targetVertexId) { - return destEdgeMap.containsKey(targetVertexId); - } - - /** - * Get an iterator to the edges on this vertex. - * - * @return A sorted iterator, as defined by the sort-order - * of the vertex ids - */ - @Override - public Iterator iterator() { - return destEdgeMap.keySet().iterator(); - } - - @Override - public int getNumOutEdges() { - return destEdgeMap.size(); - } - - @Override - public E removeEdge(I targetVertexId) { - Edge edge = destEdgeMap.remove(targetVertexId); - if (edge != null) { - return edge.getEdgeValue(); - } else { - return null; - } - } - - @Override - public final void sendMsgToAllEdges(M msg) { - if (msg == null) { - throw new IllegalArgumentException( - "sendMsgToAllEdges: Cannot send null message to all edges"); - } - for (Edge edge : destEdgeMap.values()) { - sendMsg(edge.getDestVertexId(), msg); - } - } - - @Override - final public void readFields(DataInput in) throws IOException { - vertexId = BspUtils.createVertexIndex(getConf()); - vertexId.readFields(in); - boolean hasVertexValue = in.readBoolean(); - if (hasVertexValue) { - vertexValue = BspUtils.createVertexValue(getConf()); - vertexValue.readFields(in); - } - long edgeMapSize = in.readLong(); - for (long i = 0; i < edgeMapSize; ++i) { - Edge edge = new Edge(); - edge.setConf(getConf()); - edge.readFields(in); - addEdge(edge.getDestVertexId(), edge.getEdgeValue()); - } - long msgListSize = in.readLong(); - for (long i = 0; i < msgListSize; ++i) { - M msg = BspUtils.createMessageValue(getConf()); - msg.readFields(in); - msgList.add(msg); - } - halt = in.readBoolean(); - } - - @Override - final public void write(DataOutput out) throws IOException { - vertexId.write(out); - out.writeBoolean(vertexValue != null); - if (vertexValue != null) { - vertexValue.write(out); - } - out.writeLong(destEdgeMap.size()); - for (Edge edge : destEdgeMap.values()) { - edge.write(out); - } - out.writeLong(msgList.size()); - for (M msg : msgList) { - msg.write(out); - } - out.writeBoolean(halt); - } - - @Override - void putMessages(Iterable messages) { - msgList.clear(); - for (M message : messages) { - msgList.add(message); - } - } - - @Override - public Iterable getMessages() { - return Iterables.unmodifiableIterable(msgList); - } - - @Override - void releaseResources() { - // Hint to GC to free the messages - msgList.clear(); - } - - @Override - public String toString() { - return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + - ",#edges=" + destEdgeMap.size() + ")"; - } + V extends Writable, E extends Writable, M extends Writable> + extends MutableVertex { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(HashMapVertex.class); + /** Map of destination vertices and their edge values */ + protected final Map> destEdgeMap = + new HashMap>(); + /** Vertex id */ + private I vertexId = null; + /** Vertex value */ + private V vertexValue = null; + /** List of incoming messages from the previous superstep */ + private final List msgList = Lists.newArrayList(); + + @Override + public void initialize( + I vertexId, V vertexValue, Map edges, Iterable messages) { + if (vertexId != null) { + setVertexId(vertexId); + } + if (vertexValue != null) { + setVertexValue(vertexValue); + } + if (edges != null && !edges.isEmpty()) { + for (Map.Entry entry : edges.entrySet()) { + destEdgeMap.put( + entry.getKey(), + new Edge(entry.getKey(), entry.getValue())); + } + } + if (messages != null) { + Iterables.addAll(msgList, messages); + } + } + + @Override + public final boolean addEdge(I targetVertexId, E edgeValue) { + if (destEdgeMap.put( + targetVertexId, + new Edge(targetVertexId, edgeValue)) != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("addEdge: Vertex=" + vertexId + + ": already added an edge value for dest vertex id " + + targetVertexId); + } + return false; + } else { + return true; + } + } + + @Override + public long getSuperstep() { + return getGraphState().getSuperstep(); + } + + @Override + public final void setVertexId(I vertexId) { + this.vertexId = vertexId; + } + + @Override + public final I getVertexId() { + return vertexId; + } + + @Override + public final V getVertexValue() { + return vertexValue; + } + + @Override + public final void setVertexValue(V vertexValue) { + this.vertexValue = vertexValue; + } + + @Override + public E getEdgeValue(I targetVertexId) { + Edge edge = destEdgeMap.get(targetVertexId); + return edge != null ? edge.getEdgeValue() : null; + } + + @Override + public boolean hasEdge(I targetVertexId) { + return destEdgeMap.containsKey(targetVertexId); + } + + /** + * Get an iterator to the edges on this vertex. + * + * @return A sorted iterator, as defined by the sort-order + * of the vertex ids + */ + @Override + public Iterator iterator() { + return destEdgeMap.keySet().iterator(); + } + + @Override + public int getNumOutEdges() { + return destEdgeMap.size(); + } + + @Override + public E removeEdge(I targetVertexId) { + Edge edge = destEdgeMap.remove(targetVertexId); + if (edge != null) { + return edge.getEdgeValue(); + } else { + return null; + } + } + + @Override + public final void sendMsgToAllEdges(M msg) { + if (msg == null) { + throw new IllegalArgumentException( + "sendMsgToAllEdges: Cannot send null message to all edges"); + } + for (Edge edge : destEdgeMap.values()) { + sendMsg(edge.getDestVertexId(), msg); + } + } + + @Override + public final void readFields(DataInput in) throws IOException { + vertexId = BspUtils.createVertexIndex(getConf()); + vertexId.readFields(in); + boolean hasVertexValue = in.readBoolean(); + if (hasVertexValue) { + vertexValue = BspUtils.createVertexValue(getConf()); + vertexValue.readFields(in); + } + long edgeMapSize = in.readLong(); + for (long i = 0; i < edgeMapSize; ++i) { + Edge edge = new Edge(); + edge.setConf(getConf()); + edge.readFields(in); + addEdge(edge.getDestVertexId(), edge.getEdgeValue()); + } + long msgListSize = in.readLong(); + for (long i = 0; i < msgListSize; ++i) { + M msg = BspUtils.createMessageValue(getConf()); + msg.readFields(in); + msgList.add(msg); + } + halt = in.readBoolean(); + } + + @Override + public final void write(DataOutput out) throws IOException { + vertexId.write(out); + out.writeBoolean(vertexValue != null); + if (vertexValue != null) { + vertexValue.write(out); + } + out.writeLong(destEdgeMap.size()); + for (Edge edge : destEdgeMap.values()) { + edge.write(out); + } + out.writeLong(msgList.size()); + for (M msg : msgList) { + msg.write(out); + } + out.writeBoolean(halt); + } + + @Override + void putMessages(Iterable messages) { + msgList.clear(); + for (M message : messages) { + msgList.add(message); + } + } + + @Override + public Iterable getMessages() { + return Iterables.unmodifiableIterable(msgList); + } + + @Override + void releaseResources() { + // Hint to GC to free the messages + msgList.clear(); + } + + @Override + public String toString() { + return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + + ",#edges=" + destEdgeMap.size() + ")"; + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java Thu Feb 16 22:12:31 2012 @@ -30,135 +30,138 @@ import java.util.Iterator; import java.util.Map; /** - * Simple implementation of {@link BasicVertex} using an int as id, value and message. - * Edges are immutable and unweighted. This class aims to be as memory efficient as possible. + * Simple implementation of {@link BasicVertex} using an int as id, value and + * message. Edges are immutable and unweighted. This class aims to be as + * memory efficient as possible. */ public abstract class IntIntNullIntVertex extends - BasicVertex { - - private int id; - private int value; - - private int[] neighbors; - private int[] messages; - - @Override - public void initialize(IntWritable vertexId, IntWritable vertexValue, - Map edges, - Iterable messages) { - id = vertexId.get(); - value = vertexValue.get(); - this.neighbors = new int[edges.size()]; - int n = 0; - for (IntWritable neighbor : edges.keySet()) { - this.neighbors[n++] = neighbor.get(); - } - this.messages = new int[Iterables.size(messages)]; - n = 0; - for (IntWritable message : messages) { - this.messages[n++] = message.get(); - } - } - - @Override - public IntWritable getVertexId() { - return new IntWritable(id); - } - - @Override - public IntWritable getVertexValue() { - return new IntWritable(value); - } - - @Override - public void setVertexValue(IntWritable vertexValue) { - value = vertexValue.get(); - } - - @Override - public Iterator iterator() { - return new UnmodifiableIntArrayIterator(neighbors); - } - - @Override - public NullWritable getEdgeValue(IntWritable targetVertexId) { - return NullWritable.get(); - } - - @Override - public boolean hasEdge(IntWritable targetVertexId) { - for (int neighbor : neighbors) { - if (neighbor == targetVertexId.get()) { - return true; - } - } - return false; - } - - @Override - public int getNumOutEdges() { - return neighbors.length; - } - - @Override - public void sendMsgToAllEdges(final IntWritable message) { - for (int neighbor : neighbors) { - sendMsg(new IntWritable(neighbor), message); - } - } - - @Override - public Iterable getMessages() { - return new Iterable() { - @Override - public Iterator iterator() { - return new UnmodifiableIntArrayIterator(messages); - } - }; - } - - @Override - public void putMessages(Iterable newMessages) { - messages = new int[Iterables.size(newMessages)]; - int n = 0; - for (IntWritable message : newMessages) { - messages[n++] = message.get(); - } - } - - @Override - void releaseResources() { - messages = new int[0]; - } - - @Override - public void write(final DataOutput out) throws IOException { - out.writeInt(id); - out.writeInt(value); - out.writeInt(neighbors.length); - for (int n = 0; n < neighbors.length; n++) { - out.writeInt(neighbors[n]); - } - out.writeInt(messages.length); - for (int n = 0; n < messages.length; n++) { - out.writeInt(messages[n]); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - id = in.readInt(); - value = in.readInt(); - int numEdges = in.readInt(); - neighbors = new int[numEdges]; - for (int n = 0; n < numEdges; n++) { - neighbors[n] = in.readInt(); - } - int numMessages = in.readInt(); - messages = new int[numMessages]; - for (int n = 0; n < numMessages; n++) { - messages[n] = in.readInt(); - } + BasicVertex { + /** Int represented vertex id */ + private int id; + /** Int represented vertex value */ + private int value; + /** Int array of neighbor vertex ids */ + private int[] neighbors; + /** Int array of messages */ + private int[] messages; + + @Override + public void initialize(IntWritable vertexId, IntWritable vertexValue, + Map edges, + Iterable messages) { + id = vertexId.get(); + value = vertexValue.get(); + this.neighbors = new int[edges.size()]; + int n = 0; + for (IntWritable neighbor : edges.keySet()) { + this.neighbors[n++] = neighbor.get(); + } + this.messages = new int[Iterables.size(messages)]; + n = 0; + for (IntWritable message : messages) { + this.messages[n++] = message.get(); + } + } + + @Override + public IntWritable getVertexId() { + return new IntWritable(id); + } + + @Override + public IntWritable getVertexValue() { + return new IntWritable(value); + } + + @Override + public void setVertexValue(IntWritable vertexValue) { + value = vertexValue.get(); + } + + @Override + public Iterator iterator() { + return new UnmodifiableIntArrayIterator(neighbors); + } + + @Override + public NullWritable getEdgeValue(IntWritable targetVertexId) { + return NullWritable.get(); + } + + @Override + public boolean hasEdge(IntWritable targetVertexId) { + for (int neighbor : neighbors) { + if (neighbor == targetVertexId.get()) { + return true; + } + } + return false; + } + + @Override + public int getNumOutEdges() { + return neighbors.length; + } + + @Override + public void sendMsgToAllEdges(final IntWritable message) { + for (int neighbor : neighbors) { + sendMsg(new IntWritable(neighbor), message); + } + } + + @Override + public Iterable getMessages() { + return new Iterable() { + @Override + public Iterator iterator() { + return new UnmodifiableIntArrayIterator(messages); + } + }; + } + + @Override + public void putMessages(Iterable newMessages) { + messages = new int[Iterables.size(newMessages)]; + int n = 0; + for (IntWritable message : newMessages) { + messages[n++] = message.get(); + } + } + + @Override + void releaseResources() { + messages = new int[0]; + } + + @Override + public void write(final DataOutput out) throws IOException { + out.writeInt(id); + out.writeInt(value); + out.writeInt(neighbors.length); + for (int n = 0; n < neighbors.length; n++) { + out.writeInt(neighbors[n]); + } + out.writeInt(messages.length); + for (int n = 0; n < messages.length; n++) { + out.writeInt(messages[n]); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readInt(); + value = in.readInt(); + int numEdges = in.readInt(); + neighbors = new int[numEdges]; + for (int n = 0; n < numEdges; n++) { + neighbors[n] = in.readInt(); + } + int numMessages = in.readInt(); + messages = new int[numMessages]; + for (int n = 0; n < numMessages; n++) { + messages[n] = in.readInt(); } + } }