Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AB45E11D91 for ; Sun, 8 Jun 2014 18:28:49 +0000 (UTC) Received: (qmail 6423 invoked by uid 500); 8 Jun 2014 18:28:49 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 6282 invoked by uid 500); 8 Jun 2014 18:28:49 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 6189 invoked by uid 99); 8 Jun 2014 18:28:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 08 Jun 2014 18:28:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3C5AC46C0E; Sun, 8 Jun 2014 18:28:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pavanka@apache.org To: commits@giraph.apache.org Date: Sun, 08 Jun 2014 18:28:51 -0000 Message-Id: <6ac58b464c90466a91a93c5933ce8a9a@git.apache.org> In-Reply-To: <5271dae597434eddb2c15d6794c3fda0@git.apache.org> References: <5271dae597434eddb2c15d6794c3fda0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] git commit: updated refs/heads/trunk to 4a133f5 GIRAPH-908: support for partitioned input in giraph (pavanka) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4a133f57 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4a133f57 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4a133f57 Branch: refs/heads/trunk Commit: 4a133f5766c09362917e0416af503c0a00b24e87 Parents: 535a333 Author: Pavan Kumar Authored: Sun Jun 8 10:36:03 2014 -0700 Committer: Pavan Kumar Committed: Sun Jun 8 10:36:03 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/bsp/BspService.java | 208 +++++++++++++----- .../giraph/bsp/CentralizedServiceMaster.java | 10 + .../org/apache/giraph/conf/GiraphClasses.java | 24 ++- .../org/apache/giraph/conf/GiraphConstants.java | 32 +++ .../ImmutableClassesGiraphConfiguration.java | 165 +++++++++++++++ .../apache/giraph/io/MappingInputFormat.java | 64 ++++++ .../org/apache/giraph/io/MappingReader.java | 124 +++++++++++ .../io/internal/WrappedMappingInputFormat.java | 99 +++++++++ .../io/internal/WrappedMappingReader.java | 105 ++++++++++ .../io/iterables/MappingReaderWrapper.java | 95 +++++++++ .../giraph/mapping/AbstractLongByteOps.java | 60 ++++++ .../mapping/DefaultEmbeddedLongByteOps.java | 73 +++++++ .../giraph/mapping/DefaultLongByteOps.java | 57 +++++ .../giraph/mapping/LongByteMappingStore.java | 143 +++++++++++++ .../org/apache/giraph/mapping/MappingEntry.java | 62 ++++++ .../org/apache/giraph/mapping/MappingStore.java | 70 +++++++ .../apache/giraph/mapping/MappingStoreOps.java | 72 +++++++ .../org/apache/giraph/mapping/package-info.java | 23 ++ .../translate/LongByteTranslateEdge.java | 123 +++++++++++ .../giraph/mapping/translate/TranslateEdge.java | 57 +++++ .../giraph/mapping/translate/package-info.java | 22 ++ .../apache/giraph/master/BspServiceMaster.java | 22 +- .../org/apache/giraph/master/MasterThread.java | 5 +- .../partition/GraphPartitionerFactory.java | 10 +- .../partition/HashPartitionerFactory.java | 24 +-- .../partition/HashRangePartitionerFactory.java | 24 +-- .../LongMappingStorePartitionerFactory.java | 61 ++++++ .../SimpleIntRangePartitionerFactory.java | 18 +- .../SimpleLongRangePartitionerFactory.java | 18 +- .../partition/SimplePartitionerFactory.java | 37 ++-- .../partition/SimpleWorkerPartitioner.java | 34 ++- .../apache/giraph/worker/BspServiceWorker.java | 138 +++++++++++- .../giraph/worker/EdgeInputSplitsCallable.java | 23 ++ .../giraph/worker/FullInputSplitCallable.java | 210 +++++++++++++++++++ .../org/apache/giraph/worker/LocalData.java | 93 ++++++++ .../worker/MappingInputSplitsCallable.java | 109 ++++++++++ .../MappingInputSplitsCallableFactory.java | 96 +++++++++ .../worker/VertexInputSplitsCallable.java | 59 ++++++ .../SimpleRangePartitionFactoryTest.java | 2 + .../apache/giraph/hive/HiveGiraphRunner.java | 73 +++++++ .../giraph/hive/common/GiraphHiveConstants.java | 4 + .../apache/giraph/hive/common/HiveUtils.java | 28 +++ .../input/mapping/AbstractHiveToMapping.java | 39 ++++ .../input/mapping/HiveMappingInputFormat.java | 116 ++++++++++ .../hive/input/mapping/HiveMappingReader.java | 100 +++++++++ .../hive/input/mapping/HiveToMapping.java | 44 ++++ .../hive/input/mapping/SimpleHiveToMapping.java | 105 ++++++++++ .../mapping/examples/LongByteHiveToMapping.java | 56 +++++ .../examples/LongInt2ByteHiveToMapping.java | 81 +++++++ .../input/mapping/examples/package-info.java | 22 ++ .../giraph/hive/input/mapping/package-info.java | 22 ++ 52 files changed, 3239 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index a0e94c1..37b94e2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-908: support for partitioned input in giraph (pavanka) + GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka) GIRAPH-899: Remove hcatalog from hadoop_facebook profile (pavanka) http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index ec0ddbb..2e35373 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -76,6 +76,25 @@ public abstract class BspService.Context context, GraphTaskManager graphTaskManager) { + this.mappingInputSplitsEvents = new InputSplitEvents(context); this.vertexInputSplitsEvents = new InputSplitEvents(context); this.edgeInputSplitsEvents = new InputSplitEvents(context); this.connectedEvent = new PredicateLock(context); @@ -313,6 +338,10 @@ public abstract class BspService> vertexOutputFormatClass; + /** Mapping input format - cached for fast access */ + protected Class> + mappingInputFormatClass; /** Edge input format class - cached for fast access */ protected Class> edgeInputFormatClass; @@ -113,8 +117,7 @@ public class GiraphClasses> edgeInputFilterClass; /** Vertex Input Filter class */ - protected Class> - vertexInputFilterClass; + protected Class> vertexInputFilterClass; /** * Empty constructor. Initialize with default classes or null. @@ -181,6 +184,9 @@ public class GiraphClasses>) EDGE_OUTPUT_FORMAT_CLASS.get(conf); + mappingInputFormatClass = (Class>) + MAPPING_INPUT_FORMAT_CLASS.get(conf); aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf); messageCombinerClass = @@ -335,6 +341,15 @@ public class GiraphClasses> + getMappingInputFormatClass() { + return mappingInputFormatClass; + } + /** * Check if EdgeInputFormat is set * http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 611a0dc..dd0c9ae 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -45,6 +45,7 @@ import org.apache.giraph.graph.VertexValueCombiner; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeOutputFormat; +import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.filters.DefaultEdgeInputFilter; @@ -56,6 +57,9 @@ import org.apache.giraph.job.DefaultJobObserver; import org.apache.giraph.job.GiraphJobObserver; import org.apache.giraph.job.GiraphJobRetryChecker; import org.apache.giraph.job.HaltApplicationUtils; +import org.apache.giraph.mapping.MappingStore; +import org.apache.giraph.mapping.MappingStoreOps; +import org.apache.giraph.mapping.translate.TranslateEdge; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; @@ -80,6 +84,30 @@ public interface GiraphConstants { /** 1KB in bytes */ int ONE_KB = 1024; + /** Mapping related information */ + ClassConfOption MAPPING_STORE_CLASS = + ClassConfOption.create("giraph.mappingStoreClass", null, + MappingStore.class, "MappingStore Class"); + + /** Class to use for performing read operations on mapping store */ + ClassConfOption MAPPING_STORE_OPS_CLASS = + ClassConfOption.create("giraph.mappingStoreOpsClass", null, + MappingStoreOps.class, "MappingStoreOps class"); + + /** Upper value of LongByteMappingStore */ + IntConfOption LB_MAPPINGSTORE_UPPER = + new IntConfOption("giraph.lbMappingStoreUpper", -1, + "'upper' value used by lbmappingstore"); + /** Lower value of LongByteMappingStore */ + IntConfOption LB_MAPPINGSTORE_LOWER = + new IntConfOption("giraph.lbMappingStoreLower", -1, + "'lower' value used by lbMappingstore"); + /** Class used to conduct expensive edge translation during vertex input */ + ClassConfOption EDGE_TRANSLATION_CLASS = + ClassConfOption.create("giraph.edgeTranslationClass", null, + TranslateEdge.class, "Class used to conduct expensive edge " + + "translation during vertex input phase"); + /** Computation class - required */ ClassConfOption COMPUTATION_CLASS = ClassConfOption.create("giraph.computationClass", null, @@ -230,6 +258,10 @@ public interface GiraphConstants { ClassConfOption EDGE_INPUT_FORMAT_CLASS = ClassConfOption.create("giraph.edgeInputFormatClass", null, EdgeInputFormat.class, "EdgeInputFormat class"); + /** MappingInputFormat class */ + ClassConfOption MAPPING_INPUT_FORMAT_CLASS = + ClassConfOption.create("giraph.mappingInputFormatClass", null, + MappingInputFormat.class, "MappingInputFormat class"); /** EdgeInputFilter class */ ClassConfOption EDGE_INPUT_FILTER_CLASS = http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index e9f50f9..3d7b3db 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -18,6 +18,7 @@ package org.apache.giraph.conf; +import com.google.common.base.Preconditions; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.edge.Edge; @@ -39,12 +40,14 @@ import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.graph.VertexValueCombiner; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeOutputFormat; +import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.io.internal.WrappedEdgeInputFormat; import org.apache.giraph.io.internal.WrappedEdgeOutputFormat; +import org.apache.giraph.io.internal.WrappedMappingInputFormat; import org.apache.giraph.io.internal.WrappedVertexInputFormat; import org.apache.giraph.io.internal.WrappedVertexOutputFormat; import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput; @@ -53,6 +56,9 @@ import org.apache.giraph.io.superstep_output.SuperstepOutput; import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput; import org.apache.giraph.job.GiraphJobObserver; import org.apache.giraph.job.GiraphJobRetryChecker; +import org.apache.giraph.mapping.MappingStore; +import org.apache.giraph.mapping.MappingStoreOps; +import org.apache.giraph.mapping.translate.TranslateEdge; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; import org.apache.giraph.master.SuperstepClasses; @@ -65,6 +71,7 @@ import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.utils.UnsafeByteArrayInputStream; import org.apache.giraph.utils.UnsafeByteArrayOutputStream; +import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.utils.io.BigDataInputOutput; import org.apache.giraph.utils.io.DataInputOutput; import org.apache.giraph.utils.io.ExtendedDataInputOutput; @@ -91,6 +98,8 @@ public class ImmutableClassesGiraphConfiguration extends GiraphConfiguration { /** Holder for all the classes */ private final GiraphClasses classes; + /** Mapping target class */ + private Class mappingTargetClass = null; /** Value (IVEMM) Factories */ private final ValueFactories valueFactories; /** Language values (IVEMM) are implemented in */ @@ -148,6 +157,27 @@ public class ImmutableClassesGiraphConfiguration edgeTranslationClass() { + return EDGE_TRANSLATION_CLASS.get(this); + } + + /** + * Instance of TranslateEdge that contains helper method for edge translation + * + * @return instance of TranslateEdge + */ + public TranslateEdge edgeTranslationInstance() { + if (edgeTranslationClass() != null) { + return ReflectionUtils.newInstance(edgeTranslationClass(), this); + } + return null; + } + + /** * Get the vertex input filter class * * @return VertexInputFilter class @@ -274,6 +304,25 @@ public class ImmutableClassesGiraphConfiguration> + getMappingInputFormatClass() { + return classes.getMappingInputFormatClass(); + } + + /** + * Check if mappingInputFormat is set + * + * @return true if mappingInputFormat is set + */ + public boolean hasMappingInputFormat() { + return classes.hasMappingInputFormat(); + } + + /** * Create a user vertex output format class. * Note: Giraph should only use WrappedVertexOutputFormat, * which makes sure that Configuration parameters are set properly. @@ -287,6 +336,20 @@ public class ImmutableClassesGiraphConfiguration + createMappingInputFormat() { + Class> klass = + getMappingInputFormatClass(); + return ReflectionUtils.newInstance(klass, this); + } + + /** * Create a wrapper for user vertex output format, * which makes sure that Configuration parameters are set properly in all * methods related to this format. @@ -300,6 +363,22 @@ public class ImmutableClassesGiraphConfiguration + createWrappedMappingInputFormat() { + WrappedMappingInputFormat + wrappedMappingInputFormat = + new WrappedMappingInputFormat<>(createMappingInputFormat()); + configureIfPossible(wrappedMappingInputFormat); + return wrappedMappingInputFormat; + } + @Override public boolean hasEdgeOutputFormat() { return classes.hasEdgeOutputFormat(); @@ -756,6 +835,24 @@ public class ImmutableClassesGiraphConfiguration createEdge(TranslateEdge + translateEdge, Edge edge) { + I translatedId = translateEdge.translateId(edge.getTargetVertexId()); + if (isEdgeValueNullWritable()) { + return (Edge) EdgeFactory.create(translatedId); + } else { + return EdgeFactory.create(translatedId, + translateEdge.cloneValue(edge.getValue())); + } + } + + /** * Create a reusable edge. * * @return Instantiated reusable edge. @@ -856,6 +953,74 @@ public class ImmutableClassesGiraphConfiguration getMappingStoreClass() { + return MAPPING_STORE_CLASS.get(this); + } + + /** + * Create a {@link org.apache.giraph.mapping.MappingStore} instance + * + * @return MappingStore Instance + */ + public MappingStore createMappingStore() { + if (getMappingStoreClass() != null) { + return ReflectionUtils.newInstance(getMappingStoreClass(), this); + } else { + return null; + } + } + + /** + * Get MappingStoreOps class to be used + * + * @return MappingStoreOps class set by user + */ + public Class getMappingStoreOpsClass() { + return MAPPING_STORE_OPS_CLASS.get(this); + } + + /** + * Create a {@link org.apache.giraph.mapping.MappingStoreOps} instance + * + * @return MappingStoreOps Instance + */ + public MappingStoreOps createMappingStoreOps() { + if (getMappingStoreOpsClass() != null) { + return ReflectionUtils.newInstance(getMappingStoreOpsClass(), this); + } else { + return null; + } + } + + /** + * Get mappingTarget class + * + * @return mappingTarget class + */ + public Class getMappingTargetClass() { + if (mappingTargetClass == null) { + Class[] classList = ReflectionUtils.getTypeArguments( + MappingStore.class, getMappingStoreClass()); + Preconditions.checkArgument(classList.length == 2); + mappingTargetClass = (Class) classList[1]; + } + return mappingTargetClass; + } + + /** + * Create and return mappingTarget instance + * + * @return mappingTarget instance + */ + public Writable createMappingTarget() { + return WritableUtils.createWritable(getMappingTargetClass()); + } + + /** * Create a user {@link org.apache.giraph.edge.OutEdges} * * @return Instantiated user OutEdges http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java new file mode 100644 index 0000000..2666268 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * + * Use this to load data for a BSP application. Note that the InputSplit must + * also implement Writable. + * + * It's guaranteed that whatever parameters are set in the configuration are + * also going to be available in all method arguments related to this input + * format (context in getSplits and createVertexReader; methods invoked on + * MappingReader). So if backing input format relies on some parameters from + * configuration, you can safely set them for example in + * {@link #setConf(org.apache.giraph.conf.ImmutableClassesGiraphConfiguration)}. + * + * @param vertexId type + * @param vertexValue type + * @param edgeValue type + * @param mappingTarget type + */ +@SuppressWarnings("unchecked") +public abstract class MappingInputFormat + extends GiraphInputFormat { + + /** + * Create a vertex reader for a given split. Guaranteed to have been + * configured with setConf() prior to use. The framework will also call + * {@link VertexReader#initialize(InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext)} before + * the split is used. + * + * @param split the split to be read + * @param context the information about the task + * @return a new record reader + * @throws IOException + */ + public abstract MappingReader createMappingReader( + InputSplit split, TaskAttemptContext context) throws IOException; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java new file mode 100644 index 0000000..b7ce97c --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.mapping.MappingEntry; +import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * Will read the mapping from an input split. + * + * @param vertexId type + * @param vertexValue type + * @param edgeValue type + * @param mappingTarget type + */ +public abstract class MappingReader + extends DefaultImmutableClassesGiraphConfigurable + implements WorkerAggregatorUsage { + + /** Aggregator usage for vertex reader */ + private WorkerAggregatorUsage workerAggregatorUsage; + + /** + * Use the input split and context to setup reading the vertices. + * Guaranteed to be called prior to any other function. + * + * @param inputSplit Input split to be used for reading vertices. + * @param context Context from the task. + * @throws java.io.IOException + * @throws InterruptedException + */ + public abstract void initialize(InputSplit inputSplit, + TaskAttemptContext context) + throws IOException, InterruptedException; + + + /** + * Set aggregator usage. It provides the functionality + * of aggregation operation in reading a vertex. + * It is invoked just after initialization. + * E.g., + * vertexReader.initialize(inputSplit, context); + * vertexReader.setAggregator(aggregatorUsage); + * This method is only for use by the infrastructure. + * + * @param agg aggregator usage for vertex reader + */ + public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { + workerAggregatorUsage = agg; + } + + /** + * + * @return false iff there are no more vertices + * @throws IOException + * @throws InterruptedException + */ + public abstract boolean nextEntry() throws IOException, + InterruptedException; + + + /** + * Get the current entry. + * + * @return the current entry which has been read. + * nextVEntry() should be called first. + * @throws IOException + * @throws InterruptedException + */ + public abstract MappingEntry getCurrentEntry() + throws IOException, InterruptedException; + + + /** + * Close this {@link MappingReader} to future operations. + * + * @throws IOException + */ + public abstract void close() throws IOException; + + /** + * How much of the input has the {@link VertexReader} consumed i.e. + * has been processed by? + * + * @return Progress from 0.0 to 1.0. + * @throws IOException + * @throws InterruptedException + */ + public abstract float getProgress() throws IOException, InterruptedException; + + @Override + public void aggregate(String name, A value) { + workerAggregatorUsage.aggregate(name, value); + } + + @Override + public A getAggregatedValue(String name) { + return workerAggregatorUsage.getAggregatedValue(name); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java new file mode 100644 index 0000000..72f8177 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.internal; + +import org.apache.giraph.io.MappingInputFormat; +import org.apache.giraph.io.MappingReader; +import org.apache.giraph.job.HadoopUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +/** + * For internal use only. + * + * Wraps user set {@link org.apache.giraph.io.VertexInputFormat} to make + * sure proper configuration parameters are passed around, that user can set + * parameters in configuration and they will be available in other methods + * related to this format. + * + * @param vertexId type + * @param vertexValue type + * @param edgeValue type + * @param mappingTarget type + */ +public class WrappedMappingInputFormat + extends MappingInputFormat { + /** originalInputFormat to wrap over */ + private MappingInputFormat originalInputFormat; + + /** + * Constructor + * + * @param mappingInputFormat original mappingInputFormat + */ + public WrappedMappingInputFormat( + MappingInputFormat mappingInputFormat) { + originalInputFormat = mappingInputFormat; + } + + @Override + public void checkInputSpecs(Configuration conf) { + originalInputFormat.checkInputSpecs(conf); + } + + @Override + public List getSplits(JobContext context, int minSplitCountHint) + throws IOException, InterruptedException { + return originalInputFormat.getSplits( + HadoopUtils.makeJobContext(getConf(), context), + minSplitCountHint); + } + + @Override + public MappingReader createMappingReader(InputSplit split, + TaskAttemptContext context) throws IOException { + final MappingReader mappingReader = originalInputFormat + .createMappingReader(split, + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + return new WrappedMappingReader<>(mappingReader, getConf()); + } + + + @Override + public void writeInputSplit(InputSplit inputSplit, + DataOutput dataOutput) throws IOException { + originalInputFormat.writeInputSplit(inputSplit, dataOutput); + } + + @Override + public InputSplit readInputSplit( + DataInput dataInput) throws IOException, ClassNotFoundException { + return originalInputFormat.readInputSplit(dataInput); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java new file mode 100644 index 0000000..7d1c4c9 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.internal; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.io.MappingReader; +import org.apache.giraph.job.HadoopUtils; +import org.apache.giraph.mapping.MappingEntry; +import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * For internal use only. + * + * Wraps {@link org.apache.giraph.io.MappingReader} to make sure proper + * configuration parameters are passed around, that parameters set in original + * configuration are available in methods of this reader + * + * @param vertexId type + * @param vertexValue type + * @param edgeValue type + * @param mappingTarget type + */ +public class WrappedMappingReader + extends MappingReader { + /** User set baseMappingReader wrapped over */ + private final MappingReader baseMappingReader; + + /** + * Constructor + * + * @param baseMappingReader User set baseMappingReader + * @param conf configuration + */ + public WrappedMappingReader(MappingReader baseMappingReader, + ImmutableClassesGiraphConfiguration conf) { + this.baseMappingReader = baseMappingReader; + super.setConf(conf); + baseMappingReader.setConf(conf); + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration conf) { + // We don't want to use external configuration + } + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, InterruptedException { + baseMappingReader.initialize(inputSplit, + HadoopUtils.makeTaskAttemptContext(getConf(), context)); + } + + + @Override + public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { + // Set aggregator usage for vertex reader + baseMappingReader.setWorkerAggregatorUse(agg); + } + + @Override + public boolean nextEntry() throws IOException, InterruptedException { + return baseMappingReader.nextEntry(); + } + + @Override + public MappingEntry getCurrentEntry() + throws IOException, InterruptedException { + return baseMappingReader.getCurrentEntry(); + } + + + @Override + public void close() throws IOException { + baseMappingReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return baseMappingReader.getProgress(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java new file mode 100644 index 0000000..d8b9c31 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.iterables; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.io.MappingReader; +import org.apache.giraph.mapping.MappingEntry; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * Wraps {@link GiraphReader} for mapping into + * {@link org.apache.giraph.io.MappingReader} + * + * @param vertexId type + * @param vertexValue type + * @param edgeValue type + * @param mappingTarget type + */ +public class MappingReaderWrapper + extends MappingReader { + /** Wrapped mapping reader */ + private GiraphReader> mappingReader; + /** + * {@link org.apache.giraph.io.MappingReader}-like wrapper of + * {@link #mappingReader} + */ + private IteratorToReaderWrapper> iterator; + + /** + * Constructor + * + * @param mappingReader user supplied mappingReader + */ + public MappingReaderWrapper(GiraphReader> mappingReader) { + this.mappingReader = mappingReader; + iterator = new IteratorToReaderWrapper<>(mappingReader); + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration conf) { + super.setConf(conf); + conf.configureIfPossible(mappingReader); + } + + @Override + public boolean nextEntry() throws IOException, InterruptedException { + return iterator.nextObject(); + } + + @Override + public MappingEntry getCurrentEntry() + throws IOException, InterruptedException { + return iterator.getCurrentObject(); + } + + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, InterruptedException { + mappingReader.initialize(inputSplit, context); + } + + @Override + public void close() throws IOException { + mappingReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return mappingReader.getProgress(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java new file mode 100644 index 0000000..2e2310b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping; + +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * Implementation of basic methods in MappingStoreOps + */ +@SuppressWarnings("unchecked, rawtypes") +public abstract class AbstractLongByteOps + implements MappingStoreOps { + /** Mapping store instance to operate on */ + protected LongByteMappingStore mappingStore; + + @Override + public void initialize(MappingStore mappingStore) { + this.mappingStore = (LongByteMappingStore) mappingStore; + } + + /** + * Compute partition given id, partitionCount, workerCount & target + * @param id vertex id + * @param partitionCount number of partitions + * @param workerCount number of workers + * @param target target worker + * @return partition number + */ + protected int computePartition(LongWritable id, int partitionCount, + int workerCount, byte target) { + int numRows = partitionCount / workerCount; + numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1; + if (target == -1) { + // default to hash based partitioning + return Math.abs(id.hashCode() % partitionCount); + } else { + int targetWorker = target & 0xFF; + // assume zero based indexing of partition & worker [also consecutive] + return numRows * targetWorker + Math.abs(id.hashCode() % numRows); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java new file mode 100644 index 0000000..0b6f3e4 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping; + +import org.apache.hadoop.io.LongWritable; + +/** + * MappingStoreOps implementation used to embed target information into + * vertex id. Stores information in the higher order bits of the long id + */ +public class DefaultEmbeddedLongByteOps extends AbstractLongByteOps { + /** Bit mask for first 9 bits in a long */ + private static final long MASK = ((long) 0x1FF) << 55; + /** Inverse of MASK */ + private static final long IMASK = ~ MASK; + + /** + * Default constructor (do not use) + */ + public DefaultEmbeddedLongByteOps() { + } + + @Override + public boolean hasEmbedding() { + return true; + } + + @Override + public void embedTargetInfo(LongWritable id) { + if ((id.get() & MASK) != 0) { + throw new IllegalStateException("Expected first 9 bits of long " + + " to be empty"); + } + byte target = mappingStore.getByteTarget(id); + // first bit = 0 & rest 8 bits set to target + // add 1 to distinguish between not set and assignment to worker-0 + // (prefix bits = 0 can mean one of two things : + // no entry in the mapping, in which case target = -1, so -1 + 1 = 0 + // vertex is created later during computation, so prefix bits are 0 anyway) + long maskValue = ((1L + target) & 0xFF) << 55; + id.set(id.get() | maskValue); + } + + @Override + public void removeTargetInfo(LongWritable id) { + id.set(id.get() & IMASK); + } + + @Override + public int getPartition(LongWritable id, int partitionCount, + int workerCount) { + // extract last 8 bits + // subtract 1 since added 1 during embedInfo (unset = -1) + byte target = (byte) (((id.get() >>> 55) & 0xFF) - 1); + return computePartition(id, partitionCount, workerCount, target); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java new file mode 100644 index 0000000..57ece94 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping; + +import org.apache.hadoop.io.LongWritable; + +/** + * MappingStoreOps implementation which reads partition information from map + */ +@SuppressWarnings("unchecked, rawtypes") +public class DefaultLongByteOps extends AbstractLongByteOps { + + /** + * Default constructor (do not use) + */ + public DefaultLongByteOps() { + } + + @Override + public boolean hasEmbedding() { + return false; + } + + @Override + public void embedTargetInfo(LongWritable id) { + throw new UnsupportedOperationException(); + } + + @Override + public void removeTargetInfo(LongWritable id) { + throw new UnsupportedOperationException(); + } + + @Override + public int getPartition(LongWritable id, int partitionCount, + int workerCount) { + byte target = mappingStore.getByteTarget(id); + return computePartition(id, partitionCount, workerCount, target); + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java b/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java new file mode 100644 index 0000000..996fed0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping; + +import com.google.common.collect.MapMaker; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; + +import java.util.Arrays; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * + * An implementation of MappingStore + * + * Methods implemented here are thread safe by default because it is guaranteed + * that each entry is written to only once. + * It can represent up to a maximum of 254 workers + * any byte passed is treated as unsigned + */ +@ThreadSafe +public class LongByteMappingStore + extends DefaultImmutableClassesGiraphConfigurable implements MappingStore { + /** Logger instance */ + private static final Logger LOG = Logger.getLogger( + LongByteMappingStore.class); + + /** Counts number of entries added */ + private final AtomicLong numEntries = new AtomicLong(0); + + /** Id prefix to bytesArray index mapping */ + private ConcurrentMap concurrentIdToBytes; + /** Primitive idToBytes for faster querying */ + private Long2ObjectOpenHashMap idToBytes; + /** Number of lower order bits */ + private int lower; + /** Number of distinct prefixes */ + private int upper; + /** Bit mask for lowerOrder suffix bits */ + private int lowerBitMask; + /** LowerOrder bits count */ + private int lowerOrder; + + @Override + public void initialize() { + upper = GiraphConstants.LB_MAPPINGSTORE_UPPER.get(getConf()); + lower = GiraphConstants.LB_MAPPINGSTORE_LOWER.get(getConf()); + + if ((lower & (lower - 1)) != 0) { + throw new IllegalStateException("lower not a power of two"); + } + + lowerBitMask = lower - 1; + lowerOrder = Integer.numberOfTrailingZeros(lower); // log_2_(lower) + concurrentIdToBytes = new MapMaker() + .initialCapacity(upper) + .concurrencyLevel(getConf().getNumInputSplitsThreads()) + .makeMap(); + idToBytes = new Long2ObjectOpenHashMap<>(upper); + } + + /** + * Auxiliary method to be used by getTarget + * + * @param vertexId vertexId + * @return return byte value of target + */ + public byte getByteTarget(LongWritable vertexId) { + long key = vertexId.get() >>> lowerOrder; + int suffix = (int) (vertexId.get() & lowerBitMask); + if (!idToBytes.containsKey(key)) { + return -1; + } + return idToBytes.get(key)[suffix]; + } + + @Override + public void addEntry(LongWritable vertexId, ByteWritable target) { + long key = vertexId.get() >>> lowerOrder; + byte[] bytes = concurrentIdToBytes.get(key); + if (bytes == null) { + byte[] newBytes = new byte[lower]; + Arrays.fill(newBytes, (byte) -1); + bytes = concurrentIdToBytes.putIfAbsent(key, newBytes); + if (bytes == null) { + bytes = newBytes; + } + } + bytes[(int) (vertexId.get() & lowerBitMask)] = target.get(); + numEntries.getAndIncrement(); // increment count + } + + @Override + public ByteWritable getTarget(LongWritable vertexId, + ByteWritable target) { + Byte bval = getByteTarget(vertexId); + if (bval == -1) { // worker not assigned by mapping + return null; + } + target.set(bval); + return target; + } + + @Override + public void postFilling() { + // not thread-safe + for (Long id : concurrentIdToBytes.keySet()) { + idToBytes.put(id, concurrentIdToBytes.get(id)); + } + concurrentIdToBytes.clear(); + concurrentIdToBytes = null; + } + + @Override + public long getStats() { + return numEntries.longValue(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java new file mode 100644 index 0000000..8c8efa1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * An entry in MappingStore + * + * @param vertexId type + * @param mappingTarget type + */ +public class MappingEntry { + /** Vertex Id */ + private I vertexId; + /** Mapping Target */ + private B mappingTarget; + + /** + * Constructor + * + * @param vertexId vertexId + * @param mappingTarget mappingTarget + */ + public MappingEntry(I vertexId, B mappingTarget) { + this.vertexId = vertexId; + this.mappingTarget = mappingTarget; + } + + public I getVertexId() { + return vertexId; + } + + public B getMappingTarget() { + return mappingTarget; + } + + public void setVertexId(I vertexId) { + this.vertexId = vertexId; + } + + public void setMappingTarget(B mappingTarget) { + this.mappingTarget = mappingTarget; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java new file mode 100644 index 0000000..5b872a4 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * MappingStore used to store the vertexId - target map supplied by user + * Methods implemented in this class need to be thread safe + * + * @param vertexId type + * @param mappingTarget type + */ +public interface MappingStore + extends ImmutableClassesGiraphConfigurable { + + /** + * Must be called before anything else can be done + * on this instance + */ + void initialize(); + + /** + * Add an entry to the mapping store + * + * @param vertexId vertexId + * @param target target + */ + void addEntry(I vertexId, B target); + + /** + * Get target for given vertexId + * + * @param vertexId vertexId + * @param target instance to use for storing target information + * @return target instance + */ + B getTarget(I vertexId, B target); + + /** + * Operations to perform after adding entries + * to the mapping store + */ + void postFilling(); + + /** + * Get stats about the MappingStore + * + * @return numEntries + */ + long getStats(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java new file mode 100644 index 0000000..aba034e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Interface of operations that can be done on mapping store + * once it is fully loaded + * + * @param vertex id type + * @param mapping target type + */ +public interface MappingStoreOps { + + /** + * Must be called before anything else can be done + * on this instance + * @param mappingStore mapping store instance to operate on + */ + void initialize(MappingStore mappingStore); + + /** + * True if MappingStoreOps is based on embedding info + * + * @return true if worker info is embedded into vertex ids + */ + boolean hasEmbedding(); + + /** + * Embed target information into vertexId + * + * @param id vertexId + */ + void embedTargetInfo(I id); + + /** + * Remove target information from vertexId + * + * @param id vertexId + */ + void removeTargetInfo(I id); + + + /** + * Get partition id for a vertex id + * + * @param id vertexId + * @param partitionCount partitionCount + * @param workerCount workerCount + * @return partition of vertex id + */ + int getPartition(I id, int partitionCount, int workerCount); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java b/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java new file mode 100644 index 0000000..ae23475 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package contains definition and implementations of MappingStore and + * related concepts + */ +package org.apache.giraph.mapping; http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java new file mode 100644 index 0000000..102ef50 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping.translate; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.worker.BspServiceWorker; +import org.apache.giraph.worker.LocalData; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +/** + * Basic implementation of Translate Edge + * where I = LongWritable & B = ByteWritable + * + * @param edge value type + */ +@SuppressWarnings("unchecked") +public class LongByteTranslateEdge + extends DefaultImmutableClassesGiraphConfigurable + implements TranslateEdge { + + /** Local data used for targetId translation of edge */ + private LocalData localData; + + @Override + public void initialize(BspServiceWorker service) { + localData = (LocalData) + service.getLocalData(); + } + + @Override + public LongWritable translateId(LongWritable targetId) { + LongWritable translatedId = new LongWritable(); + translatedId.set(targetId.get()); + localData.getMappingStoreOps().embedTargetInfo(translatedId); + return translatedId; + } + + @Override + public E cloneValue(E edgeValue) { + // If vertex input does not have create edges, + // then you can use LongByteTranslateEdge directly + throw new UnsupportedOperationException(); + } + + /** + * Correct implementation of cloneValue when edgevalue = nullwritable + */ + public static class NoEdgeValue + extends LongByteTranslateEdge { + @Override + public NullWritable cloneValue(NullWritable edgeValue) { + return NullWritable.get(); + } + } + + /** + * Correct implementation of cloneValue when edgevalue = intwritable + */ + public static class IntEdgeValue + extends LongByteTranslateEdge { + @Override + public IntWritable cloneValue(IntWritable edgeValue) { + return new IntWritable(edgeValue.get()); + } + } + + /** + * Correct implementation of cloneValue when edgevalue = longwritable + */ + public static class LongEdgeValue + extends LongByteTranslateEdge { + @Override + public LongWritable cloneValue(LongWritable edgeValue) { + return new LongWritable(edgeValue.get()); + } + } + + /** + * Correct implementation of cloneValue when edgevalue = floatwritable + */ + public static class FloatEdgeValue + extends LongByteTranslateEdge { + @Override + public FloatWritable cloneValue(FloatWritable edgeValue) { + return new FloatWritable(edgeValue.get()); + } + } + + /** + * Correct implementation of cloneValue when edgevalue = doublewritable + */ + public static class DoubleEdgeValue + extends LongByteTranslateEdge { + @Override + public DoubleWritable cloneValue(DoubleWritable edgeValue) { + return new DoubleWritable(edgeValue.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java new file mode 100644 index 0000000..85e8768 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.mapping.translate; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.worker.BspServiceWorker; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Used to conduct expensive translation of edges + * during vertex input phase + * + * @param vertexId type + * @param edgeValue type + */ +public interface TranslateEdge + extends ImmutableClassesGiraphConfigurable { + /** + * Must be called before other methods can be used + * + * @param service bsp service worker + */ + void initialize(BspServiceWorker service); + + /** + * Translate Id & return a new instance + * + * @param targetId edge target Id + * @return a new translated Id instance + */ + I translateId(I targetId); + + /** + * Clone edge value + * + * @param edgeValue edge value + * @return clone of edge value + */ + E cloneValue(E edgeValue); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java new file mode 100644 index 0000000..8536e83 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Definitions & sample implementations of edge translation logic + */ +package org.apache.giraph.mapping.translate; http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 90dc9f3..e367b94 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -40,6 +40,7 @@ import org.apache.giraph.graph.GraphState; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.GiraphInputFormat; import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.partition.MasterGraphPartitioner; import org.apache.giraph.partition.PartitionOwner; @@ -119,7 +120,7 @@ import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY; * @param Vertex data * @param Edge data */ -@SuppressWarnings("rawtypes") +@SuppressWarnings("rawtypes, unchecked") public class BspServiceMaster extends BspService @@ -217,7 +218,7 @@ public class BspServiceMaster mappingInputFormat = + getConfiguration().createWrappedMappingInputFormat(); + return createInputSplits(mappingInputFormat, mappingInputSplitsPaths, + "Mapping"); + } + + @Override public int createVertexInputSplits() { // Short-circuit if there is no vertex input format if (!getConfiguration().hasVertexInputFormat()) { @@ -1589,8 +1601,12 @@ public class BspServiceMaster