Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 609DDD935 for ; Thu, 20 Dec 2012 04:25:45 +0000 (UTC) Received: (qmail 55475 invoked by uid 500); 20 Dec 2012 04:25:44 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 55382 invoked by uid 500); 20 Dec 2012 04:25:44 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 54020 invoked by uid 99); 20 Dec 2012 04:25:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Dec 2012 04:25:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 114C53247B6; Thu, 20 Dec 2012 04:25:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nitay@apache.org To: commits@giraph.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [9/51] [partial] GIRAPH-457: update module names (nitay) Message-Id: <20121220042531.114C53247B6@tyr.zones.apache.org> Date: Thu, 20 Dec 2012 04:25:30 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java new file mode 100644 index 0000000..d791e58 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java @@ -0,0 +1,560 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.conf; + +import org.apache.giraph.graph.AggregatorWriter; +import org.apache.giraph.graph.Combiner; +import org.apache.giraph.graph.DefaultMasterCompute; +import org.apache.giraph.graph.DefaultVertexResolver; +import org.apache.giraph.graph.DefaultWorkerContext; +import org.apache.giraph.graph.EdgeInputFormat; +import org.apache.giraph.graph.MasterCompute; +import org.apache.giraph.graph.TextAggregatorWriter; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.giraph.graph.VertexOutputFormat; +import org.apache.giraph.graph.VertexResolver; +import org.apache.giraph.graph.WorkerContext; +import org.apache.giraph.graph.partition.GraphPartitionerFactory; +import org.apache.giraph.graph.partition.HashPartitionerFactory; +import org.apache.giraph.graph.partition.Partition; +import org.apache.giraph.graph.partition.SimplePartition; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.util.List; + +/** + * Holder for classes used by Giraph. 
+ * + * @param Vertex ID class + * @param Vertex Value class + * @param Edge class + * @param Message class + */ +public class GiraphClasses + implements GiraphConstants { + /** Vertex class - cached for fast access */ + protected Class> vertexClass; + /** Vertex id class - cached for fast access */ + protected Class vertexIdClass; + /** Vertex value class - cached for fast access */ + protected Class vertexValueClass; + /** Edge value class - cached for fast access */ + protected Class edgeValueClass; + /** Message value class - cached for fast access */ + protected Class messageValueClass; + + /** Graph partitioner factory class - cached for fast access */ + protected Class> + graphPartitionerFactoryClass; + + /** Vertex input format class - cached for fast access */ + protected Class> + vertexInputFormatClass; + /** Vertex output format class - cached for fast access */ + protected Class> + vertexOutputFormatClass; + /** Edge input format class - cached for fast access */ + protected Class> + edgeInputFormatClass; + + /** Aggregator writer class - cached for fast access */ + protected Class aggregatorWriterClass; + /** Combiner class - cached for fast access */ + protected Class> combinerClass; + + /** Vertex resolver class - cached for fast access */ + protected Class> vertexResolverClass; + /** Worker context class - cached for fast access */ + protected Class workerContextClass; + /** Master compute class - cached for fast access */ + protected Class masterComputeClass; + + /** Partition class - cached for fast access */ + protected Class> partitionClass; + + /** + * Empty constructor. Initialize with classes all null. + */ + public GiraphClasses() { } + + /** + * Constructor that reads classes from a Configuration object. + * + * @param conf Configuration object to read from. + */ + public GiraphClasses(Configuration conf) { + readFromConf(conf); + } + + /** + * Read classes from Configuration. + * + * @param conf Configuration to read from. 
+ */ + public void readFromConf(Configuration conf) { + // set pre-validated generic parameter types into Configuration + vertexClass = (Class>) + conf.getClass(VERTEX_CLASS, null, Vertex.class); + List> classList = + org.apache.giraph.utils.ReflectionUtils.getTypeArguments( + Vertex.class, vertexClass); + vertexIdClass = (Class) classList.get(0); + vertexValueClass = (Class) classList.get(1); + edgeValueClass = (Class) classList.get(2); + messageValueClass = (Class) classList.get(3); + + graphPartitionerFactoryClass = + (Class>) + conf.getClass(GRAPH_PARTITIONER_FACTORY_CLASS, + HashPartitionerFactory.class, + GraphPartitionerFactory.class); + + vertexInputFormatClass = (Class>) + conf.getClass(VERTEX_INPUT_FORMAT_CLASS, + null, VertexInputFormat.class); + vertexOutputFormatClass = (Class>) + conf.getClass(VERTEX_OUTPUT_FORMAT_CLASS, + null, VertexOutputFormat.class); + edgeInputFormatClass = (Class>) + conf.getClass(EDGE_INPUT_FORMAT_CLASS, + null, EdgeInputFormat.class); + + aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS, + TextAggregatorWriter.class, AggregatorWriter.class); + combinerClass = (Class>) + conf.getClass(VERTEX_COMBINER_CLASS, null, Combiner.class); + vertexResolverClass = (Class>) + conf.getClass(VERTEX_RESOLVER_CLASS, + DefaultVertexResolver.class, VertexResolver.class); + workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS, + DefaultWorkerContext.class, WorkerContext.class); + masterComputeClass = conf.getClass(MASTER_COMPUTE_CLASS, + DefaultMasterCompute.class, MasterCompute.class); + + partitionClass = (Class>) + conf.getClass(PARTITION_CLASS, SimplePartition.class); + } + + /** + * Get Vertex class + * + * @return Vertex class. 
+ */ + public Class> getVertexClass() { + return vertexClass; + } + + /** + * Get Vertex ID class + * + * @return Vertex ID class + */ + public Class getVertexIdClass() { + return vertexIdClass; + } + + /** + * Get Vertex Value class + * + * @return Vertex Value class + */ + public Class getVertexValueClass() { + return vertexValueClass; + } + + /** + * Get Edge Value class + * + * @return Edge Value class + */ + public Class getEdgeValueClass() { + return edgeValueClass; + } + + /** + * Get Message Value class + * + * @return Message Value class + */ + public Class getMessageValueClass() { + return messageValueClass; + } + + /** + * Check if GraphPartitionerFactory is set + * + * @return true if GraphPartitionerFactory is set + */ + public boolean hasGraphPartitionerFactoryClass() { + return graphPartitionerFactoryClass != null; + } + + /** + * Get the GraphPartitionerFactory + * + * @return GraphPartitionerFactory + */ + public Class> + getGraphPartitionerFactoryClass() { + return graphPartitionerFactoryClass; + } + + /** + * Check if VertexInputFormat class is set + * + * @return true if VertexInputFormat class is set + */ + public boolean hasVertexInputFormat() { + return vertexInputFormatClass != null; + } + + /** + * Get VertexInputFormat held + * + * @return VertexInputFormat + */ + public Class> + getVertexInputFormatClass() { + return vertexInputFormatClass; + } + + /** + * Check if VertexOutputFormat is set + * + * @return true if VertexOutputFormat is set + */ + public boolean hasVertexOutputFormat() { + return vertexOutputFormatClass != null; + } + + /** + * Get VertexOutputFormat set + * + * @return VertexOutputFormat + */ + public Class> + getVertexOutputFormatClass() { + return vertexOutputFormatClass; + } + + /** + * Check if EdgeInputFormat is set + * + * @return true if EdgeInputFormat is set + */ + public boolean hasEdgeInputFormat() { + return edgeInputFormatClass != null; + } + + /** + * Get EdgeInputFormat used + * + * @return 
EdgeInputFormat + */ + public Class> getEdgeInputFormatClass() { + return edgeInputFormatClass; + } + + /** + * Check if AggregatorWriter is set + * + * @return true if AggregatorWriter is set + */ + public boolean hasAggregatorWriterClass() { + return aggregatorWriterClass != null; + } + + /** + * Get AggregatorWriter used + * + * @return AggregatorWriter + */ + public Class getAggregatorWriterClass() { + return aggregatorWriterClass; + } + + /** + * Check if Combiner is set + * + * @return true if Combiner is set + */ + public boolean hasCombinerClass() { + return combinerClass != null; + } + + /** + * Get Combiner used + * + * @return Combiner + */ + public Class> getCombinerClass() { + return combinerClass; + } + + /** + * Check if VertexResolver is set + * + * @return true if VertexResolver is set + */ + public boolean hasVertexResolverClass() { + return vertexResolverClass != null; + } + + /** + * Get VertexResolver used + * + * @return VertexResolver + */ + public Class> getVertexResolverClass() { + return vertexResolverClass; + } + + /** + * Check if WorkerContext is set + * + * @return true if WorkerContext is set + */ + public boolean hasWorkerContextClass() { + return workerContextClass != null; + } + + /** + * Get WorkerContext used + * + * @return WorkerContext + */ + public Class getWorkerContextClass() { + return workerContextClass; + } + + /** + * Check if MasterCompute is set + * + * @return true if MasterCompute is set + */ + public boolean hasMasterComputeClass() { + return masterComputeClass != null; + } + + /** + * Get MasterCompute used + * + * @return MasterCompute + */ + public Class getMasterComputeClass() { + return masterComputeClass; + } + + /** + * Check if Partition is set + * + * @return true if Partition is set + */ + public boolean hasPartitionClass() { + return partitionClass != null; + } + + /** + * Get Partition + * + * @return Partition + */ + public Class> getPartitionClass() { + return partitionClass; + } + + /** + * Set Vertex 
class held + * + * @param vertexClass Vertex class to set + * @return this + */ + public GiraphClasses setVertexClass( + Class> vertexClass) { + this.vertexClass = vertexClass; + return this; + } + + /** + * Set Vertex ID class held + * + * @param vertexIdClass Vertex ID to set + * @return this + */ + public GiraphClasses setVertexIdClass(Class vertexIdClass) { + this.vertexIdClass = vertexIdClass; + return this; + } + + /** + * Set Vertex Value class held + * + * @param vertexValueClass Vertex Value class to set + * @return this + */ + public GiraphClasses setVertexValueClass(Class vertexValueClass) { + this.vertexValueClass = vertexValueClass; + return this; + } + + /** + * Set Edge Value class held + * + * @param edgeValueClass Edge Value class to set + * @return this + */ + public GiraphClasses setEdgeValueClass(Class edgeValueClass) { + this.edgeValueClass = edgeValueClass; + return this; + } + + /** + * Set Message Value class held + * + * @param messageValueClass Message Value class to set + * @return this + */ + public GiraphClasses setMessageValueClass(Class messageValueClass) { + this.messageValueClass = messageValueClass; + return this; + } + + /** + * Set GraphPartitionerFactory class held + * + * @param klass GraphPartitionerFactory to set + * @return this + */ + public GiraphClasses setGraphPartitionerFactoryClass( + Class> klass) { + this.graphPartitionerFactoryClass = klass; + return this; + } + + /** + * Set VertexInputFormat held + * + * @param vertexInputFormatClass VertexInputFormat to set + * @return this + */ + public GiraphClasses setVertexInputFormatClass( + Class> vertexInputFormatClass) { + this.vertexInputFormatClass = vertexInputFormatClass; + return this; + } + + /** + * Set VertexOutputFormat held + * + * @param vertexOutputFormatClass VertexOutputFormat to set + * @return this + */ + public GiraphClasses setVertexOutputFormatClass( + Class> vertexOutputFormatClass) { + this.vertexOutputFormatClass = vertexOutputFormatClass; + return 
this; + } + + /** + * Set EdgeInputFormat class held + * + * @param edgeInputFormatClass EdgeInputFormat to set + * @return this + */ + public GiraphClasses setEdgeInputFormatClass( + Class> edgeInputFormatClass) { + this.edgeInputFormatClass = edgeInputFormatClass; + return this; + } + + /** + * Set AggregatorWriter class used + * + * @param aggregatorWriterClass AggregatorWriter to set + * @return this + */ + public GiraphClasses setAggregatorWriterClass( + Class aggregatorWriterClass) { + this.aggregatorWriterClass = aggregatorWriterClass; + return this; + } + + /** + * Set Combiner class used + * + * @param combinerClass Combiner class to set + * @return this + */ + public GiraphClasses setCombinerClass( + Class> combinerClass) { + this.combinerClass = combinerClass; + return this; + } + + /** + * Set VertexResolver used + * + * @param vertexResolverClass VertexResolver to set + * @return this + */ + public GiraphClasses setVertexResolverClass( + Class> vertexResolverClass) { + this.vertexResolverClass = vertexResolverClass; + return this; + } + + /** + * Set WorkerContext used + * + * @param workerContextClass WorkerContext class to set + * @return this + */ + public GiraphClasses setWorkerContextClass( + Class workerContextClass) { + this.workerContextClass = workerContextClass; + return this; + } + + /** + * Set MasterCompute class used + * + * @param masterComputeClass MasterCompute class to set + * @return this + */ + public GiraphClasses setMasterComputeClass( + Class masterComputeClass) { + this.masterComputeClass = masterComputeClass; + return this; + } + + /** + * Set Partition class to use + * + * @param partitionClass Partition class to set + * @return this + */ + public GiraphClasses setPartitionClass( + Class> partitionClass) { + this.partitionClass = partitionClass; + return this; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java new file mode 100644 index 0000000..3e14aad --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -0,0 +1,615 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.conf; + +import org.apache.giraph.graph.AggregatorWriter; +import org.apache.giraph.graph.Combiner; +import org.apache.giraph.graph.EdgeInputFormat; +import org.apache.giraph.graph.MasterCompute; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.giraph.graph.VertexOutputFormat; +import org.apache.giraph.graph.VertexResolver; +import org.apache.giraph.graph.WorkerContext; +import org.apache.giraph.graph.partition.GraphPartitionerFactory; +import org.apache.giraph.graph.partition.Partition; +import org.apache.giraph.master.MasterObserver; +import org.apache.hadoop.conf.Configuration; + +/** + * Adds user methods specific to Giraph. 
This will be put into an + * ImmutableClassesGiraphConfiguration that provides the configuration plus + * the immutable classes. + */ +public class GiraphConfiguration extends Configuration + implements GiraphConstants { + /** + * Constructor that creates the configuration + */ + public GiraphConfiguration() { } + + /** + * Constructor. + * + * @param conf Configuration + */ + public GiraphConfiguration(Configuration conf) { + super(conf); + } + + /** + * Set the vertex class (required) + * + * @param vertexClass Runs vertex computation + */ + public final void setVertexClass( + Class vertexClass) { + setClass(VERTEX_CLASS, vertexClass, Vertex.class); + } + + /** + * Set the vertex input format class (required) + * + * @param vertexInputFormatClass Determines how graph is input + */ + public final void setVertexInputFormatClass( + Class vertexInputFormatClass) { + setClass(VERTEX_INPUT_FORMAT_CLASS, + vertexInputFormatClass, + VertexInputFormat.class); + } + + /** + * Set the edge input format class (required) + * + * @param edgeInputFormatClass Determines how graph is input + */ + public final void setEdgeInputFormatClass( + Class edgeInputFormatClass) { + setClass(EDGE_INPUT_FORMAT_CLASS, + edgeInputFormatClass, + EdgeInputFormat.class); + } + + /** + * Set the master class (optional) + * + * @param masterComputeClass Runs master computation + */ + public final void setMasterComputeClass( + Class masterComputeClass) { + setClass(MASTER_COMPUTE_CLASS, masterComputeClass, + MasterCompute.class); + } + + /** + * Add a MasterObserver class (optional) + * + * @param masterObserverClass MasterObserver class to add. + */ + public final void addMasterObserverClass( + Class masterObserverClass) { + addToClasses(MASTER_OBSERVER_CLASSES, masterObserverClass, + MasterObserver.class); + } + + /** + * Add a class to a property that is a list of classes. If the property does + * not exist it will be created. + * + * @param name String name of property. 
+ * @param klass Class to add to the list. + * @param xface interface of the class being set. + */ + public final void addToClasses(String name, Class klass, Class xface) { + if (!xface.isAssignableFrom(klass)) { + throw new RuntimeException(klass + " does not implement " + + xface.getName()); + } + String value; + String klasses = get(name); + if (klasses == null) { + value = klass.getName(); + } else { + value = klasses + "," + klass.getName(); + } + set(name, value); + } + + /** + * Set mapping from a key name to a list of classes. + * + * @param name String key name to use. + * @param xface interface of the classes being set. + * @param klasses Classes to set. + */ + public final void setClasses(String name, Class xface, + Class ... klasses) { + String[] klassNames = new String[klasses.length]; + for (int i = 0; i < klasses.length; ++i) { + Class klass = klasses[i]; + if (!xface.isAssignableFrom(klass)) { + throw new RuntimeException(klass + " does not implement " + + xface.getName()); + } + klassNames[i] = klasses[i].getName(); + } + setStrings(name, klassNames); + } + + /** + * Get classes from a property that all implement a given interface. + * + * @param name String name of property to fetch. + * @param xface interface classes must implement. + * @param defaultValue If not found, return this + * @param Generic type of interface class + * @return array of Classes implementing interface specified. + */ + public final Class[] getClassesOfType(String name, + Class xface, Class ... 
defaultValue) { + Class[] klasses = getClasses(name, defaultValue); + for (Class klass : klasses) { + if (!xface.isAssignableFrom(klass)) { + throw new RuntimeException(klass + " is not assignable from " + + xface.getName()); + } + } + return (Class[]) klasses; + } + + /** + * Set the vertex output format class (optional) + * + * @param vertexOutputFormatClass Determines how graph is output + */ + public final void setVertexOutputFormatClass( + Class vertexOutputFormatClass) { + setClass(VERTEX_OUTPUT_FORMAT_CLASS, + vertexOutputFormatClass, + VertexOutputFormat.class); + } + + /** + * Set the vertex combiner class (optional) + * + * @param vertexCombinerClass Determines how vertex messages are combined + */ + public final void setVertexCombinerClass( + Class vertexCombinerClass) { + setClass(VERTEX_COMBINER_CLASS, + vertexCombinerClass, + Combiner.class); + } + + /** + * Set the graph partitioner class (optional) + * + * @param graphPartitionerFactoryClass Determines how the graph is partitioned + */ + public final void setGraphPartitionerFactoryClass( + Class graphPartitionerFactoryClass) { + setClass(GRAPH_PARTITIONER_FACTORY_CLASS, + graphPartitionerFactoryClass, + GraphPartitionerFactory.class); + } + + /** + * Set the vertex resolver class (optional) + * + * @param vertexResolverClass Determines how vertex mutations are resolved + */ + public final void setVertexResolverClass( + Class vertexResolverClass) { + setClass(VERTEX_RESOLVER_CLASS, vertexResolverClass, VertexResolver.class); + } + + /** + * Whether to create a vertex that doesn't exist when it receives messages. + * This only affects DefaultVertexResolver. + * + * @return true if we should create non existent vertices that get messages. + */ + public final boolean getResolverCreateVertexOnMessages() { + return getBoolean(RESOLVER_CREATE_VERTEX_ON_MSGS, true); + } + + /** + * Set whether to create non existent vertices when they receive messages. 
+ * + * @param v true if we should create vertices when they get messages. + */ + public final void setResolverCreateVertexOnMessages(boolean v) { + setBoolean(RESOLVER_CREATE_VERTEX_ON_MSGS, v); + } + + /** + * Set the worker context class (optional) + * + * @param workerContextClass Determines what code is executed on each + * worker before and after each superstep and computation + */ + public final void setWorkerContextClass( + Class workerContextClass) { + setClass(WORKER_CONTEXT_CLASS, + workerContextClass, + WorkerContext.class); + } + + /** + * Set the aggregator writer class (optional) + * + * @param aggregatorWriterClass Determines how the aggregators are + * written to file at the end of the job + */ + public final void setAggregatorWriterClass( + Class aggregatorWriterClass) { + setClass(AGGREGATOR_WRITER_CLASS, + aggregatorWriterClass, + AggregatorWriter.class); + } + + /** + * Set the partition class (optional) + * + * @param partitionClass Determines the way partitions are stored + */ + public final void setPartitionClass( + Class partitionClass) { + setClass(PARTITION_CLASS, + partitionClass, + Partition.class); + } + + /** + * Set worker configuration for determining what is required for + * a superstep. 
+ * + * @param minWorkers Minimum workers to do a superstep + * @param maxWorkers Maximum workers to do a superstep + * (max map tasks in job) + * @param minPercentResponded 0 - 100 % of the workers required to + * have responded before continuing the superstep + */ + public final void setWorkerConfiguration(int minWorkers, + int maxWorkers, + float minPercentResponded) { + setInt(MIN_WORKERS, minWorkers); + setInt(MAX_WORKERS, maxWorkers); + setFloat(MIN_PERCENT_RESPONDED, minPercentResponded); + } + + public final int getMinWorkers() { + return getInt(MIN_WORKERS, -1); + } + + public final int getMaxWorkers() { + return getInt(MAX_WORKERS, -1); + } + + public final float getMinPercentResponded() { + return getFloat(MIN_PERCENT_RESPONDED, MIN_PERCENT_RESPONDED_DEFAULT); + } + + /** + * Utilize an existing ZooKeeper service. If this is not set, ZooKeeper + * will be dynamically started by Giraph for this job. + * + * @param serverList Comma separated list of servers and ports + * (i.e. zk1:2221,zk2:2221) + */ + public final void setZooKeeperConfiguration(String serverList) { + set(ZOOKEEPER_LIST, serverList); + } + + public final boolean getSplitMasterWorker() { + return getBoolean(SPLIT_MASTER_WORKER, SPLIT_MASTER_WORKER_DEFAULT); + } + + /** + * Get array of MasterObserver classes set in the configuration. + * + * @return array of MasterObserver classes. + */ + public Class[] getMasterObserverClasses() { + return getClassesOfType(MASTER_OBSERVER_CLASSES, MasterObserver.class); + } + + /** + * Whether to track, print, and aggregate metrics. + * + * @return true if metrics are enabled, false otherwise (default) + */ + public boolean metricsEnabled() { + return getBoolean(METRICS_ENABLE, false); + } + + /** + * Get the task partition + * + * @return The task partition or -1 if not set + */ + public int getTaskPartition() { + return getInt("mapred.task.partition", -1); + } + + /** + * Get the ZooKeeper list. 
+ * + * @return ZooKeeper list of strings, comma separated or null if none set. + */ + public String getZookeeperList() { + return get(ZOOKEEPER_LIST); + } + + public String getLocalLevel() { + return get(LOG_LEVEL, LOG_LEVEL_DEFAULT); + } + + /** + * Use the log thread layout option? + * + * @return True if use the log thread layout option, false otherwise + */ + public boolean useLogThreadLayout() { + return getBoolean(LOG_THREAD_LAYOUT, LOG_THREAD_LAYOUT_DEFAULT); + } + + public boolean getLocalTestMode() { + return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT); + } + + public int getZooKeeperServerCount() { + return getInt(ZOOKEEPER_SERVER_COUNT, + ZOOKEEPER_SERVER_COUNT_DEFAULT); + } + + /** + * Set the ZooKeeper jar classpath + * + * @param classPath Classpath for the ZooKeeper jar + */ + public void setZooKeeperJar(String classPath) { + set(ZOOKEEPER_JAR, classPath); + } + + public int getZooKeeperSessionTimeout() { + return getInt(ZOOKEEPER_SESSION_TIMEOUT, + ZOOKEEPER_SESSION_TIMEOUT_DEFAULT); + } + + public int getZookeeperOpsMaxAttempts() { + return getInt(ZOOKEEPER_OPS_MAX_ATTEMPTS, + ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT); + } + + public int getZookeeperOpsRetryWaitMsecs() { + return getInt(ZOOKEEPER_OPS_RETRY_WAIT_MSECS, + ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT); + } + + public boolean getNettyServerUseExecutionHandler() { + return getBoolean(NETTY_SERVER_USE_EXECUTION_HANDLER, + NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT); + } + + public int getNettyServerThreads() { + return getInt(NETTY_SERVER_THREADS, NETTY_SERVER_THREADS_DEFAULT); + } + + public int getNettyServerExecutionThreads() { + return getInt(NETTY_SERVER_EXECUTION_THREADS, + NETTY_SERVER_EXECUTION_THREADS_DEFAULT); + } + + /** + * Get the netty server execution concurrency. This depends on whether the + * netty server execution handler exists. 
+ * + * @return Server concurrency + */ + public int getNettyServerExecutionConcurrency() { + if (getNettyServerUseExecutionHandler()) { + return getNettyServerExecutionThreads(); + } else { + return getNettyServerThreads(); + } + } + + public int getZookeeperConnectionAttempts() { + return getInt(ZOOKEEPER_CONNECTION_ATTEMPTS, + ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT); + } + + public int getZooKeeperMinSessionTimeout() { + return getInt(ZOOKEEPER_MIN_SESSION_TIMEOUT, + DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT); + } + + public int getZooKeeperMaxSessionTimeout() { + return getInt(ZOOKEEPER_MAX_SESSION_TIMEOUT, + DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT); + } + + public String getZooKeeperForceSync() { + return get(ZOOKEEPER_FORCE_SYNC, DEFAULT_ZOOKEEPER_FORCE_SYNC); + } + + public String getZooKeeperSkipAcl() { + return get(ZOOKEEPER_SKIP_ACL, DEFAULT_ZOOKEEPER_SKIP_ACL); + } + + /** + * Get the number of map tasks in this job + * + * @return Number of map tasks in this job + */ + public int getMapTasks() { + int mapTasks = getInt("mapred.map.tasks", -1); + if (mapTasks == -1) { + throw new IllegalStateException("getMapTasks: Failed to get the map " + + "tasks!"); + } + return mapTasks; + } + + /** + * Use authentication? 
(if supported) + * + * @return True if should authenticate, false otherwise + */ + public boolean authenticate() { + return getBoolean(AUTHENTICATE, DEFAULT_AUTHENTICATE); + } + + /** + * Set the number of compute threads + * + * @param numComputeThreads Number of compute threads to use + */ + public void setNumComputeThreads(int numComputeThreads) { + setInt(NUM_COMPUTE_THREADS, numComputeThreads); + } + + public int getNumComputeThreads() { + return getInt(NUM_COMPUTE_THREADS, NUM_COMPUTE_THREADS_DEFAULT); + } + + /** + * Set the number of input split threads + * + * @param numInputSplitsThreads Number of input split threads to use + */ + public void setNumInputSplitsThreads(int numInputSplitsThreads) { + setInt(NUM_INPUT_SPLITS_THREADS, numInputSplitsThreads); + } + + public int getNumInputSplitsThreads() { + return getInt(NUM_INPUT_SPLITS_THREADS, NUM_INPUT_SPLITS_THREADS_DEFAULT); + } + + public long getInputSplitMaxVertices() { + return getLong(INPUT_SPLIT_MAX_VERTICES, INPUT_SPLIT_MAX_VERTICES_DEFAULT); + } + + public long getInputSplitMaxEdges() { + return getLong(INPUT_SPLIT_MAX_EDGES, INPUT_SPLIT_MAX_EDGES_DEFAULT); + } + + /** + * Set whether to use unsafe serialization + * + * @param useUnsafeSerialization If true, use unsafe serialization + */ + public void useUnsafeSerialization(boolean useUnsafeSerialization) { + setBoolean(USE_UNSAFE_SERIALIZATION, useUnsafeSerialization); + } + + /** + * Use message size encoding? This feature may help with complex message + * objects. 
+ * + * @return Whether to use message size encoding + */ + public boolean useMessageSizeEncoding() { + return getBoolean( + USE_MESSAGE_SIZE_ENCODING, USE_MESSAGE_SIZE_ENCODING_DEFAULT); + } + + /** + * Set the checkpoint frequency of how many supersteps to wait before + * checkpointing + * + * @param checkpointFrequency How often to checkpoint (0 means never) + */ + public void setCheckpointFrequency(int checkpointFrequency) { + setInt(CHECKPOINT_FREQUENCY, checkpointFrequency); + } + + /** + * Get the checkpoint frequency of how many supersteps to wait + * before checkpointing + * + * @return Checkpoint frequency (0 means never) + */ + public int getCheckpointFrequency() { + return getInt(CHECKPOINT_FREQUENCY, CHECKPOINT_FREQUENCY_DEFAULT); + } + + /** + * Set the max task attempts + * + * @param maxTaskAttempts Max task attempts to use + */ + public void setMaxTaskAttempts(int maxTaskAttempts) { + setInt(MAX_TASK_ATTEMPTS, maxTaskAttempts); + } + + /** + * Get the max task attempts + * + * @return Max task attempts or -1, if not set + */ + public int getMaxTaskAttempts() { + return getInt(MAX_TASK_ATTEMPTS, -1); + } + + /** + * Get the number of milliseconds to wait for an event before continuing on + * + * @return Number of milliseconds to wait for an event before continuing on + */ + public int getEventWaitMsecs() { + return getInt(EVENT_WAIT_MSECS, EVENT_WAIT_MSECS_DEFAULT); + } + + /** + * Set the number of milliseconds to wait for an event before continuing on + * + * @param eventWaitMsecs Number of milliseconds to wait for an event before + * continuing on + */ + public void setEventWaitMsecs(int eventWaitMsecs) { + setInt(EVENT_WAIT_MSECS, eventWaitMsecs); + } + + /** + * Get the maximum milliseconds to wait before giving up trying to get the + * minimum number of workers before a superstep. 
+ * + * @return Maximum milliseconds to wait before giving up trying to get the + * minimum number of workers before a superstep + */ + public int getMaxMasterSuperstepWaitMsecs() { + return getInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS, + MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT); + } + + /** + * Set the maximum milliseconds to wait before giving up trying to get the + * minimum number of workers before a superstep. + * + * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before + * giving up trying to get the minimum + * number of workers before a superstep + */ + public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) { + setInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS, maxMasterSuperstepWaitMsecs); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java new file mode 100644 index 0000000..11d4a41 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.conf; + +/** + * Constants used all over Giraph for configuration. + */ +// CHECKSTYLE: stop InterfaceIsTypeCheck +public interface GiraphConstants { + /** Vertex class - required */ + String VERTEX_CLASS = "giraph.vertexClass"; + + /** Class for Master - optional */ + String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass"; + /** Classes for Observer Master - optional */ + String MASTER_OBSERVER_CLASSES = "giraph.master.observers"; + /** Vertex combiner class - optional */ + String VERTEX_COMBINER_CLASS = "giraph.combinerClass"; + /** Vertex resolver class - optional */ + String VERTEX_RESOLVER_CLASS = "giraph.vertexResolverClass"; + /** + * Option of whether to create vertexes that were not existent before but + * received messages + */ + String RESOLVER_CREATE_VERTEX_ON_MSGS = + "giraph.vertex.resolver.create.on.msgs"; + /** Graph partitioner factory class - optional */ + String GRAPH_PARTITIONER_FACTORY_CLASS = + "giraph.graphPartitionerFactoryClass"; + + // At least one of the input format classes is required. 
+ /** VertexInputFormat class */ + String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass"; + /** EdgeInputFormat class */ + String EDGE_INPUT_FORMAT_CLASS = "giraph.edgeInputFormatClass"; + + /** VertexOutputFormat class */ + String VERTEX_OUTPUT_FORMAT_CLASS = "giraph.vertexOutputFormatClass"; + + /** Vertex index class */ + String VERTEX_ID_CLASS = "giraph.vertexIdClass"; + /** Vertex value class */ + String VERTEX_VALUE_CLASS = "giraph.vertexValueClass"; + /** Edge value class */ + String EDGE_VALUE_CLASS = "giraph.edgeValueClass"; + /** Message value class */ + String MESSAGE_VALUE_CLASS = "giraph.messageValueClass"; + /** Worker context class */ + String WORKER_CONTEXT_CLASS = "giraph.workerContextClass"; + /** AggregatorWriter class - optional */ + String AGGREGATOR_WRITER_CLASS = "giraph.aggregatorWriterClass"; + + /** Partition class - optional */ + String PARTITION_CLASS = "giraph.partitionClass"; + + /** + * Minimum number of simultaneous workers before this job can run (int) + */ + String MIN_WORKERS = "giraph.minWorkers"; + /** + * Maximum number of simultaneous worker tasks started by this job (int). + */ + String MAX_WORKERS = "giraph.maxWorkers"; + + /** + * Separate the workers and the master tasks. This is required + * to support dynamic recovery. (boolean) + */ + String SPLIT_MASTER_WORKER = "giraph.SplitMasterWorker"; + /** + * Default on whether to separate the workers and the master tasks. + * Needs to be "true" to support dynamic recovery. + */ + boolean SPLIT_MASTER_WORKER_DEFAULT = true; + + /** Indicates whether this job is run in an internal unit test */ + String LOCAL_TEST_MODE = "giraph.localTestMode"; + + /** not in local test mode per default */ + boolean LOCAL_TEST_MODE_DEFAULT = false; + + /** Override the Hadoop log level and set the desired log level. */ + String LOG_LEVEL = "giraph.logLevel"; + /** Default log level is INFO (same as Hadoop) */ + String LOG_LEVEL_DEFAULT = "info"; + + /** Use thread level debugging? 
*/ + String LOG_THREAD_LAYOUT = "giraph.logThreadLayout"; + /** Default to not use thread-level debugging */ + boolean LOG_THREAD_LAYOUT_DEFAULT = false; + + /** + * Minimum percent of the maximum number of workers that have responded + * in order to continue progressing. (float) + */ + String MIN_PERCENT_RESPONDED = "giraph.minPercentResponded"; + /** Default 100% response rate for workers */ + float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f; + + /** Enable the Metrics system **/ + String METRICS_ENABLE = "giraph.metrics.enable"; + + /** + * ZooKeeper comma-separated list (if not set, + * will start up ZooKeeper locally) + */ + String ZOOKEEPER_LIST = "giraph.zkList"; + + /** ZooKeeper session millisecond timeout */ + String ZOOKEEPER_SESSION_TIMEOUT = "giraph.zkSessionMsecTimeout"; + /** Default Zookeeper session millisecond timeout */ + int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000; + + /** Polling interval to check for the ZooKeeper server data */ + String ZOOKEEPER_SERVERLIST_POLL_MSECS = "giraph.zkServerlistPollMsecs"; + /** Default polling interval to check for the ZooKeeper server data */ + int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT = 3 * 1000; + + /** Number of nodes (not tasks) to run Zookeeper on */ + String ZOOKEEPER_SERVER_COUNT = "giraph.zkServerCount"; + /** Default number of nodes to run Zookeeper on */ + int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1; + + /** ZooKeeper port to use */ + String ZOOKEEPER_SERVER_PORT = "giraph.zkServerPort"; + /** Default ZooKeeper port to use */ + int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181; + + /** Location of the ZooKeeper jar - Used internally, not meant for users */ + String ZOOKEEPER_JAR = "giraph.zkJar"; + + /** Local ZooKeeper directory to use */ + String ZOOKEEPER_DIR = "giraph.zkDir"; + + /** Max attempts for handling ZooKeeper connection loss */ + String ZOOKEEPER_OPS_MAX_ATTEMPTS = "giraph.zkOpsMaxAttempts"; + /** Default of 3 attempts for handling ZooKeeper connection loss */ + int 
ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT = 3; + + /** + * Msecs to wait before retrying a failed ZooKeeper op due to connection + * loss. + */ + String ZOOKEEPER_OPS_RETRY_WAIT_MSECS = "giraph.zkOpsRetryWaitMsecs"; + /** + * Default to wait 5 seconds before retrying a failed ZooKeeper op due to + * connection loss. + */ + int ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT = 5 * 1000; + + /** TCP backlog (defaults to number of workers) */ + String TCP_BACKLOG = "giraph.tcpBacklog"; + /** + * Default TCP backlog if the number of workers is not specified + * (i.e. unit tests) + */ + int TCP_BACKLOG_DEFAULT = 1; + + /** How big to make the default buffer? */ + String NETTY_REQUEST_ENCODER_BUFFER_SIZE = + "giraph.nettyRequestEncoderBufferSize"; + /** Start with 32K */ + int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT = 32 * 1024; + + /** Netty client threads */ + String NETTY_CLIENT_THREADS = "giraph.nettyClientThreads"; + /** Default is 4 */ + int NETTY_CLIENT_THREADS_DEFAULT = 4; + + /** Netty server threads */ + String NETTY_SERVER_THREADS = "giraph.nettyServerThreads"; + /** Default is 16 */ + int NETTY_SERVER_THREADS_DEFAULT = 16; + + /** Use the execution handler in netty on the client? */ + String NETTY_CLIENT_USE_EXECUTION_HANDLER = + "giraph.nettyClientUseExecutionHandler"; + /** Use the execution handler in netty on the client - default true */ + boolean NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT = true; + + /** Netty client execution threads (execution handler) */ + String NETTY_CLIENT_EXECUTION_THREADS = + "giraph.nettyClientExecutionThreads"; + /** Default Netty client execution threads (execution handler) of 8 */ + int NETTY_CLIENT_EXECUTION_THREADS_DEFAULT = 8; + + /** Where to place the netty client execution handler? */ + String NETTY_CLIENT_EXECUTION_AFTER_HANDLER = + "giraph.nettyClientExecutionAfterHandler"; + /** + * Default is to use the netty client execution handler after the request + * encoder.
+ */ + String NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT = "requestEncoder"; + + /** Use the execution handler in netty on the server? */ + String NETTY_SERVER_USE_EXECUTION_HANDLER = + "giraph.nettyServerUseExecutionHandler"; + /** Use the execution handler in netty on the server - default true */ + boolean NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT = true; + + /** Netty server execution threads (execution handler) */ + String NETTY_SERVER_EXECUTION_THREADS = "giraph.nettyServerExecutionThreads"; + /** Default Netty server execution threads (execution handler) of 8 */ + int NETTY_SERVER_EXECUTION_THREADS_DEFAULT = 8; + + /** Where to place the netty server execution handler? */ + String NETTY_SERVER_EXECUTION_AFTER_HANDLER = + "giraph.nettyServerExecutionAfterHandler"; + /** + * Default is to use the netty server execution handler after the request + * frame decoder. + */ + String NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT = "requestFrameDecoder"; + + /** Netty simulate a first request closed */ + String NETTY_SIMULATE_FIRST_REQUEST_CLOSED = + "giraph.nettySimulateFirstRequestClosed"; + /** Default of not simulating failure for first request */ + boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT = false; + + /** Netty simulate a first response failed */ + String NETTY_SIMULATE_FIRST_RESPONSE_FAILED = + "giraph.nettySimulateFirstResponseFailed"; + /** Default of not simulating failure for first response */ + boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT = false; + + /** Max resolve address attempts */ + String MAX_RESOLVE_ADDRESS_ATTEMPTS = "giraph.maxResolveAddressAttempts"; + /** Default max resolve address attempts */ + int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5; + + /** Msecs to wait between waiting for all requests to finish */ + String WAITING_REQUEST_MSECS = "giraph.waitingRequestMsecs"; + /** Default msecs to wait between waiting for all requests to finish */ + int WAITING_REQUEST_MSECS_DEFAULT = 15000; + + /** Milliseconds to wait for an event
before continuing */ + String EVENT_WAIT_MSECS = "giraph.eventWaitMsecs"; + /** + * Default milliseconds to wait for an event before continuing (30 seconds) + */ + int EVENT_WAIT_MSECS_DEFAULT = 30 * 1000; + + /** + * Maximum milliseconds to wait before giving up trying to get the minimum + * number of workers before a superstep (int). + */ + String MAX_MASTER_SUPERSTEP_WAIT_MSECS = "giraph.maxMasterSuperstepWaitMsecs"; + /** + * Default maximum milliseconds to wait before giving up trying to get + * the minimum number of workers before a superstep (10 minutes). + */ + int MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT = 10 * 60 * 1000; + + /** Milliseconds for a request to complete (or else resend) */ + String MAX_REQUEST_MILLISECONDS = "giraph.maxRequestMilliseconds"; + /** Maximum number of milliseconds for a request to complete (10 minutes) */ + int MAX_REQUEST_MILLISECONDS_DEFAULT = 10 * 60 * 1000; + + /** Netty max connection failures */ + String NETTY_MAX_CONNECTION_FAILURES = "giraph.nettyMaxConnectionFailures"; + /** Default Netty max connection failures */ + int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000; + + /** Initial port to start using for the IPC communication */ + String IPC_INITIAL_PORT = "giraph.ipcInitialPort"; + /** Default port to start using for the IPC communication */ + int IPC_INITIAL_PORT_DEFAULT = 30000; + + /** Maximum bind attempts for different IPC ports */ + String MAX_IPC_PORT_BIND_ATTEMPTS = "giraph.maxIpcPortBindAttempts"; + /** Default maximum bind attempts for different IPC ports */ + int MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT = 20; + /** + * Fail first IPC port binding attempt, simulate binding failure + * on real grid testing + */ + String FAIL_FIRST_IPC_PORT_BIND_ATTEMPT = + "giraph.failFirstIpcPortBindAttempt"; + /** Default fail first IPC port binding attempt flag */ + boolean FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT = false; + + /** Client send buffer size */ + String CLIENT_SEND_BUFFER_SIZE = "giraph.clientSendBufferSize"; + /** 
Default client send buffer size of 0.5 MB */ + int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024; + + /** Client receive buffer size */ + String CLIENT_RECEIVE_BUFFER_SIZE = "giraph.clientReceiveBufferSize"; + /** Default client receive buffer size of 32 k */ + int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024; + + /** Server send buffer size */ + String SERVER_SEND_BUFFER_SIZE = "giraph.serverSendBufferSize"; + /** Default server send buffer size of 32 k */ + int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024; + + /** Server receive buffer size */ + String SERVER_RECEIVE_BUFFER_SIZE = "giraph.serverReceiveBufferSize"; + /** Default server receive buffer size of 0.5 MB */ + int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024; + + /** Maximum size of messages (in bytes) per peer before flush */ + String MAX_MSG_REQUEST_SIZE = "giraph.msgRequestSize"; + /** Default maximum size of messages per peer before flush of 0.5MB */ + int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024; + + /** Maximum number of messages per peer before flush */ + String MSG_SIZE = "giraph.msgSize"; + /** Default maximum number of messages per peer before flush */ + int MSG_SIZE_DEFAULT = 2000; + + /** Maximum number of mutations per partition before flush */ + String MAX_MUTATIONS_PER_REQUEST = "giraph.maxMutationsPerRequest"; + /** Default maximum number of mutations per partition before flush */ + int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100; + + /** + * Use message size encoding (typically better for complex objects, + * not meant for primitive wrapped messages) + */ + String USE_MESSAGE_SIZE_ENCODING = "giraph.useMessageSizeEncoding"; + /** + * By default, do not use message size encoding as it is experimental. 
+ */ + boolean USE_MESSAGE_SIZE_ENCODING_DEFAULT = false; + + /** Number of channels used per server */ + String CHANNELS_PER_SERVER = "giraph.channelsPerServer"; + /** Default number of channels used per server of 1 */ + int DEFAULT_CHANNELS_PER_SERVER = 1; + + /** Number of flush threads per peer */ + String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads"; + + /** Number of threads for vertex computation */ + String NUM_COMPUTE_THREADS = "giraph.numComputeThreads"; + /** Default number of threads for vertex computation */ + int NUM_COMPUTE_THREADS_DEFAULT = 1; + + /** Number of threads for input splits loading */ + String NUM_INPUT_SPLITS_THREADS = "giraph.numInputSplitsThreads"; + /** Default number of threads for input splits loading */ + int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1; + + /** Minimum stragglers of the superstep before printing them out */ + String PARTITION_LONG_TAIL_MIN_PRINT = "giraph.partitionLongTailMinPrint"; + /** Only print stragglers with one as a default */ + int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1; + + /** Use superstep counters? (boolean) */ + String USE_SUPERSTEP_COUNTERS = "giraph.useSuperstepCounters"; + /** Default is to use the superstep counters */ + boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true; + + /** + * Set the multiplicative factor of how many partitions to create from + * a single InputSplit based on the number of total InputSplits. For + * example, if there are 10 total InputSplits and this is set to 0.5, then + * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the + * minimum size is met). + */ + String TOTAL_INPUT_SPLIT_MULTIPLIER = "giraph.totalInputSplitMultiplier"; + + /** + * Input split sample percent - Used only for sampling and testing, rather + * than an actual job. The idea is that to test, you might only want a + * fraction of the actual input splits from your VertexInputFormat to + * load (values should be [0, 100]). 
+ */ + String INPUT_SPLIT_SAMPLE_PERCENT = "giraph.inputSplitSamplePercent"; + /** Default is to use all the input splits */ + float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f; + + /** + * To limit outlier vertex input splits from producing too many vertices or + * to help with testing, the number of vertices loaded from an input split + * can be limited. By default, everything is loaded. + */ + String INPUT_SPLIT_MAX_VERTICES = "giraph.InputSplitMaxVertices"; + /** + * Default is that all the vertices are to be loaded from the input split + */ + long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1; + + /** + * To limit outlier input splits from producing too many edges or + * to help with testing, the number of edges loaded from an input split + * can be limited. By default, everything is loaded. + */ + String INPUT_SPLIT_MAX_EDGES = "giraph.InputSplitMaxEdges"; + /** + * Default is that all the edges are to be loaded from the input split + */ + long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1; + + /** Java opts passed to ZooKeeper startup */ + String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts"; + /** Default java opts passed to ZooKeeper startup */ + String ZOOKEEPER_JAVA_OPTS_DEFAULT = + "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " + + "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100"; + + /** + * How often to checkpoint (i.e. 0, means no checkpoint, + * 1 means every superstep, 2 is every two supersteps, etc.). + */ + String CHECKPOINT_FREQUENCY = "giraph.checkpointFrequency"; + + /** Default checkpointing frequency of none. */ + int CHECKPOINT_FREQUENCY_DEFAULT = 0; + + /** + * Delete checkpoints after a successful job run? + */ + String CLEANUP_CHECKPOINTS_AFTER_SUCCESS = + "giraph.cleanupCheckpointsAfterSuccess"; + /** Default is to clean up the checkpoints after a successful job */ + boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT = true; + + /** + * An application can be restarted manually by selecting a superstep.
The + * corresponding checkpoint must exist for this to work. The user should + * set a long value. Default is start from scratch. + */ + String RESTART_SUPERSTEP = "giraph.restartSuperstep"; + + /** + * Base ZNode for Giraph's state in the ZooKeeper cluster. Must be a root + * znode on the cluster beginning with "/" + */ + String BASE_ZNODE_KEY = "giraph.zkBaseZNode"; + + /** + * If ZOOKEEPER_LIST is not set, then use this directory to manage + * ZooKeeper + */ + String ZOOKEEPER_MANAGER_DIRECTORY = "giraph.zkManagerDirectory"; + /** + * Default ZooKeeper manager directory (where determining the servers in + * HDFS files will go). The final directory path will also have the job number + * for uniqueness. + */ + String ZOOKEEPER_MANAGER_DIR_DEFAULT = "_bsp/_defaultZkManagerDir"; + + /** Number of ZooKeeper client connection attempts before giving up. */ + String ZOOKEEPER_CONNECTION_ATTEMPTS = "giraph.zkConnectionAttempts"; + /** Default of 10 ZooKeeper client connection attempts before giving up. */ + int ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT = 10; + + /** This directory has/stores the available checkpoint files in HDFS. */ + String CHECKPOINT_DIRECTORY = "giraph.checkpointDirectory"; + /** + * Default checkpoint directory (where checkpoint files go in HDFS). Final + * directory path will also have the job number for uniqueness + */ + String CHECKPOINT_DIRECTORY_DEFAULT = "_bsp/_checkpoints/"; + + /** Directory in the local file system for out-of-core messages. */ + String MESSAGES_DIRECTORY = "giraph.messagesDirectory"; + /** + * Default messages directory.
directory path will also have the + * job number for uniqueness + */ + String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/"; + + /** Whether or not to use out-of-core messages */ + String USE_OUT_OF_CORE_MESSAGES = "giraph.useOutOfCoreMessages"; + /** Default choice about using out-of-core messaging */ + boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false; + /** + * If using out-of-core messaging, the maximum number of messages to keep + * in memory. + */ + String MAX_MESSAGES_IN_MEMORY = "giraph.maxMessagesInMemory"; + /** Default maximum number of messages in memory. */ + int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000; + /** Size of buffer when reading and writing messages out-of-core. */ + String MESSAGES_BUFFER_SIZE = "giraph.messagesBufferSize"; + /** Default size of buffer when reading and writing messages out-of-core. */ + int MESSAGES_BUFFER_SIZE_DEFAULT = 8192; + + /** Directory in the local filesystem for out-of-core partitions. */ + String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory"; + /** Default directory for out-of-core partitions. */ + String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions"; + + /** Enable out-of-core graph. */ + String USE_OUT_OF_CORE_GRAPH = "giraph.useOutOfCoreGraph"; + /** Default is not to use out-of-core graph. */ + boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false; + + /** Maximum number of partitions to hold in memory for each worker. */ + String MAX_PARTITIONS_IN_MEMORY = "giraph.maxPartitionsInMemory"; + /** Default maximum number of in-memory partitions. */ + int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10; + + /** Keep the zookeeper output for debugging? Default is to remove it. */ + String KEEP_ZOOKEEPER_DATA = "giraph.keepZooKeeperData"; + /** Default is to remove ZooKeeper data. */ + Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false; + + /** Default ZooKeeper tick time. */ + int DEFAULT_ZOOKEEPER_TICK_TIME = 6000; + /** Default ZooKeeper init limit (in ticks).
*/ + int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10; + /** Default ZooKeeper sync limit (in ticks). */ + int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5; + /** Default ZooKeeper snap count. */ + int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000; + /** Default ZooKeeper maximum client connections. */ + int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000; + /** ZooKeeper minimum session timeout */ + String ZOOKEEPER_MIN_SESSION_TIMEOUT = "giraph.zKMinSessionTimeout"; + /** Default ZooKeeper minimum session timeout of 10 minutes (in msecs). */ + int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 600 * 1000; + /** ZooKeeper maximum session timeout */ + String ZOOKEEPER_MAX_SESSION_TIMEOUT = "giraph.zkMaxSessionTimeout"; + /** Default ZooKeeper maximum session timeout of 15 minutes (in msecs). */ + int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 900 * 1000; + /** ZooKeeper force sync */ + String ZOOKEEPER_FORCE_SYNC = "giraph.zKForceSync"; + /** Default ZooKeeper force sync is off (for performance) */ + String DEFAULT_ZOOKEEPER_FORCE_SYNC = "no"; + /** ZooKeeper skip ACLs */ + String ZOOKEEPER_SKIP_ACL = "giraph.ZkSkipAcl"; + /** Default ZooKeeper skip ACLs true (for performance) */ + String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes"; + + /** + * Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate + * and authorize Netty BSP Clients to Servers. + */ + String AUTHENTICATE = "giraph.authenticate"; + /** Default is not to do authentication and authorization with Netty. */ + boolean DEFAULT_AUTHENTICATE = false; + + /** Use unsafe serialization? */ + String USE_UNSAFE_SERIALIZATION = "giraph.useUnsafeSerialization"; + /** + * Use unsafe serialization default is true (use it if you can, + * it's much faster)! + */ + boolean USE_UNSAFE_SERIALIZATION_DEFAULT = true; + + /** + * Maximum number of attempts a master/worker will retry before killing + * the job. This directly maps to the number of map task attempts in + * Hadoop.
+ */ + String MAX_TASK_ATTEMPTS = "mapred.map.max.attempts"; +} +// CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java new file mode 100644 index 0000000..e4351a2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.conf; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Can be instantiated with ImmutableClassesGiraphConfiguration + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + */ +public interface ImmutableClassesGiraphConfigurable< + I extends WritableComparable, V extends Writable, E extends Writable, + M extends Writable> { + /** + * Set the configuration to be used by this object. + * + * @param configuration Set configuration + */ + void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> + configuration); + + /** + * Return the configuration used by this object. + * + * @return Set configuration + */ + ImmutableClassesGiraphConfiguration<I, V, E, M> getConf(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java new file mode 100644 index 0000000..00e4135 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.conf; + +import org.apache.giraph.graph.AggregatorWriter; +import org.apache.giraph.graph.Combiner; +import org.apache.giraph.graph.EdgeInputFormat; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.MasterCompute; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.giraph.graph.VertexOutputFormat; +import org.apache.giraph.graph.VertexResolver; +import org.apache.giraph.graph.WorkerContext; +import org.apache.giraph.graph.partition.GraphPartitionerFactory; +import org.apache.giraph.graph.partition.MasterGraphPartitioner; +import org.apache.giraph.graph.partition.Partition; +import org.apache.giraph.graph.partition.PartitionStats; +import org.apache.giraph.master.MasterObserver; +import org.apache.giraph.utils.ExtendedByteArrayDataInput; +import org.apache.giraph.utils.ExtendedByteArrayDataOutput; +import org.apache.giraph.utils.ExtendedDataInput; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.giraph.utils.UnsafeByteArrayInputStream; +import org.apache.giraph.utils.UnsafeByteArrayOutputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; + +/** + * The classes set here are immutable, the remaining configuration is mutable. 
+ * Classes are immutable and final to provide the best performance for + * instantiation. Everything is thread-safe. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + */ +public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> extends + GiraphConfiguration { + /** Master graph partitioner - cached for fast access */ + protected final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner; + + /** Holder for all the classes */ + private final GiraphClasses classes; + + /** + * Use unsafe serialization? Cached for fast access to instantiate the + * extended data input/output classes + */ + private final boolean useUnsafeSerialization; + + /** + * Constructor. Takes the configuration and then gets the classes out of + * them for Giraph + * + * @param conf Configuration + */ + public ImmutableClassesGiraphConfiguration(Configuration conf) { + super(conf); + + classes = new GiraphClasses(conf); + masterGraphPartitioner = (MasterGraphPartitioner<I, V, E, M>) + createGraphPartitioner().createMasterGraphPartitioner(); + + useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION, + USE_UNSAFE_SERIALIZATION_DEFAULT); + } + + /** + * Get the user's subclassed + * {@link org.apache.giraph.graph.partition.GraphPartitionerFactory}.
+ * + * @return User's graph partitioner + */ + public Class<? extends GraphPartitionerFactory<I, V, E, M>> + getGraphPartitionerClass() { + return classes.getGraphPartitionerFactoryClass(); + } + + /** + * Create a user graph partitioner class + * + * @return Instantiated user graph partitioner class + */ + public GraphPartitionerFactory<I, V, E, M> createGraphPartitioner() { + Class<? extends GraphPartitionerFactory<I, V, E, M>> klass = + classes.getGraphPartitionerFactoryClass(); + return ReflectionUtils.newInstance(klass, this); + } + + /** + * Create a user graph partitioner partition stats class + * + * @return Instantiated user graph partition stats class + */ + public PartitionStats createGraphPartitionStats() { + return getMasterGraphPartitioner().createPartitionStats(); + } + + /** + * Get the cached MasterGraphPartitioner. + * + * @return MasterGraphPartitioner cached in this class. + */ + public MasterGraphPartitioner<I, V, E, M> getMasterGraphPartitioner() { + return masterGraphPartitioner; + } + + /** + * Does the job have a {@link VertexInputFormat}? + * + * @return True iff a {@link VertexInputFormat} has been specified. + */ + public boolean hasVertexInputFormat() { + return classes.hasVertexInputFormat(); + } + + /** + * Get the user's subclassed + * {@link org.apache.giraph.graph.VertexInputFormat}. + * + * @return User's vertex input format class + */ + public Class<? extends VertexInputFormat<I, V, E, M>> + getVertexInputFormatClass() { + return classes.getVertexInputFormatClass(); + } + + /** + * Create a user vertex input format class + * + * @return Instantiated user vertex input format class + */ + public VertexInputFormat<I, V, E, M> + createVertexInputFormat() { + Class<? extends VertexInputFormat<I, V, E, M>> klass = + classes.getVertexInputFormatClass(); + return ReflectionUtils.newInstance(klass, this); + } + + /** + * Get the user's subclassed + * {@link org.apache.giraph.graph.VertexOutputFormat}.
+ * + * @return User's vertex output format class + */ + public Class> + getVertexOutputFormatClass() { + return classes.getVertexOutputFormatClass(); + } + + /** + * Create an instance of the user's vertex output format. A new instance + * is created through ReflectionUtils.newInstance on every call, with this + * configuration passed along. + * + * @return Newly instantiated user vertex output format + */ + @SuppressWarnings("rawtypes") + public VertexOutputFormat createVertexOutputFormat() { + Class> klass = + classes.getVertexOutputFormatClass(); + return ReflectionUtils.newInstance(klass, this); + } + + /** + * Does the job have an {@link EdgeInputFormat}? + * + * @return True iff an {@link EdgeInputFormat} has been specified. + */ + public boolean hasEdgeInputFormat() { + return classes.hasEdgeInputFormat(); + } + + /** + * Get the user's subclassed + * {@link org.apache.giraph.graph.EdgeInputFormat}. + * + * @return User's edge input format class + */ + public Class> getEdgeInputFormatClass() { + return classes.getEdgeInputFormatClass(); + } + + /** + * Create an instance of the user's edge input format. A new instance is + * created through ReflectionUtils.newInstance on every call, with this + * configuration passed along. + * + * @return Newly instantiated user edge input format + */ + public EdgeInputFormat createEdgeInputFormat() { + Class> klass = getEdgeInputFormatClass(); + return ReflectionUtils.newInstance(klass, this); + } + + /** + * Get the user's subclassed {@link org.apache.giraph.graph.AggregatorWriter}. 
+ * + * @return User's aggregator writer class + */ + public Class getAggregatorWriterClass() { + return classes.getAggregatorWriterClass(); + } + + /** + * Create an instance of the user's aggregator writer. A new instance is + * created through ReflectionUtils.newInstance on every call, with this + * configuration passed along. + * + * @return Newly instantiated user aggregator writer + */ + public AggregatorWriter createAggregatorWriter() { + return ReflectionUtils.newInstance(getAggregatorWriterClass(), this); + } + + /** + * Create an instance of the user's combiner. This does not check whether + * a combiner class was set; see {@link #useCombiner()} for that. + * + * @return Newly instantiated user combiner + */ + @SuppressWarnings("rawtypes") + public Combiner createCombiner() { + Class> klass = classes.getCombinerClass(); + return ReflectionUtils.newInstance(klass, this); + } + + /** + * Check if user set a combiner + * + * @return True iff user set a combiner class + */ + public boolean useCombiner() { + return classes.hasCombinerClass(); + } + + /** + * Get the user's subclassed VertexResolver. + * + * @return User's vertex resolver class + */ + public Class> getVertexResolverClass() { + return classes.getVertexResolverClass(); + } + + /** + * Create a user vertex resolver. The resolver is instantiated through + * ReflectionUtils.newInstance and then handed the graph state before it + * is returned. + * + * @param graphState State of the graph from the worker + * @return Instantiated user vertex resolver with its graph state set + */ + @SuppressWarnings("rawtypes") + public VertexResolver createVertexResolver( + GraphState graphState) { + VertexResolver resolver = + ReflectionUtils.newInstance(getVertexResolverClass(), this); + resolver.setGraphState(graphState); + return resolver; + } + + /** + * Get the user's subclassed WorkerContext. 
+ * + * @return User's worker context class + */ + public Class getWorkerContextClass() { + return classes.getWorkerContextClass(); + } + + /** + * Create a user worker context. The context is instantiated through + * ReflectionUtils.newInstance and then handed the graph state before it + * is returned. + * + * @param graphState State of the graph from the worker + * @return Instantiated user worker context with its graph state set + */ + @SuppressWarnings("rawtypes") + public WorkerContext createWorkerContext(GraphState graphState) { + WorkerContext workerContext = + ReflectionUtils.newInstance(getWorkerContextClass(), this); + workerContext.setGraphState(graphState); + return workerContext; + } + + /** + * Get the user's subclassed {@link org.apache.giraph.graph.MasterCompute} + * + * @return User's master compute class + */ + public Class getMasterComputeClass() { + return classes.getMasterComputeClass(); + } + + /** + * Create an instance of the user's master compute class through + * ReflectionUtils.newInstance, with this configuration passed along. + * + * @return Newly instantiated user master compute + */ + public MasterCompute createMasterCompute() { + return ReflectionUtils.newInstance(getMasterComputeClass(), this); + } + + /** + * Get the user's subclassed {@link org.apache.giraph.graph.Vertex} + * + * @return User's vertex class + */ + public Class> getVertexClass() { + return classes.getVertexClass(); + } + + /** + * Create an instance of the user's vertex class through + * ReflectionUtils.newInstance, with this configuration passed along. + * + * @return Newly instantiated user vertex + */ + public Vertex createVertex() { + return ReflectionUtils.newInstance(getVertexClass(), this); + } + + /** + * Get the user's subclassed vertex id class. + * + * @return User's vertex id class + */ + public Class getVertexIdClass() { + return classes.getVertexIdClass(); + } + + /** + * Create a user vertex id by calling newInstance() directly on the + * vertex id class (ReflectionUtils is not used here). + * + * @return Newly instantiated user vertex id + * @throws IllegalArgumentException if the vertex id class cannot be + * instantiated or accessed; the reflection exception is kept as the cause + */ + public I createVertexId() { + try { + return getVertexIdClass().newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException( + "createVertexId: Failed to instantiate", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "createVertexId: Illegally accessed", e); + } + } + + /** + * Get the user's subclassed vertex value class. 
+ * + * @return User's vertex value class + */ + public Class getVertexValueClass() { + return classes.getVertexValueClass(); + } + + /** + * Create a user vertex value. When the vertex value class is + * NullWritable, the result of NullWritable.get() is returned instead of + * instantiating the class; otherwise newInstance() is called on it. + * + * @return Instantiated user vertex value + * @throws IllegalArgumentException if the vertex value class cannot be + * instantiated or accessed; the reflection exception is kept as the cause + */ + @SuppressWarnings("unchecked") + public V createVertexValue() { + Class klass = getVertexValueClass(); + if (klass == NullWritable.class) { + return (V) NullWritable.get(); + } else { + try { + return klass.newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException( + "createVertexValue: Failed to instantiate", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "createVertexValue: Illegally accessed", e); + } + } + } + + /** + * Create array of MasterObservers, one new instance for each class + * returned by getMasterObserverClasses(), in the same order. + * + * @return Instantiated array of MasterObservers. + */ + public MasterObserver[] createMasterObservers() { + Class[] klasses = getMasterObserverClasses(); + MasterObserver[] objects = new MasterObserver[klasses.length]; + for (int i = 0; i < klasses.length; ++i) { + objects[i] = ReflectionUtils.newInstance(klasses[i], this); + } + return objects; + } + + /** + * Get the user's subclassed edge value class. + * + * @return User's edge value class + */ + public Class getEdgeValueClass() { + return classes.getEdgeValueClass(); + } + + /** + * Create a user edge value. When the edge value class is NullWritable, + * the result of NullWritable.get() is returned instead of instantiating + * the class; otherwise newInstance() is called on it. + * + * @return Instantiated user edge value + * @throws IllegalArgumentException if the edge value class cannot be + * instantiated or accessed; the reflection exception is kept as the cause + */ + public E createEdgeValue() { + Class klass = getEdgeValueClass(); + if (klass == NullWritable.class) { + return (E) NullWritable.get(); + } else { + try { + return klass.newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException( + "createEdgeValue: Failed to instantiate", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "createEdgeValue: Illegally accessed", e); + } + } + } + + /** + * Get the user's subclassed vertex message value class. 
+ * + * @return User's vertex message value class + */ + @SuppressWarnings("unchecked") + public Class getMessageValueClass() { + return classes.getMessageValueClass(); + } + + /** + * Create a user vertex message value. When the message value class is + * NullWritable, the result of NullWritable.get() is returned instead of + * instantiating the class; otherwise newInstance() is called on it. + * + * @return Instantiated user vertex message value + * @throws IllegalArgumentException if the message value class cannot be + * instantiated or accessed; the reflection exception is kept as the cause + */ + public M createMessageValue() { + Class klass = getMessageValueClass(); + if (klass == NullWritable.class) { + return (M) NullWritable.get(); + } else { + try { + return klass.newInstance(); + } catch (InstantiationException e) { + throw new IllegalArgumentException( + "createMessageValue: Failed to instantiate", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException( + "createMessageValue: Illegally accessed", e); + } + } + } + + /** + * Create a partition. The configured partition class is instantiated + * through ReflectionUtils.newInstance and then initialized with the given + * id and progressable before it is returned. + * + * @param id Partition id + * @param progressable Progressable for reporting progress + * @return Instantiated and initialized partition + */ + public Partition createPartition( + int id, Progressable progressable) { + Class> klass = classes.getPartitionClass(); + Partition partition = ReflectionUtils.newInstance(klass, this); + partition.initialize(id, progressable); + return partition; + } + + /** + * Use unsafe serialization? + * + * @return True if using unsafe serialization, false otherwise. 
+ */ + public boolean useUnsafeSerialization() { + return useUnsafeSerialization; + } + + /** + * Create an extended data output (can be subclassed). Returns an + * UnsafeByteArrayOutputStream when unsafe serialization is enabled and an + * ExtendedByteArrayDataOutput otherwise. + * + * @return ExtendedDataOutput object + */ + public ExtendedDataOutput createExtendedDataOutput() { + if (useUnsafeSerialization) { + return new UnsafeByteArrayOutputStream(); + } else { + return new ExtendedByteArrayDataOutput(); + } + } + + /** + * Create an extended data output (can be subclassed). Returns an + * UnsafeByteArrayOutputStream when unsafe serialization is enabled and an + * ExtendedByteArrayDataOutput otherwise; the expected size is passed to + * whichever one is created. + * + * @param expectedSize Expected size + * @return ExtendedDataOutput object + */ + public ExtendedDataOutput createExtendedDataOutput(int expectedSize) { + if (useUnsafeSerialization) { + return new UnsafeByteArrayOutputStream(expectedSize); + } else { + return new ExtendedByteArrayDataOutput(expectedSize); + } + } + + /** + * Create an extended data output (can be subclassed). Returns an + * UnsafeByteArrayOutputStream when unsafe serialization is enabled and an + * ExtendedByteArrayDataOutput otherwise; the buffer and position are + * passed to whichever one is created. + * + * @param buf Buffer to use for the output (reuse perhaps) + * @param pos How much of the buffer is already used + * @return ExtendedDataOutput object + */ + public ExtendedDataOutput createExtendedDataOutput(byte[] buf, + int pos) { + if (useUnsafeSerialization) { + return new UnsafeByteArrayOutputStream(buf, pos); + } else { + return new ExtendedByteArrayDataOutput(buf, pos); + } + } + + /** + * Create an extended data input (can be subclassed). Returns an + * UnsafeByteArrayInputStream when unsafe serialization is enabled and an + * ExtendedByteArrayDataInput otherwise; the buffer, offset and length are + * passed to whichever one is created. + * + * @param buf Buffer to use for the input + * @param off Where to start reading in the buffer + * @param length Maximum length of the buffer + * @return ExtendedDataInput object + */ + public ExtendedDataInput createExtendedDataInput( + byte[] buf, int off, int length) { + if (useUnsafeSerialization) { + return new UnsafeByteArrayInputStream(buf, off, length); + } else { + return new ExtendedByteArrayDataInput(buf, off, length); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/package-info.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/package-info.java b/giraph-core/src/main/java/org/apache/giraph/conf/package-info.java new file mode 100644 index 0000000..3b6b4b1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/conf/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of Giraph configuration related things. + */ +package org.apache.giraph.conf; http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/counters/GiraphHadoopCounter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphHadoopCounter.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphHadoopCounter.java new file mode 100644 index 0000000..948c9ab --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphHadoopCounter.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.counters; + +import com.google.common.base.Objects; +import org.apache.hadoop.mapreduce.Counter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Wrapper around Hadoop Counter to make it easier to use. Calls are + * delegated to the wrapped counter, and equality and hash code are based on + * the wrapped counter as well. + */ +public class GiraphHadoopCounter { + /** Hadoop Counter we're wrapping. */ + private Counter counter; + + /** + * Create a wrapper around a Hadoop Counter. + * + * @param counter Hadoop Counter to wrap. + */ + public GiraphHadoopCounter(Counter counter) { + this.counter = counter; + } + + /** + * Get underlying Hadoop Counter we're wrapping. + * + * @return Hadoop Counter being wrapped. + */ + public Counter getHadoopCounter() { + return counter; + } + + @Override + public int hashCode() { + return counter.hashCode(); + } + + @Override + public boolean equals(Object genericRight) { + if (genericRight == null) { + return false; + } + if (getClass() != genericRight.getClass()) { + return false; + } + GiraphHadoopCounter right = (GiraphHadoopCounter) genericRight; + return Objects.equal(counter, right.counter); + } + + /** + * Set counter to value. Should be greater than current value. This is + * done by incrementing the counter by the difference between the given + * value and the current value. + * + * @param value long value to set to. + */ + public void setValue(long value) { + increment(value - getValue()); + } + + /** + * Increment counter value by 1. 
+ */ + public void increment() { + increment(1); + } + + /** + * Increment the wrapped counter's value by the given amount. + * + * @param incr amount to increment by. + */ + public void increment(long incr) { + counter.increment(incr); + } + + /** + * Get the wrapped counter's current value. + * + * @return long value of counter + */ + public long getValue() { + return counter.getValue(); + } + + /** + * Get the wrapped counter's display name. + * + * @return String Hadoop counter display name. + */ + public String getDisplayName() { + return counter.getDisplayName(); + } + + /** + * Get the wrapped counter's name. + * + * @return String Hadoop counter name. + */ + public String getName() { + return counter.getName(); + } + + /** + * Write to Hadoop output by delegating to the wrapped counter. + * + * @param out DataOutput to write to. + * @throws IOException if something goes wrong. + */ + public void write(DataOutput out) throws IOException { + counter.write(out); + } + + /** + * Read from Hadoop input by delegating to the wrapped counter. + * + * @param in DataInput to read from. + * @throws IOException if something goes wrong reading. + */ + public void readFields(DataInput in) throws IOException { + counter.readFields(in); + } +}