Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1EBCB18495 for ; Thu, 11 Jun 2015 02:50:41 +0000 (UTC) Received: (qmail 98936 invoked by uid 500); 11 Jun 2015 02:50:41 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 98796 invoked by uid 500); 11 Jun 2015 02:50:40 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 98755 invoked by uid 99); 11 Jun 2015 02:50:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2015 02:50:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A158BE04D7; Thu, 11 Jun 2015 02:50:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ikabiljo@apache.org To: commits@giraph.apache.org Date: Thu, 11 Jun 2015 02:50:42 -0000 Message-Id: In-Reply-To: <2150aed768ae48959e03eeac262e5dd3@git.apache.org> References: <2150aed768ae48959e03eeac262e5dd3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/5] git commit: updated refs/heads/trunk to 819d6d3 http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java new file mode 100644 index 0000000..fd38520 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.output; + +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.giraph.block_app.framework.api.BlockOutputApi; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.ProgressableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Progressable; + +/** + * Handler for blocks output - keeps track of outputs and writers created + */ +@SuppressWarnings("unchecked") +public class BlockOutputHandle implements BlockOutputApi { + private transient Configuration conf; + private transient Progressable progressable; + private final Map outputDescMap; + private final Map> freeWriters; + private final Map> occupiedWriters; + + public BlockOutputHandle(String jobIdentifier, Configuration conf, + Progressable hadoopProgressable) { + outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap( + conf, jobIdentifier); + freeWriters = new HashMap<>(); + occupiedWriters = new HashMap<>(); + for (String confOption : outputDescMap.keySet()) { + freeWriters.put(confOption, + new ConcurrentLinkedQueue()); + occupiedWriters.put(confOption, + new ConcurrentLinkedQueue()); + } + initialize(conf, hadoopProgressable); + } + + public void initialize(Configuration conf, Progressable progressable) { + this.conf = conf; + this.progressable = progressable; + } + + + @Override + public > + OD getOutputDesc(String confOption) { + return (OD) outputDescMap.get(confOption); + } + + @Override + public OW getWriter(String confOption) { + OW outputWriter = (OW) freeWriters.get(confOption).poll(); + if (outputWriter == null) { + outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter( + conf, progressable); + } + occupiedWriters.get(confOption).add(outputWriter); + return outputWriter; + } + + public void returnAllWriters() { + for (Map.Entry> entry : + occupiedWriters.entrySet()) { + freeWriters.get(entry.getKey()).addAll(entry.getValue()); + entry.getValue().clear(); + } + } + + public void closeAllWriters() { + final Queue allWriters = new ConcurrentLinkedQueue<>(); + for (Queue blockOutputWriters : freeWriters.values()) { + allWriters.addAll(blockOutputWriters); + } + if (allWriters.isEmpty()) { + return; + } + // Closing writers can take time - use multiple threads and call progress + CallableFactory callableFactory = new CallableFactory() { + @Override + public Callable newCallable(int callableId) { + return new Callable() { + @Override + public Void call() throws Exception { + BlockOutputWriter writer = allWriters.poll(); + while (writer != null) { + writer.close(); + writer = allWriters.poll(); + } + return null; + } + }; + } + }; + ProgressableUtils.getResultsWithNCallables(callableFactory, + Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf), + allWriters.size()), "close-writers-%d", progressable); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java new file mode 100644 index 0000000..5f82612 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputOption.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.output; + +import org.apache.giraph.block_app.framework.api.BlockOutputApi; +import org.apache.giraph.conf.GiraphConfiguration; + +/** + * Block output option, with apis to use from application code + * + * @param Output description type + * @param Output writer type + */ +public class BlockOutputOption, + OW extends BlockOutputWriter> { + private final String key; + + public BlockOutputOption(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + + public void register(OD outputDesc, GiraphConfiguration conf) { + BlockOutputFormat.addOutputDesc(outputDesc, key, conf); + } + + public OD getOutputDesc(BlockOutputApi outputApi) { + return outputApi.getOutputDesc(key); + } + + public OW getWriter(BlockOutputApi outputApi) { + return outputApi.getWriter(key); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java new file mode 100644 index 0000000..5574ab4 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputWriter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.output; + +import java.io.Closeable; + +/** + * Block output writer + */ +public interface BlockOutputWriter extends Closeable { +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java new file mode 100644 index 0000000..11193b1 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Multi-output support for Block Applications + */ +package org.apache.giraph.block_app.framework.output; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java new file mode 100644 index 0000000..0b6934e --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Block application abstraction package. + * + * Giraph application is represented as a collection of pieces, + * aggregated via blocks, ultimately into a single block, that + * represents complete application execution. + */ +package org.apache.giraph.block_app.framework; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java new file mode 100644 index 0000000..882f4f1 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece; + +import java.util.Iterator; +import java.util.List; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.MessageClasses; +import org.apache.giraph.function.Consumer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.collect.Iterators; + +/** + * Parent of all Pieces, contains comprehensive list of methods Piece + * can support. Specific subclasses should be extended directly, + * to simplify usage - most frequently for example Piece class. + * + * Single unit of execution, capturing: + * - sending and then receiving messages from vertices + * - sending data to be aggregated from workers to master + * - sending values from master, via aggregators, to workers + * - sending and receiving worker messages + * + * + * Order of execution is: + * - On master, once at the start of the application + * -- registerAggregators (deprecated, use registerReducers instead) + * + * - After masterCompute of previous piece, on master: + * -- registerReducers + * + * - Send logic on workers: + * -- getVertexSender per each worker thread, and on object returned by it: + * --- vertexSend on each vertex + * --- postprocess on each worker thread + * -- workerContextSend per worker + * + * - Logic on master: + * -- masterCompute + * + * - Receive logic on workers: + * -- workerContextReceive per worker + * -- getVertexReceiver per each worker thread, and on object returned by it: + * --- vertexReceive on each vertex + * --- postprocess on each worker thread + * + * And before everything, during initialization, registerAggregators. + * + * Only masterCompute and registerReducers/registerAggregators should modify + * the Piece, all of the worker methods should treat Piece as read-only. + * + * Each piece should be encapsulated unit of execution. Vertex value should be + * used as a single implicit "communication" channel between different pieces, + * all other dependencies should be explicitly defined and passed through + * constructor, via interfaces (as explained below). + * I.e. state of the vertex value is invariant that Pieces act upon. + * Best is not to depend on explicit vertex value class, but on interface that + * provides all needed functions, so that pieces can be freely combined, + * as long as vertex value implements appropriate ones. + * Similarly, use most abstract class you need - if Piece doesn't depend + * on edge value, don't use NullWritable, but Writable. Or if it doesn't + * depend on ExecutionStage, use Object for it. + * + * All other external dependencies should be explicitly passed through + * constructor, through interfaces. + * + * All Pieces will be created within one context - on the master. + * They are then going to be replicated across all workers, and across all + * threads within each worker, and will see everything that happens in global + * context (masterCompute) before them, including any state master has. + * Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in + * global context, and from global context to worker functions of a Piece + * that happens in the future. + * + * VertexReceiver of previous Piece and VertexSender of next Piece live in + * the same context, and vertexReceive of the next Piece is executed + * immediately after vertexSend of the previous piece, before vertexSend is + * called on the next vertex. + * This detail allows you to have external dependency on each other through + * memory only mediator objects - like ObjectTransfer. + * + * All other logic going to live in different contexts, + * specifically VertexSender and VertexReceiver of the same Piece, + * or workerContextSend and VertexSender of the same Piece, and cannot interact + * with each other outside of changing the state of the graph or using + * global communication api. + * + * All methods on this class (or objects it returns) will be called serially, + * so there is no need for any Thread synchronization. + * Each Thread will have a complete deep copy of the Piece, to achieve that, + * so all static fields must be written to be Thread safe! + * (i.e. either immutable, or have synchronized/locked access to them) + * + * @param Vertex id type + * @param Vertex value type + * @param Edge value type + * @param Message type + * @param Worker value type + * @param Worker message type + * @param Execution stage type + */ +@SuppressWarnings({ "rawtypes" }) +public abstract class AbstractPiece implements Block { + + // Overridable functions + + // registerReducers(CreateReducersApi reduceApi, S executionStage) + + /** + * Add automatic handling of reducers to registerReducers. + * Only for internal use. + */ + public abstract void wrappedRegisterReducers( + BlockMasterApi masterApi, S executionStage); + + // getVertexSender(BlockWorkerSendApi workerApi, S executionStage) + + /** + * Add automatic handling of reducers to getVertexSender. + * + * Only for Framework internal use. + */ + public abstract InnerVertexSender getWrappedVertexSender( + final BlockWorkerSendApi workerApi, S executionStage); + + /** + * Override to have worker context send computation. + * + * Called once per worker, after all vertices have been processed with + * getVertexSender. + */ + public void workerContextSend( + BlockWorkerContextSendApi workerContextApi, S executionStage, + WV workerValue) { + } + + /** + * Function that is called on master, after send phase, before receive phase. + * + * It can: + * - read aggregators sent from worker + * - do global processing + * - send data to workers through aggregators + */ + public void masterCompute(BlockMasterApi masterApi, S executionStage) { + } + + /** + * Override to have worker context receive computation. + * + * Called once per worker, before all vertices are going to be processed + * with getVertexReceiver. + */ + public void workerContextReceive( + BlockWorkerContextReceiveApi workerContextApi, S executionStage, + WV workerValue, List workerMessages) { + } + + /** + * Override to do vertex receive processing. + * + * Creates handler that defines what should be executed on worker + * for each vertex during receive phase. + * + * This logic executed last. + * This function is called once on each worker on each thread, in parallel, + * on their copy of Piece object to create functions handler. + * + * If returned object implements Postprocessor interface, then corresponding + * postprocess() function is going to be called once, after all vertices + * corresponding thread needed to process are done. + */ + public VertexReceiver getVertexReceiver( + BlockWorkerReceiveApi workerApi, S executionStage) { + return null; + } + + /** + * Returns MessageClasses definition for messages being sent by this Piece. + */ + public abstract MessageClasses getMessageClasses( + ImmutableClassesGiraphConfiguration conf); + + /** + * Override to provide different next execution stage for + * Pieces that come after it. + * + * Execution stage should be immutable, and this function should be + * returning a new object, if it needs to return different value. + * + * It affects pieces that come after this piece, + * and isn't applied to execution stage this piece sees. + */ + public S nextExecutionStage(S executionStage) { + return executionStage; + } + + /** + * Override to register any potential aggregators used by this piece. + * + * @deprecated Use registerReducers instead. + */ + @Deprecated + public void registerAggregators(BlockMasterApi masterApi) + throws InstantiationException, IllegalAccessException { + } + + // Inner classes + + /** Inner class to provide clean use without specifying types */ + public abstract class InnerVertexSender + implements VertexSender, VertexPostprocessor { + @Override + public void postprocess() { } + } + + /** Inner class to provide clean use without specifying types */ + public abstract class InnerVertexReceiver + implements VertexReceiver, VertexPostprocessor { + @Override + public void postprocess() { } + } + + // Internal implementation + + @Override + public final Iterator iterator() { + return Iterators.singletonIterator(this); + } + + @Override + public void forAllPossiblePieces(Consumer consumer) { + consumer.apply(this); + } + + @Override + public String toString() { + String name = getClass().getSimpleName(); + if (name.isEmpty()) { + name = getClass().getName(); + } + return name; + } + + + // make hashCode and equals final, forcing them to be based on + // reference identity. + @Override + public final int hashCode() { + return super.hashCode(); + } + + @Override + public final boolean equals(Object obj) { + return super.equals(obj); + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java new file mode 100644 index 0000000..0963efb --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/DefaultParentPiece.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper; +import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses; +import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf; +import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.DefaultMessageFactorySupplierFromConf; +import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.SupplierFromConfByCopy; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.conf.EnumConfOption; +import org.apache.giraph.conf.GiraphConfigurationSettable; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.MessageClasses; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.types.NoMessage; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; + +/** + * Additional abstract implementations for all pieces to be used. + * Code here is not in AbstractPiece only to allow for non-standard + * non-user-defined pieces.
+ * Only logic used by the underlying framework directly is in AbstractPiece + * itself. + * + * @param Vertex id type + * @param Vertex value type + * @param Edge value type + * @param Message type + * @param Worker value type + * @param Worker message type + * @param Execution stage type + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public abstract class DefaultParentPiece extends AbstractPiece { + // TODO move to GiraphConstants + /** + * This option will tell which message encode & store enum to force, when + * combining is not enabled. + * + * MESSAGE_ENCODE_AND_STORE_TYPE and this property are basically upper + * and lower bound on message store type, when looking them in order from + * not doing anything special, to most advanced type: + * BYTEARRAY_PER_PARTITION, + * EXTRACT_BYTEARRAY_PER_PARTITION, + * POINTER_LIST_PER_VERTEX + * resulting encode type is going to be: + * pieceEncodingType = piece.allowOneMessageToManyIdsEncoding() ? + * POINTER_LIST_PER_VERTEX : BYTEARRAY_PER_PARTITION) + * Math.max(index(minForce), Math.min(index(maxAllowed), index(pieceType); + * + * This is useful to force all pieces onto particular message store, even + * if they do not overrideallowOneMessageToManyIdsEncoding, though that might + * be rarely needed. + * This option might be more useful for fully local computation, + * where overall job behavior is quite different. + */ + public static final EnumConfOption + MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE = + EnumConfOption.create("giraph.messageEncodeAndStoreTypeMinForce", + MessageEncodeAndStoreType.class, + MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION, + "Select the message_encode_and_store_type min force to use"); + + private final ReduceUtilsObject reduceUtils = new ReduceUtilsObject(); + private ReducersForPieceHandler reducersHandler; + + // Overridable functions + + /** + * Override to register any potential reducers used by this piece, + * through calls to {@code reduceApi}, which will return reducer handles + * for simple. + *
+ * Tip: Without defining a field, first write here name of the field and what + * you want to reduce, like: + *
+ * {@code totalSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); } + *
+ * and then use tools your IDE provides to generate field signature itself, + * which might be slightly complex: + *
+ * {@code ReducerHandle totalSum; } + */ + public void registerReducers(CreateReducersApi reduceApi, S executionStage) { + } + + /** + * Override to do vertex send processing. + * + * Creates handler that defines what should be executed on worker + * during send phase. + * + * This logic gets executed first. + * This function is called once on each worker on each thread, in parallel, + * on their copy of Piece object to create functions handler. + * + * If returned object implements Postprocessor interface, then corresponding + * postprocess() function is going to be called once, after all vertices + * corresponding thread needed to process are done. + */ + public VertexSender getVertexSender( + BlockWorkerSendApi workerApi, S executionStage) { + return null; + } + + /** + * Override to specify type of the message this Piece sends, if it does + * send messages. + * + * If not overwritten, no messages can be sent. + */ + protected Class getMessageClass() { + return null; + } + + /** + * Override to specify message value factory to be used, + * which creates objects into which messages will be deserialized. + * + * If not overwritten, or null is returned, DefaultMessageValueFactory + * will be used. + */ + protected MessageValueFactory getMessageFactory( + ImmutableClassesGiraphConfiguration conf) { + return null; + } + + /** + * Override to specify message combiner to be used, if any. + * + * Message combiner itself should be immutable + * (i.e. it will be call simultanously from multiple threads) + */ + protected MessageCombiner getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { + return null; + } + + /** + * Override to specify that this Piece allows one to many ids encoding to be + * used for messages. + * You should override this function, if you are sending identical message to + * all targets, and message itself is not extremely small. + */ + protected boolean allowOneMessageToManyIdsEncoding() { + return false; + } + + @Override + public MessageClasses getMessageClasses( + ImmutableClassesGiraphConfiguration conf) { + Class messageClass = null; + MessageValueFactory messageFactory = getMessageFactory(conf); + MessageCombiner messageCombiner = getMessageCombiner(conf); + + if (messageFactory != null) { + messageClass = (Class) messageFactory.newInstance().getClass(); + } else if (messageCombiner != null) { + messageClass = (Class) messageCombiner.createInitialMessage().getClass(); + } + + if (messageClass != null) { + Preconditions.checkState(getMessageClass() == null, + "Piece %s defines getMessageFactory or getMessageCombiner, " + + "so it doesn't need to define getMessageClass.", + toString()); + } else { + messageClass = getMessageClass(); + if (messageClass == null) { + messageClass = (Class) NoMessage.class; + } + } + + SupplierFromConf> messageFactorySupplier; + if (messageFactory != null) { + messageFactorySupplier = + new SupplierFromConfByCopy>(messageFactory); + } else { + messageFactorySupplier = + new DefaultMessageFactorySupplierFromConf<>(messageClass); + } + + SupplierFromConf> + messageCombinerSupplier; + if (messageCombiner != null) { + messageCombinerSupplier = new SupplierFromConfByCopy<>(messageCombiner); + } else { + messageCombinerSupplier = null; + } + + int maxAllowed = + GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf).ordinal(); + int minForce = MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE.get(conf).ordinal(); + Preconditions.checkState(maxAllowed >= minForce); + + int pieceEncodeType = (allowOneMessageToManyIdsEncoding() ? + MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX : + MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION).ordinal(); + // bound piece type with boundaries: + pieceEncodeType = Math.max(minForce, Math.min(maxAllowed, pieceEncodeType)); + + MessageEncodeAndStoreType messageEncodeAndStoreType = + MessageEncodeAndStoreType.values()[pieceEncodeType]; + + if (messageFactory instanceof GiraphConfigurationSettable) { + throw new IllegalStateException( + messageFactory.getClass() + " MessageFactory in " + this + + " Piece implements GiraphConfigurationSettable"); + } + if (messageCombiner instanceof GiraphConfigurationSettable) { + throw new IllegalStateException( + messageCombiner.getClass() + " MessageCombiner in " + this + + " Piece implements GiraphConfigurationSettable"); + } + + return new ObjectMessageClasses<>( + messageClass, messageFactorySupplier, + messageCombinerSupplier, messageEncodeAndStoreType); + } + + // Internal implementation + + @Override + public final InnerVertexSender getWrappedVertexSender( + final BlockWorkerSendApi workerApi, S executionStage) { + reducersHandler.vertexSenderWorkerPreprocess(workerApi); + final VertexSender functions = + getVertexSender(workerApi, executionStage); + return new InnerVertexSender() { + @Override + public void vertexSend(Vertex vertex) { + if (functions != null) { + functions.vertexSend(vertex); + } + } + @Override + public void postprocess() { + if (functions instanceof VertexPostprocessor) { + ((VertexPostprocessor) functions).postprocess(); + } + reducersHandler.vertexSenderWorkerPostprocess(workerApi); + } + }; + } + + @Override + public final void wrappedRegisterReducers( + BlockMasterApi masterApi, S executionStage) { + reducersHandler = new ReducersForPieceHandler(); + registerReducers(new CreateReducersApiWrapper( + masterApi, reducersHandler), executionStage); + } + + // utility functions: + // TODO Java8 - move these as default functions to VertexSender interface + protected final void reduceDouble( + ReducerHandle reduceHandle, double value) { + reduceUtils.reduceDouble(reduceHandle, value); + } + + protected final void reduceFloat( + ReducerHandle reduceHandle, float value) { + reduceUtils.reduceFloat(reduceHandle, value); + } + + protected final void reduceLong( + ReducerHandle reduceHandle, long value) { + reduceUtils.reduceLong(reduceHandle, value); + } + + protected final void reduceInt( + ReducerHandle reduceHandle, int value) { + reduceUtils.reduceInt(reduceHandle, value); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java new file mode 100644 index 0000000..3ad66d1 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece; + +import java.util.List; + +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.types.NoMessage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Piece that should be extended in common usecases, when we want to be: + * - sending and then receiving messages from vertices + * - sending data to be aggregated from workers to master + * - sending values from master, via aggregators, to workers + * + * (basically - we don't want to use WorkerContext) + * + * @param Vertex id type + * @param Vertex value type + * @param Edge value type + * @param Message type + * @param Execution stage type + */ +@SuppressWarnings("rawtypes") +public class Piece + extends DefaultParentPiece { + + // Disallowing use of Worker Context functions: + @Override + public final void workerContextSend( + BlockWorkerContextSendApi workerContextApi, + S executionStage, Object workerValue) { + } + + @Override + public final void workerContextReceive( + BlockWorkerContextReceiveApi workerContextApi, + S executionStage, Object workerValue, List workerMessages) { + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java new file mode 100644 index 0000000..a5d0c8c --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/PieceWithWorkerContext.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece; + +import org.apache.giraph.block_app.framework.api.BlockWorkerApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Piece that should be extended when WorkerContext is used. + * + * @param Vertex id type + * @param Vertex value type + * @param Edge value type + * @param Message type + * @param Worker value type + * @param Worker message type + * @param Execution stage type + */ +@SuppressWarnings("rawtypes") +public class PieceWithWorkerContext + extends DefaultParentPiece { + + /** + * Get global worker value. + * Value returned can be accessed from may threads, and so all + * accesses to it should be done in a thread-safe manner! + * + * This is the only place in Blocks Framework where you need + * to take care of concurrency. + */ + @SuppressWarnings("unchecked") + public WV getWorkerValue(BlockWorkerApi workerApi) { + return (WV) ((BlockWorkerValueAccessor) workerApi).getWorkerValue(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java new file mode 100644 index 0000000..23c2d29 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.delegate; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.MessageClasses; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.types.NoMessage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; + +/** + * Delegate Piece which allows combining multiple pieces in same iteration: + * new DelegatePiece(new LogicPiece(), new StatsPiece()) + * You should be careful when doing so, since those pieces must not interact, + * and only one can send messages. + * Execution of any of the Piece methods by the framework is going to trigger + * sequential execution of that method on all of the pieces this DelegatePiece + * wraps. That means for example, getVertexSender is going to be called on all + * pieces before masterCompute is called on all pieces, which is called before + * getVertexReceiver on all pieces. + * + * Also, via overriding, it provides an abstract class for filtering. I.e. if + * you want piece that filters out calls to masterCompute, you can have: + * new FilterMasterPiece(new LogicPiece()), + * with FilterMasterPiece extends DelegatePiece, and only overrides getMaster + * function and DelegateMasterPiece class. + * + * @param Vertex id type + * @param Vertex value type + * @param Edge value type + * @param Message type + * @param Worker value type + * @param Worker message type + * @param Execution stage type + */ +@SuppressWarnings("rawtypes") +public class DelegatePiece + extends AbstractPiece { + + private final List> innerPieces; + + @SafeVarargs + @SuppressWarnings("unchecked") + public DelegatePiece(AbstractPiece... innerPieces) { + // Pieces are contravariant, but Java generics cannot express that, + // so use unchecked cast inside to allow callers to be typesafe + this.innerPieces = new ArrayList(Arrays.asList(innerPieces)); + } + + @SuppressWarnings("unchecked") + public DelegatePiece(AbstractPiece innerPiece) { + // Pieces are contravariant, but Java generics cannot express that, + // so use unchecked cast inside to allow callers to be typesafe + this.innerPieces = new ArrayList(Arrays.asList(innerPiece)); + } + + protected DelegateWorkerSendFunctions delegateWorkerSendFunctions( + ArrayList workerSendFunctions, + BlockWorkerSendApi workerApi, S executionStage) { + return new DelegateWorkerSendFunctions(workerSendFunctions); + } + + protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions( + ArrayList> workerReceiveFunctions, + BlockWorkerReceiveApi workerApi, S executionStage) { + return new DelegateWorkerReceiveFunctions(workerReceiveFunctions); + } + + @Override + public InnerVertexSender getWrappedVertexSender( + BlockWorkerSendApi workerApi, S executionStage) { + ArrayList workerSendFunctions = + new ArrayList<>(innerPieces.size()); + for (AbstractPiece innerPiece : innerPieces) { + workerSendFunctions.add( + innerPiece.getWrappedVertexSender(workerApi, executionStage)); + } + return delegateWorkerSendFunctions( + workerSendFunctions, workerApi, executionStage); + } + + @Override + public InnerVertexReceiver getVertexReceiver( + BlockWorkerReceiveApi workerApi, S executionStage) { + ArrayList> workerReceiveFunctions = + new ArrayList<>(innerPieces.size()); + for (AbstractPiece innerPiece : innerPieces) { + workerReceiveFunctions.add( + innerPiece.getVertexReceiver(workerApi, executionStage)); + } + return delegateWorkerReceiveFunctions( + workerReceiveFunctions, workerApi, executionStage); + } + + /** Delegating WorkerSendPiece */ + protected class DelegateWorkerSendFunctions extends InnerVertexSender { + private final ArrayList workerSendFunctions; + + public DelegateWorkerSendFunctions( + ArrayList workerSendFunctions) { + this.workerSendFunctions = workerSendFunctions; + } + + @Override + public void vertexSend(Vertex vertex) { + for (InnerVertexSender functions : workerSendFunctions) { + if (functions != null) { + functions.vertexSend(vertex); + } + } + } + + @Override + public void postprocess() { + for (InnerVertexSender functions : workerSendFunctions) { + if (functions != null) { + functions.postprocess(); + } + } + } + } + + /** Delegating WorkerReceivePiece */ + protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver { + private final ArrayList> + workerReceiveFunctions; + + public DelegateWorkerReceiveFunctions( + ArrayList> workerReceiveFunctions) { + this.workerReceiveFunctions = workerReceiveFunctions; + } + + @Override + public void vertexReceive(Vertex vertex, Iterable messages) { + for (VertexReceiver functions : + workerReceiveFunctions) { + if (functions != null) { + functions.vertexReceive(vertex, messages); + } + } + } + + @Override + public void postprocess() { + for (VertexReceiver functions : + workerReceiveFunctions) { + if (functions instanceof VertexPostprocessor) { + ((VertexPostprocessor) functions).postprocess(); + } + } + } + } + + @Override + public void masterCompute(BlockMasterApi api, S executionStage) { + for (AbstractPiece piece : innerPieces) { + piece.masterCompute(api, executionStage); + } + } + + @Override + public void workerContextSend( + BlockWorkerContextSendApi workerContextApi, S executionStage, + WV workerValue) { + for (AbstractPiece piece : innerPieces) { + piece.workerContextSend(workerContextApi, executionStage, workerValue); + } + } + + @Override + public void workerContextReceive( + BlockWorkerContextReceiveApi workerContextApi, S executionStage, + WV workerValue, List workerMessages) { + for (AbstractPiece piece : innerPieces) { + piece.workerContextReceive( + workerContextApi, executionStage, workerValue, workerMessages); + } + } + + @Override + public S nextExecutionStage(S executionStage) { + for (AbstractPiece innerPiece : innerPieces) { + executionStage = innerPiece.nextExecutionStage(executionStage); + } + return executionStage; + } + + @Override + public MessageClasses getMessageClasses( + ImmutableClassesGiraphConfiguration conf) { + MessageClasses messageClasses = null; + MessageClasses firstMessageClasses = null; + for (AbstractPiece innerPiece : innerPieces) { + MessageClasses cur = innerPiece.getMessageClasses(conf); + Preconditions.checkState(cur != null); + if (!cur.getMessageClass().equals(NoMessage.class)) { + if (messageClasses != null) { + throw new RuntimeException( + "Only one piece combined through delegate (" + + toString() + ") can send messages"); + } + messageClasses = cur; + } + if (firstMessageClasses == null) { + firstMessageClasses = cur; + } + } + return messageClasses != null ? messageClasses : firstMessageClasses; + } + + @Override + public void forAllPossiblePieces(Consumer consumer) { + for (AbstractPiece innerPiece : innerPieces) { + innerPiece.forAllPossiblePieces(consumer); + } + } + + @SuppressWarnings("deprecation") + @Override + public void registerAggregators(BlockMasterApi master) + throws InstantiationException, IllegalAccessException { + for (AbstractPiece innerPiece : innerPieces) { + innerPiece.registerAggregators(master); + } + } + + @Override + public void wrappedRegisterReducers( + BlockMasterApi masterApi, S executionStage) { + for (AbstractPiece innerPiece : innerPieces) { + innerPiece.wrappedRegisterReducers(masterApi, executionStage); + } + } + + protected String delegationName() { + return "Delegate"; + } + + @Override + public String toString() { + return delegationName() + innerPieces.toString(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java new file mode 100644 index 0000000..5c702c5 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/FilteringPiece.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.delegate; + +import java.util.ArrayList; + +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; + +/** + * Piece which uses a provided suppliers to decide whether or not to run + * receive/send piece part on a certain vertex. + * + * @param Vertex id type + * @param Vertex value type + * @param Edge value type + * @param Message type + * @param Worker value type + * @param Worker message type + * @param Execution stage type + */ +@SuppressWarnings({ "rawtypes" }) +public class FilteringPiece + extends DelegatePiece { + private final SupplierFromVertex toCallSend; + private final SupplierFromVertex toCallReceive; + + /** + * Creates filtering piece which uses passed {@code toCallSend} to filter + * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter + * calls to {@code vertexReceive}, on passed {@code innerPiece}. + */ + @SuppressWarnings("unchecked") + public FilteringPiece( + SupplierFromVertex toCallSend, + SupplierFromVertex + toCallReceive, + AbstractPiece innerPiece) { + super(innerPiece); + // Suppliers are contravariant on vertex types, + // but Java generics cannot express that, + // so use unchecked cast inside to allow callers to be typesafe + this.toCallSend = (SupplierFromVertex) toCallSend; + this.toCallReceive = (SupplierFromVertex) toCallReceive; + Preconditions.checkArgument( + toCallSend != null || toCallReceive != null, + "Both send and receive filter cannot be null"); + } + + /** + * Creates filtering piece, where both vertexSend and vertexReceive is + * filtered based on same supplier. + */ + public FilteringPiece( + SupplierFromVertex + toCallSendAndReceive, + AbstractPiece innerPiece) { + this(toCallSendAndReceive, toCallSendAndReceive, innerPiece); + } + + /** + * Creates filtering piece, that filters only vertexReceive function, + * and always calls vertexSend function. + */ + public static + FilteringPiece createReceiveFiltering( + SupplierFromVertex + toCallReceive, + AbstractPiece innerPiece) { + return new FilteringPiece<>(null, toCallReceive, innerPiece); + } + + /** + * Creates filtering block, that filters only vertexSend function, + * and always calls vertexReceive function. + */ + public static + FilteringPiece createSendFiltering( + SupplierFromVertex toCallSend, + AbstractPiece innerPiece) { + return new FilteringPiece<>(toCallSend, null, innerPiece); + } + + @Override + protected DelegateWorkerSendFunctions delegateWorkerSendFunctions( + ArrayList workerSendFunctions, + BlockWorkerSendApi workerApi, S executionStage) { + return new DelegateWorkerSendFunctions(workerSendFunctions) { + @Override + public void vertexSend(Vertex vertex) { + if (toCallSend == null || toCallSend.get(vertex)) { + super.vertexSend(vertex); + } + } + }; + } + + @Override + protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions( + ArrayList> workerReceiveFunctions, + BlockWorkerReceiveApi workerApi, S executionStage) { + return new DelegateWorkerReceiveFunctions(workerReceiveFunctions) { + @Override + public void vertexReceive(Vertex vertex, Iterable messages) { + if (toCallReceive == null || toCallReceive.get(vertex)) { + super.vertexReceive(vertex, messages); + } + } + }; + } + + @Override + protected String delegationName() { + if (toCallSend != null && toCallReceive != null) { + if (toCallSend != toCallReceive) { + return "AsymFilter"; + } + return "Filter"; + } else if (toCallSend != null) { + return "SendFilter"; + } else if (toCallReceive != null) { + return "ReceiveFilter"; + } else { + throw new IllegalStateException("Both Send and Receive filters are null"); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java new file mode 100644 index 0000000..f367e6c --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Pieces that delegate their work to a set of one or multiple other Pieces. + */ +package org.apache.giraph.block_app.framework.piece.delegate; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java new file mode 100644 index 0000000..f18d1f4 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/BroadcastHandle.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.global_comm; + +import org.apache.giraph.worker.WorkerBroadcastUsage; + +/** + * Handle to a broadcast. + * + * @param Value type + */ +public interface BroadcastHandle { + /** Get broadcasted value */ + T getBroadcast(WorkerBroadcastUsage worker); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java new file mode 100644 index 0000000..bbec1c6 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReduceUtilsObject.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.global_comm; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * Utility object with common primitive reduce operations, + * without need to create reusable objects within the piece. + */ +public class ReduceUtilsObject { + private final DoubleWritable reusableDouble = new DoubleWritable(); + private final FloatWritable reusableFloat = new FloatWritable(); + private final LongWritable reusableLong = new LongWritable(); + private final IntWritable reusableInt = new IntWritable(); + + // utility functions: + public void reduceDouble( + ReducerHandle reduceHandle, double value) { + DoubleWritable tmp = reusableDouble; + tmp.set(value); + reduceHandle.reduce(tmp); + } + + public void reduceFloat( + ReducerHandle reduceHandle, float value) { + FloatWritable tmp = reusableFloat; + tmp.set(value); + reduceHandle.reduce(tmp); + } + + public void reduceLong( + ReducerHandle reduceHandle, long value) { + LongWritable tmp = reusableLong; + tmp.set(value); + reduceHandle.reduce(tmp); + } + + public void reduceInt(ReducerHandle reduceHandle, int value) { + IntWritable tmp = reusableInt; + tmp.set(value); + reduceHandle.reduce(tmp); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java new file mode 100644 index 0000000..921c863 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerAndBroadcastWrapperHandle.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.global_comm; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.worker.WorkerBroadcastUsage; + +/** + * Handle that wraps both reducerHandle and broadcastHandle, so callers + * don't need to have two fields. + * + * @param Single value type + * @param Reduced value type + */ +public class ReducerAndBroadcastWrapperHandle { + private ReducerHandle reducerHandle; + private BroadcastHandle broadcastHandle; + + /** Set reducer handle to just registered handle */ + public void registeredReducer(ReducerHandle reducerHandle) { + this.reducerHandle = reducerHandle; + } + + /** Reduce single value */ + public void reduce(S valueToReduce) { + reducerHandle.reduce(valueToReduce); + } + + /** Get reduced value */ + public R getReducedValue(MasterGlobalCommUsage master) { + return reducerHandle.getReducedValue(master); + } + + /** + * Broadcast reduced value from master + */ + public void broadcastValue(BlockMasterApi master) { + broadcastHandle = reducerHandle.broadcastValue(master); + } + + /** Get broadcasted value */ + public R getBroadcast(WorkerBroadcastUsage worker) { + return broadcastHandle.getBroadcast(worker); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java new file mode 100644 index 0000000..dae40f2 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/ReducerHandle.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.global_comm; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.master.MasterGlobalCommUsage; + +/** + * Handle to a reducer. + * + * @param Single value type + * @param Reduced value type + */ +public interface ReducerHandle { + /** Reduce single value */ + void reduce(S valueToReduce); + /** Get reduced value */ + R getReducedValue(MasterGlobalCommUsage master); + + /** + * Broadcast reduced value from master + * + * @return Handle to the broadcasted value. + */ + BroadcastHandle broadcastValue(BlockMasterApi master); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java new file mode 100644 index 0000000..7ee54cb --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ArrayHandle.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.global_comm.array; + +/** + * Handle to array of handles underneath + * + * @param Value type + */ +public interface ArrayHandle { + /** + * Get value at index. + */ + T get(int index); + + /** + * Size of this array if defined up front, or throws + * UnsupportedOperationException if size is dynamic + */ + int getStaticSize(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java new file mode 100644 index 0000000..bf0d333 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/BroadcastArrayHandle.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.global_comm.array; + +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.worker.WorkerBroadcastUsage; + +/** + * Handle to array of broadcasts + * + * @param Value type + */ +public interface BroadcastArrayHandle + extends ArrayHandle> { + + /** + * Number of elements that were broadcasted. + */ + int getBroadcastedSize(WorkerBroadcastUsage worker); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java new file mode 100644 index 0000000..a4b99ac --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/ReducerArrayHandle.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.global_comm.array; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; + +/** + * Handle to array of reducers + * + * @param Single value type + * @param Reduced value type + */ +public interface ReducerArrayHandle + extends ArrayHandle> { + + /** + * Number of elements that were reduced. + */ + int getReducedSize(BlockMasterApi master); + + /** + * Broadcast whole array of reducers to master + * + * @return Handle to the broadcasted array. + */ + BroadcastArrayHandle broadcastValue(BlockMasterApi master); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java new file mode 100644 index 0000000..a8beb85 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/array/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Interfaces representing arrays of individual handles. + */ +package org.apache.giraph.block_app.framework.piece.global_comm.array; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java new file mode 100644 index 0000000..c2cc0f2 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/CreateReducersApiWrapper.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.global_comm.internal; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.hadoop.io.Writable; + +/** + * Wrapping masterApi and reducers handler into API for creating reducer + * handles. + */ +public class CreateReducersApiWrapper implements CreateReducersApi { + private final BlockMasterApi masterApi; + private final ReducersForPieceHandler reducersApi; + + public CreateReducersApiWrapper( + BlockMasterApi masterApi, ReducersForPieceHandler reducersApi) { + this.masterApi = masterApi; + this.reducersApi = reducersApi; + } + + @Override + public ReducerHandle createLocalReducer( + ReduceOperation reduceOp) { + return reducersApi.createLocalReducer( + masterApi, reduceOp, reduceOp.createInitialValue()); + } + + @Override + public ReducerHandle createLocalReducer( + ReduceOperation reduceOp, R globalInitialValue) { + return reducersApi.createLocalReducer( + masterApi, reduceOp, globalInitialValue); + } + + @Override + public ReducerHandle createGlobalReducer( + ReduceOperation reduceOp) { + return reducersApi.createGlobalReducer( + masterApi, reduceOp, reduceOp.createInitialValue()); + } + + @Override + public ReducerHandle createGlobalReducer( + ReduceOperation reduceOp, R globalInitialValue) { + return reducersApi.createGlobalReducer( + masterApi, reduceOp, globalInitialValue); + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return masterApi.getConf(); + } +}