Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E1E1611BC5 for ; Wed, 17 Sep 2014 21:04:41 +0000 (UTC) Received: (qmail 89377 invoked by uid 500); 17 Sep 2014 21:04:41 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 89260 invoked by uid 500); 17 Sep 2014 21:04:41 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 89246 invoked by uid 99); 17 Sep 2014 21:04:41 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Sep 2014 21:04:41 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 6C28DA197F8; Wed, 17 Sep 2014 21:04:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pavanka@apache.org To: commits@giraph.apache.org Date: Wed, 17 Sep 2014 21:04:42 -0000 Message-Id: <8fe0ffe21c9b4b30bfaee8b167af787e@git.apache.org> In-Reply-To: <1d7b86b704a342789178d45810962791@git.apache.org> References: <1d7b86b704a342789178d45810962791@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: updated refs/heads/trunk to 1852057 GIRAPH-938: Allow fast working with primitives generically (ikabiljo via pavanka) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/18520570 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/18520570 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/18520570 Branch: refs/heads/trunk Commit: 185205703ee4cd886598f5393395f19fc367f65f Parents: f6845a3 Author: Pavan Kumar Authored: Wed 
Sep 17 14:02:40 2014 -0700 Committer: Pavan Kumar Committed: Wed Sep 17 14:02:40 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../messages/InMemoryMessageStoreFactory.java | 34 +- .../primitives/IdByteArrayMessageStore.java | 246 ++++++++ .../primitives/IdOneMessagePerVertexStore.java | 226 +++++++ .../apache/giraph/edge/IdAndNullArrayEdges.java | 184 ++++++ .../giraph/edge/IdAndValueArrayEdges.java | 248 ++++++++ .../apache/giraph/types/ops/BooleanTypeOps.java | 53 ++ .../apache/giraph/types/ops/ByteTypeOps.java | 52 ++ .../apache/giraph/types/ops/DoubleTypeOps.java | 52 ++ .../apache/giraph/types/ops/FloatTypeOps.java | 52 ++ .../org/apache/giraph/types/ops/IntTypeOps.java | 68 ++ .../apache/giraph/types/ops/LongTypeOps.java | 68 ++ .../org/apache/giraph/types/ops/MapTypeOps.java | 47 ++ .../giraph/types/ops/PrimitiveIdTypeOps.java | 55 ++ .../giraph/types/ops/PrimitiveTypeOps.java | 42 ++ .../apache/giraph/types/ops/TextTypeOps.java | 46 ++ .../org/apache/giraph/types/ops/TypeOps.java | 51 ++ .../apache/giraph/types/ops/TypeOpsUtils.java | 149 +++++ .../types/ops/collections/Basic2ObjectMap.java | 322 ++++++++++ .../types/ops/collections/BasicArrayList.java | 632 +++++++++++++++++++ .../giraph/types/ops/collections/BasicSet.java | 206 ++++++ .../ops/collections/ResettableIterator.java | 32 + .../types/ops/collections/WritableWriter.java | 47 ++ .../types/ops/collections/package-info.java | 21 + .../apache/giraph/types/ops/package-info.java | 21 + 25 files changed, 2949 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 34db15c..d9398e7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-938: Allow fast working with primitives 
generically (ikabiljo via pavanka) + GIRAPH-945: Always use job Configuration to create Configuration (majakabiljo) GIRAPH-931: Provide a Strongly Connected Components algorithm (gianluca via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java index 02ea7b2..ae86c56 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java @@ -20,15 +20,19 @@ package org.apache.giraph.comm.messages; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore; +import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore; import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore; -import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore; +import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore; +import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore; import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; +import 
org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -89,8 +93,17 @@ public class InMemoryMessageStoreFactory) conf.createMessageCombiner()); } else { - messageStore = new OneMessagePerVertexStore(messageValueFactory, - service, conf.createMessageCombiner(), conf); + PrimitiveIdTypeOps idTypeOps = + TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass); + if (idTypeOps != null) { + messageStore = new IdOneMessagePerVertexStore<>( + messageValueFactory, service, conf.createMessageCombiner(), + conf); + } else { + messageStore = + new OneMessagePerVertexStore(messageValueFactory, service, + conf.createMessageCombiner(), conf); + } } return messageStore; } @@ -127,11 +140,18 @@ public class InMemoryMessageStoreFactory( - messageValueFactory, service, conf); + PrimitiveIdTypeOps idTypeOps = + TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass); + if (idTypeOps != null) { + messageStore = new IdByteArrayMessageStore<>( + messageValueFactory, service, conf); + } else { + messageStore = new ByteArrayMessagesPerVertexStore<>( + messageValueFactory, service, conf); + } } else if (encodeAndStore.equals( MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) { - messageStore = new PointerListPerVertexStore(messageValueFactory, + messageStore = new PointerListPerVertexStore<>(messageValueFactory, service, conf); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java new file mode 100644 index 0000000..efe6199 --- 
/dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.comm.messages.primitives; + +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.MessagesIterable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.partition.Partition; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.WritableWriter; +import org.apache.giraph.utils.EmptyIterable; +import org.apache.giraph.utils.VerboseByteStructMessageWrite; +import org.apache.giraph.utils.VertexIdMessageBytesIterator; +import 
org.apache.giraph.utils.VertexIdMessageIterator; +import org.apache.giraph.utils.VertexIdMessages; +import org.apache.giraph.utils.io.DataInputOutput; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.collect.Lists; + +/** + * Special message store to be used when IDs are primitive and no combiner is + * used. + * Data is backed by primitive maps in order to decrease number of objects and + * get better performance. + * + * @param Vertex id type + * @param Message type + */ +public class IdByteArrayMessageStore implements MessageStore { + /** Message value factory */ + protected final MessageValueFactory messageValueFactory; + /** Map from partition id to map from vertex id to message */ + private final Int2ObjectOpenHashMap> map; + /** Service worker */ + private final CentralizedServiceWorker service; + /** Giraph configuration */ + private final ImmutableClassesGiraphConfiguration config; + /** Vertex id TypeOps */ + private final PrimitiveIdTypeOps idTypeOps; + /** WritableWriter for values in this message store */ + private final WritableWriter + dataInputOutputWriter = new WritableWriter() { + @Override + public DataInputOutput readFields(DataInput in) throws IOException { + DataInputOutput dataInputOutput = config.createMessagesInputOutput(); + dataInputOutput.readFields(in); + return dataInputOutput; + } + + @Override + public void write(DataOutput out, DataInputOutput value) + throws IOException { + value.write(out); + } + }; + + /** + * Constructor + * + * @param messageValueFactory Factory for creating message values + * @param service Service worker + * @param config Hadoop configuration + */ + public IdByteArrayMessageStore(MessageValueFactory messageValueFactory, + CentralizedServiceWorker service, + ImmutableClassesGiraphConfiguration config) { + this.messageValueFactory = messageValueFactory; + this.service = service; + this.config = config; + + idTypeOps = 
TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass()); + + map = new Int2ObjectOpenHashMap>(); + for (int partitionId : service.getPartitionStore().getPartitionIds()) { + Partition partition = + service.getPartitionStore().getOrCreatePartition(partitionId); + Basic2ObjectMap partitionMap = + idTypeOps.create2ObjectOpenHashMap( + Math.max(10, (int) partition.getVertexCount())); + + map.put(partitionId, partitionMap); + service.getPartitionStore().putPartition((Partition) partition); + } + } + + /** + * Get map which holds messages for partition which vertex belongs to. + * + * @param vertexId Id of the vertex + * @return Map which holds messages for partition which vertex belongs to. + */ + private Basic2ObjectMap getPartitionMap(I vertexId) { + return map.get(service.getPartitionId(vertexId)); + } + + /** + * Get the DataInputOutput for a vertex id, creating if necessary. + * + * @param partitionMap Partition map to look in + * @param vertexId Id of the vertex + * @return DataInputOutput for this vertex id (created if necessary) + */ + private DataInputOutput getDataInputOutput( + Basic2ObjectMap partitionMap, + I vertexId) { + DataInputOutput dataInputOutput = partitionMap.get(vertexId); + if (dataInputOutput == null) { + dataInputOutput = config.createMessagesInputOutput(); + partitionMap.put(vertexId, dataInputOutput); + } + return dataInputOutput; + } + + @Override + public void addPartitionMessages(int partitionId, + VertexIdMessages messages) throws IOException { + Basic2ObjectMap partitionMap = map.get(partitionId); + synchronized (partitionMap) { + VertexIdMessageBytesIterator vertexIdMessageBytesIterator = + messages.getVertexIdMessageBytesIterator(); + // Try to copy the message buffer over rather than + // doing a deserialization of a message just to know its size. This + // should be more efficient for complex objects where serialization is + // expensive. 
If this type of iterator is not available, fall back to + // deserializing/serializing the messages + if (vertexIdMessageBytesIterator != null) { + while (vertexIdMessageBytesIterator.hasNext()) { + vertexIdMessageBytesIterator.next(); + DataInputOutput dataInputOutput = getDataInputOutput( + partitionMap, vertexIdMessageBytesIterator.getCurrentVertexId()); + vertexIdMessageBytesIterator.writeCurrentMessageBytes( + dataInputOutput.getDataOutput()); + } + } else { + VertexIdMessageIterator iterator = + messages.getVertexIdMessageIterator(); + while (iterator.hasNext()) { + iterator.next(); + DataInputOutput dataInputOutput = + getDataInputOutput(partitionMap, iterator.getCurrentVertexId()); + + VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, + dataInputOutput.getDataOutput()); + } + } + } + } + + @Override + public void clearPartition(int partitionId) throws IOException { + map.get(partitionId).clear(); + } + + @Override + public boolean hasMessagesForVertex(I vertexId) { + return getPartitionMap(vertexId).containsKey(vertexId); + } + + @Override + public Iterable getVertexMessages(I vertexId) throws IOException { + DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId); + if (dataInputOutput == null) { + return EmptyIterable.get(); + } else { + return new MessagesIterable(dataInputOutput, messageValueFactory); + } + } + + @Override + public void clearVertexMessages(I vertexId) throws IOException { + getPartitionMap(vertexId).remove(vertexId); + } + + @Override + public void clearAll() throws IOException { + map.clear(); + } + + @Override + public Iterable getPartitionDestinationVertices(int partitionId) { + Basic2ObjectMap partitionMap = map.get(partitionId); + List vertices = Lists.newArrayListWithCapacity(partitionMap.size()); + Iterator iterator = partitionMap.fastKeyIterator(); + while (iterator.hasNext()) { + vertices.add(idTypeOps.createCopy(iterator.next())); + } + return vertices; + } + + @Override + public void 
writePartition(DataOutput out, int partitionId) + throws IOException { + Basic2ObjectMap partitionMap = map.get(partitionId); + partitionMap.write(out, dataInputOutputWriter); + } + + @Override + public void readFieldsForPartition(DataInput in, int partitionId) + throws IOException { + Basic2ObjectMap partitionMap = + idTypeOps.create2ObjectOpenHashMap(10); + partitionMap.readFields(in, dataInputOutputWriter); + synchronized (map) { + map.put(partitionId, partitionMap); + } + } + + @Override + public void finalizeStore() { + } + + @Override + public boolean isPointerListEncoding() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java new file mode 100644 index 0000000..c72bedf --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.comm.messages.primitives; + +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.partition.Partition; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.WritableWriter; +import org.apache.giraph.utils.EmptyIterable; +import org.apache.giraph.utils.VertexIdMessageIterator; +import org.apache.giraph.utils.VertexIdMessages; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.collect.Lists; + +/** + * Special message store to be used when IDs are primitive and message doesn't + * need to be, and message combiner is used. + * Data is backed by primitive keyed maps in order to decrease number of + * objects and get better performance. 
+ * (keys are using primitives, values are using objects, even if they + * are primitive) + * + * @param Vertex id type + * @param Message type + */ +public class IdOneMessagePerVertexStore implements MessageStore { + /** Map from partition id to map from vertex id to message */ + private final Int2ObjectOpenHashMap> map; + /** Message value factory */ + private final MessageValueFactory messageValueFactory; + /** Message messageCombiner */ + private final MessageCombiner messageCombiner; + /** Service worker */ + private final CentralizedServiceWorker service; + /** Giraph configuration */ + private final ImmutableClassesGiraphConfiguration config; + /** Vertex id TypeOps */ + private final PrimitiveIdTypeOps idTypeOps; + /** WritableWriter for values in this message store */ + private final WritableWriter messageWriter = new WritableWriter() { + @Override + public M readFields(DataInput in) throws IOException { + M message = messageValueFactory.newInstance(); + message.readFields(in); + return message; + } + + @Override + public void write(DataOutput out, M value) throws IOException { + value.write(out); + } + }; + + /** + * Constructor + * + * @param messageValueFactory Message value factory + * @param service Service worker + * @param messageCombiner Message messageCombiner + * @param config Config + */ + public IdOneMessagePerVertexStore( + MessageValueFactory messageValueFactory, + CentralizedServiceWorker service, + MessageCombiner messageCombiner, + ImmutableClassesGiraphConfiguration config) { + this.service = service; + this.config = config; + this.messageValueFactory = messageValueFactory; + this.messageCombiner = messageCombiner; + + idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass()); + + map = new Int2ObjectOpenHashMap<>(); + for (int partitionId : service.getPartitionStore().getPartitionIds()) { + Partition partition = + service.getPartitionStore().getOrCreatePartition(partitionId); + Basic2ObjectMap partitionMap = 
idTypeOps.create2ObjectOpenHashMap( + (int) partition.getVertexCount()); + map.put(partitionId, partitionMap); + service.getPartitionStore().putPartition((Partition) partition); + } + } + + /** + * Get map which holds messages for partition which vertex belongs to. + * + * @param vertexId Id of the vertex + * @return Map which holds messages for partition which vertex belongs to. + */ + private Basic2ObjectMap getPartitionMap(I vertexId) { + return map.get(service.getPartitionId(vertexId)); + } + + @Override + public void addPartitionMessages( + int partitionId, + VertexIdMessages messages) throws IOException { + Basic2ObjectMap partitionMap = map.get(partitionId); + synchronized (partitionMap) { + VertexIdMessageIterator + iterator = messages.getVertexIdMessageIterator(); + // This loop is a little complicated as it is optimized to only create + // the minimal amount of vertex id and message objects as possible. + while (iterator.hasNext()) { + iterator.next(); + I vertexId = iterator.getCurrentVertexId(); + M currentMessage = + partitionMap.get(iterator.getCurrentVertexId()); + if (currentMessage == null) { + M newMessage = messageCombiner.createInitialMessage(); + currentMessage = partitionMap.put( + iterator.getCurrentVertexId(), newMessage); + if (currentMessage == null) { + currentMessage = newMessage; + } + } + messageCombiner.combine(vertexId, currentMessage, + iterator.getCurrentMessage()); + } + } + } + + @Override + public void clearPartition(int partitionId) throws IOException { + map.get(partitionId).clear(); + } + + @Override + public boolean hasMessagesForVertex(I vertexId) { + return getPartitionMap(vertexId).containsKey(vertexId); + } + + @Override + public Iterable getVertexMessages( + I vertexId) throws IOException { + Basic2ObjectMap partitionMap = getPartitionMap(vertexId); + if (!partitionMap.containsKey(vertexId)) { + return EmptyIterable.get(); + } else { + return Collections.singleton(partitionMap.get(vertexId)); + } + } + + @Override + 
public void clearVertexMessages(I vertexId) throws IOException { + getPartitionMap(vertexId).remove(vertexId); + } + + @Override + public void clearAll() throws IOException { + map.clear(); + } + + @Override + public Iterable getPartitionDestinationVertices( + int partitionId) { + Basic2ObjectMap partitionMap = map.get(partitionId); + List vertices = + Lists.newArrayListWithCapacity(partitionMap.size()); + Iterator iterator = partitionMap.fastKeyIterator(); + while (iterator.hasNext()) { + vertices.add(idTypeOps.createCopy(iterator.next())); + } + return vertices; + } + + @Override + public void writePartition(DataOutput out, + int partitionId) throws IOException { + Basic2ObjectMap partitionMap = map.get(partitionId); + partitionMap.write(out, messageWriter); + } + + @Override + public void readFieldsForPartition(DataInput in, + int partitionId) throws IOException { + Basic2ObjectMap partitionMap = idTypeOps.create2ObjectOpenHashMap(10); + partitionMap.readFields(in, messageWriter); + synchronized (map) { + map.put(partitionId, partitionMap); + } + } + + @Override + public void finalizeStore() { + } + + @Override + public boolean isPointerListEncoding() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/edge/IdAndNullArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/IdAndNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/IdAndNullArrayEdges.java new file mode 100644 index 0000000..7de5d2a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/IdAndNullArrayEdges.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.edge; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.utils.EdgeIterables; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Implementation of {@link OutEdges} with IDs and null edge values having + * their TypeOps. + * Backed by a dynamic primitive array. Parallel edges are allowed. + * Note: this implementation is optimized for space + * usage, but random access and edge removals are expensive. + * + * @param Vertex id type + */ +public class IdAndNullArrayEdges + implements ReuseObjectsOutEdges, + MutableOutEdges, + ImmutableClassesGiraphConfigurable { + + /** Array of target vertex ids. 
*/ + private BasicArrayList neighbors; + + @Override + public + ImmutableClassesGiraphConfiguration getConf() { + throw new UnsupportedOperationException(); + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration conf) { + PrimitiveIdTypeOps idTypeOps = + TypeOpsUtils.getPrimitiveIdTypeOps(conf.getVertexIdClass()); + neighbors = idTypeOps.createArrayList(10); + if (!conf.getEdgeValueClass().equals(NullWritable.class)) { + throw new IllegalArgumentException( + "IdAndNullArrayEdges can be used only with NullWritable " + + "as edgeValueClass, not with " + conf.getEdgeValueClass()); + } + } + + @Override + public void initialize(Iterable> edges) { + EdgeIterables.initialize(this, edges); + } + + @Override + public void initialize(int capacity) { + neighbors.setCapacity(capacity); + } + + @Override + public void initialize() { + initialize(10); + } + + @Override + public void add(Edge edge) { + neighbors.add(edge.getTargetVertexId()); + } + + /** + * If the backing array is more than four times as big as the number of + * elements, reduce to 2 times current size. + */ + private void trim() { + if (neighbors.capacity() > 4 * neighbors.size()) { + neighbors.setCapacity(neighbors.size() * 2); + } + } + + /** + * Remove edge at position i. + * + * @param i Position of edge to be removed + */ + private void removeAt(int i) { + // The order of the edges is irrelevant, so we can simply replace + // the deleted edge with the rightmost element, thus achieving constant + // time. + I tmpValue = neighbors.getElementTypeOps().create(); + neighbors.popInto(tmpValue); + if (i != neighbors.size()) { + neighbors.set(i, tmpValue); + } + // If needed after the removal, trim the array. + trim(); + } + + @Override + public void remove(I targetVertexId) { + // Thanks to the constant-time implementation of removeAt(int), + // we can remove all matching edges in linear time. 
+ I tmpValue = neighbors.getElementTypeOps().create(); + for (int i = neighbors.size() - 1; i >= 0; --i) { + neighbors.getInto(i, tmpValue); + if (tmpValue.equals(targetVertexId)) { + removeAt(i); + } + } + } + + @Override + public int size() { + return neighbors.size(); + } + + @Override + public Iterator> iterator() { + // Returns an iterator that reuses objects. + // The downcast is fine because all concrete Edge implementations are + // mutable, but we only expose the mutation functionality when appropriate. + return (Iterator) mutableIterator(); + } + + @Override + public Iterator> mutableIterator() { + return new Iterator>() { + /** Current position in the array. */ + private int offset = 0; + /** Representative edge object. */ + private final MutableEdge representativeEdge = + EdgeFactory.createReusable(neighbors.getElementTypeOps().create()); + + @Override + public boolean hasNext() { + return offset < neighbors.size(); + } + + @Override + public MutableEdge next() { + neighbors.getInto(offset++, representativeEdge.getTargetVertexId()); + return representativeEdge; + } + + @Override + public void remove() { + // Since removeAt() might replace the deleted edge with the last edge + // in the array, we need to decrease the offset so that the latter + // won't be skipped. 
+ removeAt(--offset); + } + }; + } + + @Override + public void write(DataOutput out) throws IOException { + neighbors.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + neighbors.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/edge/IdAndValueArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/IdAndValueArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/IdAndValueArrayEdges.java new file mode 100644 index 0000000..b99692f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/IdAndValueArrayEdges.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.edge; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.utils.EdgeIterables; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.collect.UnmodifiableIterator; + +/** + * Implementation of {@link OutEdges} with IDs and Edge values having their + * TypeOps. + * Data is backed by a dynamic primitive array. Parallel edges are allowed. + * Note: this implementation is optimized for space usage, but random access + * and edge removals are expensive. + * + * @param Vertex id type + * @param Edge value type + */ +public class IdAndValueArrayEdges implements ReuseObjectsOutEdges, + MutableOutEdges, + ImmutableClassesGiraphConfigurable { + + /** Array of target vertex ids. */ + private BasicArrayList neighborIds; + /** Array of edge values. 
*/ + private BasicArrayList neighborEdgeValues; + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + throw new UnsupportedOperationException(); + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration conf) { + PrimitiveIdTypeOps idTypeOps = + TypeOpsUtils.getPrimitiveIdTypeOps(conf.getVertexIdClass()); + neighborIds = idTypeOps.createArrayList(10); + + PrimitiveTypeOps edgeTypeOps = + TypeOpsUtils.getPrimitiveTypeOps(conf.getEdgeValueClass()); + neighborEdgeValues = edgeTypeOps.createArrayList(10); + } + + @Override + public void initialize(Iterable> edges) { + EdgeIterables.initialize(this, edges); + } + + @Override + public void initialize(int capacity) { + neighborIds.setCapacity(capacity); + neighborEdgeValues.setCapacity(capacity); + } + + @Override + public void initialize() { + initialize(10); + } + + @Override + public void add(Edge edge) { + neighborIds.add(edge.getTargetVertexId()); + neighborEdgeValues.add(edge.getValue()); + } + + /** + * If the backing array is more than four times as big as the number of + * elements, reduce to 2 times current size. + */ + private void trim() { + if (neighborIds.capacity() > 4 * neighborIds.size()) { + neighborIds.setCapacity(neighborIds.size() * 2); + neighborEdgeValues.setCapacity(neighborIds.size() * 2); + } + } + + /** + * Remove edge at position i. + * + * @param i Position of edge to be removed + */ + private void removeAt(int i) { + // The order of the edges is irrelevant, so we can simply replace + // the deleted edge with the rightmost element, thus achieving constant + // time. + I tmpId = neighborIds.getElementTypeOps().create(); + E tmpValue = neighborEdgeValues.getElementTypeOps().create(); + + neighborIds.popInto(tmpId); + neighborEdgeValues.popInto(tmpValue); + if (i != neighborIds.size()) { + neighborIds.set(i, tmpId); + neighborEdgeValues.set(i, tmpValue); + } + // If needed after the removal, trim the array. 
+ trim(); + } + + @Override + public void remove(I targetVertexId) { + // Thanks to the constant-time implementation of removeAt(int), + // we can remove all matching edges in linear time. + I tmpId = neighborIds.getElementTypeOps().create(); + for (int i = neighborIds.size() - 1; i >= 0; --i) { + neighborIds.getInto(i, tmpId); + if (tmpId.equals(targetVertexId)) { + removeAt(i); + } + } + } + + @Override + public int size() { + return neighborIds.size(); + } + + @Override + public Iterator> iterator() { + // Returns an iterator that reuses objects. + return new UnmodifiableIterator>() { + private int index; + + /** Representative edge object. */ + private final Edge representativeEdge = EdgeFactory.create( + neighborIds.getElementTypeOps().create(), + neighborEdgeValues.getElementTypeOps().create()); + + @Override + public boolean hasNext() { + return index < neighborIds.size(); + } + + @Override + public Edge next() { + neighborIds.getInto(index, representativeEdge.getTargetVertexId()); + neighborEdgeValues.getInto(index, representativeEdge.getValue()); + index++; + return representativeEdge; + } + }; + } + + /** Helper class for a mutable edge that modifies the backing arrays. */ + private class ArrayMutableEdge extends DefaultEdge { + /** Index of the edge in the backing arrays. */ + private int index; + + /** Constructor. */ + public ArrayMutableEdge() { + super( + neighborIds.getElementTypeOps().create(), + neighborEdgeValues.getElementTypeOps().create()); + } + + /** + * Make the edge point to the given index in the backing arrays. + * + * @param index Index in the arrays + */ + public void setIndex(int index) { + // Update the id and value objects from the superclass. + neighborIds.getInto(index, getTargetVertexId()); + neighborEdgeValues.getInto(index, getValue()); + // Update the index. + this.index = index; + } + + @Override + public void setValue(E value) { + // Update the value object from the superclass. 
+ neighborEdgeValues.getElementTypeOps().set(getValue(), value); + // Update the value stored in the backing array. + neighborEdgeValues.set(index, value); + } + } + + @Override + public Iterator> mutableIterator() { + return new Iterator>() { + /** Current position in the array. */ + private int index = 0; + /** Representative edge object. */ + private final ArrayMutableEdge representativeEdge = + new ArrayMutableEdge(); + + @Override + public boolean hasNext() { + return index < neighborIds.size(); + } + + @Override + public MutableEdge next() { + representativeEdge.setIndex(index++); + return representativeEdge; + } + + @Override + public void remove() { + // Since removeAt() might replace the deleted edge with the last edge + // in the array, we need to decrease the offset so that the latter + // won't be skipped. + removeAt(--index); + } + }; + } + + @Override + public void write(DataOutput out) throws IOException { + neighborIds.write(out); + neighborEdgeValues.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + neighborIds.readFields(in); + neighborEdgeValues.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/BooleanTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/BooleanTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/BooleanTypeOps.java new file mode 100644 index 0000000..a65fa3b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/BooleanTypeOps.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + +import org.apache.giraph.types.ops.collections.BasicArrayList.BasicBooleanArrayList; +import org.apache.hadoop.io.BooleanWritable; + + +/** TypeOps implementation for working with BooleanWritable type */ +public enum BooleanTypeOps implements PrimitiveTypeOps { + /** Singleton instance */ + INSTANCE(); + + @Override + public Class getTypeClass() { + return BooleanWritable.class; + } + + @Override + public BooleanWritable create() { + return new BooleanWritable(); + } + + @Override + public BooleanWritable createCopy(BooleanWritable from) { + return new BooleanWritable(from.get()); + } + + @Override + public void set(BooleanWritable to, BooleanWritable from) { + to.set(from.get()); + } + + @Override + public BasicBooleanArrayList createArrayList(int capacity) { + return new BasicBooleanArrayList(capacity); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java new file mode 100644 index 0000000..2b27ba5 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + +import org.apache.giraph.types.ops.collections.BasicArrayList.BasicByteArrayList; +import org.apache.hadoop.io.ByteWritable; + +/** TypeOps implementation for working with ByteWritable type */ +public enum ByteTypeOps implements PrimitiveTypeOps { + /** Singleton instance */ + INSTANCE(); + + @Override + public Class getTypeClass() { + return ByteWritable.class; + } + + @Override + public ByteWritable create() { + return new ByteWritable(); + } + + @Override + public ByteWritable createCopy(ByteWritable from) { + return new ByteWritable(from.get()); + } + + @Override + public void set(ByteWritable to, ByteWritable from) { + to.set(from.get()); + } + + @Override + public BasicByteArrayList createArrayList(int capacity) { + return new BasicByteArrayList(capacity); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java new file mode 100644 index 
0000000..af8c38f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + +import org.apache.giraph.types.ops.collections.BasicArrayList.BasicDoubleArrayList; +import org.apache.hadoop.io.DoubleWritable; + +/** TypeOps implementation for working with DoubleWritable type */ +public enum DoubleTypeOps implements PrimitiveTypeOps { + /** Singleton instance */ + INSTANCE(); + + @Override + public Class getTypeClass() { + return DoubleWritable.class; + } + + @Override + public DoubleWritable create() { + return new DoubleWritable(); + } + + @Override + public DoubleWritable createCopy(DoubleWritable from) { + return new DoubleWritable(from.get()); + } + + @Override + public void set(DoubleWritable to, DoubleWritable from) { + to.set(from.get()); + } + + @Override + public BasicDoubleArrayList createArrayList(int capacity) { + return new BasicDoubleArrayList(capacity); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java ---------------------------------------------------------------------- diff 
--git a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java new file mode 100644 index 0000000..3ca8409 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.types.ops; + +import org.apache.giraph.types.ops.collections.BasicArrayList.BasicFloatArrayList; +import org.apache.hadoop.io.FloatWritable; + +/** TypeOps implementation for working with FloatWritable type */ +public enum FloatTypeOps implements PrimitiveTypeOps { + /** Singleton instance */ + INSTANCE(); + + @Override + public Class getTypeClass() { + return FloatWritable.class; + } + + @Override + public FloatWritable create() { + return new FloatWritable(); + } + + @Override + public FloatWritable createCopy(FloatWritable from) { + return new FloatWritable(from.get()); + } + + @Override + public void set(FloatWritable to, FloatWritable from) { + to.set(from.get()); + } + + @Override + public BasicFloatArrayList createArrayList(int capacity) { + return new BasicFloatArrayList(capacity); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java new file mode 100644 index 0000000..f9a32c0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicInt2ObjectOpenHashMap; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.types.ops.collections.BasicArrayList.BasicIntArrayList; +import org.apache.giraph.types.ops.collections.BasicSet; +import org.apache.giraph.types.ops.collections.BasicSet.BasicIntOpenHashSet; +import org.apache.hadoop.io.IntWritable; + +/** TypeOps implementation for working with IntWritable type */ +public enum IntTypeOps implements PrimitiveIdTypeOps { + /** Singleton instance */ + INSTANCE; + + @Override + public Class getTypeClass() { + return IntWritable.class; + } + + @Override + public IntWritable create() { + return new IntWritable(); + } + + @Override + public IntWritable createCopy(IntWritable from) { + return new IntWritable(from.get()); + } + + @Override + public void set(IntWritable to, IntWritable from) { + to.set(from.get()); + } + + @Override + public BasicSet createOpenHashSet(int capacity) { + return new BasicIntOpenHashSet(capacity); + } + + @Override + public BasicArrayList createArrayList(int capacity) { + return new BasicIntArrayList(capacity); + } + + @Override + public Basic2ObjectMap create2ObjectOpenHashMap( + int capacity) { + return new BasicInt2ObjectOpenHashMap<>(capacity); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java new file mode 100644 index 0000000..4e5ca54 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.types.ops; + +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicLong2ObjectOpenHashMap; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.types.ops.collections.BasicArrayList.BasicLongArrayList; +import org.apache.giraph.types.ops.collections.BasicSet; +import org.apache.giraph.types.ops.collections.BasicSet.BasicLongOpenHashSet; +import org.apache.hadoop.io.LongWritable; + +/** TypeOps implementation for working with LongWritable type */ +public enum LongTypeOps implements PrimitiveIdTypeOps { + /** Singleton instance */ + INSTANCE; + + @Override + public Class getTypeClass() { + return LongWritable.class; + } + + @Override + public LongWritable create() { + return new LongWritable(); + } + + @Override + public LongWritable createCopy(LongWritable from) { + return new LongWritable(from.get()); + } + + @Override + public void set(LongWritable to, LongWritable from) { + to.set(from.get()); + } + + @Override + public BasicSet createOpenHashSet(int capacity) { + return new BasicLongOpenHashSet(capacity); + } + + @Override + public BasicArrayList createArrayList(int capacity) { + return new BasicLongArrayList(capacity); + } + + @Override + public Basic2ObjectMap create2ObjectOpenHashMap( + int capacity) { + return new BasicLong2ObjectOpenHashMap<>(capacity); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/MapTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/MapTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/MapTypeOps.java new file mode 100644 index 0000000..cd9f079 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/MapTypeOps.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + +import org.apache.hadoop.io.MapWritable; + +/** TypeOps implementation for working with MapWritable type */ +public enum MapTypeOps implements TypeOps { + /** Singleton instance */ + INSTANCE(); + + @Override + public Class getTypeClass() { + return MapWritable.class; + } + + @Override + public MapWritable create() { + return new MapWritable(); + } + + @Override + public MapWritable createCopy(MapWritable from) { + return new MapWritable(from); + } + + @Override + public void set(MapWritable to, MapWritable from) { + to.clear(); + to.putAll(from); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java new file mode 100644 index 0000000..29b0c6e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.BasicSet; + + + +/** + * Additional type operations to TypeOps for types that can be IDs, + * and so can be used as keys in maps and values in sets. + * + * Using any of the provided operations should lead to no boxing/unboxing. + * + * Useful generic wrappers to fastutil libraries are provided, + * so that you can look at them generically. + * + * @param Type + */ +public interface PrimitiveIdTypeOps extends PrimitiveTypeOps { + // primitive collections + + /** + * Create BasicSet of type T, given capacity. + * @param capacity Capacity + * @return BasicSet + */ + BasicSet createOpenHashSet(int capacity); + + /** + * Create Basic2ObjectMap with key type T, given capacity. + * Values are represented as object, even if they can be primitive. 
+ * + * @param capacity Capacity + * @param Type of values in the map + * @return Basic2ObjectMap + */ + Basic2ObjectMap create2ObjectOpenHashMap(int capacity); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveTypeOps.java new file mode 100644 index 0000000..72b684f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveTypeOps.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + +import org.apache.giraph.types.ops.collections.BasicArrayList; + + +/** + * Type operations, allowing working generically with types, + * but still having efficient code. + * + * Using any of the provided operations should lead to no boxing/unboxing. + * + * Useful generic wrappers to fastutil libraries are provided, + * so that you can look at them generically. 
+ * + * @param Type + */ +public interface PrimitiveTypeOps extends TypeOps { + // primitive collections + /** + * Create BasicArrayList of type T, given capacity. + * @param capacity Capacity + * @return BasicArrayList + */ + BasicArrayList createArrayList(int capacity); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/TextTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TextTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TextTypeOps.java new file mode 100644 index 0000000..c785cd9 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TextTypeOps.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.types.ops; + +import org.apache.hadoop.io.Text; + +/** TypeOps implementation for working with Text type */ +public enum TextTypeOps implements TypeOps { + /** Singleton instance */ + INSTANCE(); + + @Override + public Class getTypeClass() { + return Text.class; + } + + @Override + public Text create() { + return new Text(); + } + + @Override + public Text createCopy(Text from) { + return new Text(from); + } + + @Override + public void set(Text to, Text from) { + to.set(from); // copy only the valid bytes; getBytes() exposes the whole backing array + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java new file mode 100644 index 0000000..b7f9479 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + + +/** + * Type operations, allowing working generically with mutable types, + * but still having efficient code.
+ * For example, by reducing object allocation via reuse. + * + * @param Type + */ +public interface TypeOps { + /** + * Class object for generic type T. + * @return Class object + */ + Class getTypeClass(); + /** + * Create new instance of type T. + * @return new instance + */ + T create(); + /** + * Create a copy of passed object + * @param from Object to copy + * @return Copy + */ + T createCopy(T from); + /** + * Copies value from the second argument into the first. + * @param to Object into which value should be copied + * @param from Object whose value is copied + */ + void set(T to, T from); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java new file mode 100644 index 0000000..df5f2bd --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.giraph.types.ops; + +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; + +/** + * Utility functions for getting TypeOps instances from class types. + */ +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class TypeOpsUtils { + /** No instances */ + private TypeOpsUtils() { } + + /** + * Get PrimitiveIdTypeOps for given type, or null if there is none. + * @param type Class type + * @param Type + * @return PrimitiveIdTypeOps for LongWritable or IntWritable, else null + */ + public static + PrimitiveIdTypeOps getPrimitiveIdTypeOpsOrNull(Class type) { + if (type.equals(LongWritable.class)) { + return (PrimitiveIdTypeOps) LongTypeOps.INSTANCE; + } else if (type.equals(IntWritable.class)) { + return (PrimitiveIdTypeOps) IntTypeOps.INSTANCE; + } else { + return null; + } + } + + /** + * Get PrimitiveIdTypeOps for given type. + * IllegalArgumentException is thrown if there is none. + * @param type Class type + * @param Type + * @return PrimitiveIdTypeOps, never null + */ + public static + PrimitiveIdTypeOps getPrimitiveIdTypeOps(Class type) { + PrimitiveIdTypeOps typeOps = getPrimitiveIdTypeOpsOrNull(type); + if (typeOps != null) { + return typeOps; + } else { + throw new IllegalArgumentException( + type + " not supported in PrimitiveIdTypeOps"); + } + } + + /** + * Get PrimitiveTypeOps for given type, or null if there is none. 
+ * @param type Class type + * @param Type + * @return PrimitiveTypeOps, or null if type is not supported + */ + public static + PrimitiveTypeOps getPrimitiveTypeOpsOrNull(Class type) { + PrimitiveTypeOps typeOps = getPrimitiveIdTypeOpsOrNull(type); + if (typeOps != null) { + return typeOps; + } else if (type.equals(FloatWritable.class)) { + return (PrimitiveTypeOps) FloatTypeOps.INSTANCE; + } else if (type.equals(DoubleWritable.class)) { + return (PrimitiveTypeOps) DoubleTypeOps.INSTANCE; + } else if (type.equals(BooleanWritable.class)) { + return (PrimitiveTypeOps) BooleanTypeOps.INSTANCE; + } else if (type.equals(ByteWritable.class)) { + return (PrimitiveTypeOps) ByteTypeOps.INSTANCE; + } else { + return null; + } + } + + /** + * Get PrimitiveTypeOps for given type. + * IllegalArgumentException is thrown if there is none. + * @param type Class type + * @param Type + * @return PrimitiveTypeOps, never null + */ + public static + PrimitiveTypeOps getPrimitiveTypeOps(Class type) { + PrimitiveTypeOps typeOps = getPrimitiveTypeOpsOrNull(type); + if (typeOps != null) { + return typeOps; + } else { + throw new IllegalArgumentException( + type + " not supported in PrimitiveTypeOps"); + } + } + + /** + * Get TypeOps for given type, or null if there is none. + * @param type Class type + * @param Type + * @return TypeOps, or null if type is not supported + */ + public static TypeOps getTypeOpsOrNull(Class type) { + TypeOps typeOps = getPrimitiveTypeOpsOrNull(type); + if (typeOps != null) { + return typeOps; + } else if (type.equals(Text.class)) { + return (TypeOps) TextTypeOps.INSTANCE; + } else if (type.equals(MapWritable.class)) { + return (TypeOps) MapTypeOps.INSTANCE; + } else { + return null; + } + } + + /** + * Get TypeOps for given type. + * IllegalArgumentException is thrown if there is none. 
+ * @param type Class type + * @param Type + * @return TypeOps, never null + */ + public static TypeOps getTypeOps(Class type) { + TypeOps typeOps = getTypeOpsOrNull(type); + if (typeOps != null) { + return typeOps; + } else { + throw new IllegalArgumentException( + type + " not supported in TypeOps"); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/18520570/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java new file mode 100644 index 0000000..f7ef570 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.types.ops.collections; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntIterator; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongIterator; +import it.unimi.dsi.fastutil.objects.ObjectIterator; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.giraph.types.ops.IntTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * Basic2ObjectMap with only basic set of operations. + * Keys returned by fastKeyIterator() are reusable objects, + * valid only until next() is called again. + * + * @param <K> Key type + * @param <V> Value type + */ +public abstract class Basic2ObjectMap { + /** Removes all of the elements from this map. */ + public abstract void clear(); + /** + * Number of elements in this map + * @return size + */ + public abstract int size(); + + /** + * Checks whether key is present in the map + * @param key Key + * @return true if present + */ + public abstract boolean containsKey(K key); + /** + * Adds a pair to the map. + * + * @param key Key + * @param value Value. + * @return the old value, or null if no value was present for the given key. + */ + public abstract V put(K key, V value); + /** + * Get value for a given key + * @param key Key + * @return Value, or null + */ + public abstract V get(K key); + /** + * Removes the mapping with the given key. + * + * @param key Key + * @return the old value, or null if no value was present for the given key. 
+ */ + public abstract V remove(K key); + + /** + * TypeOps for type of keys this object holds + * @return TypeOps + */ + public abstract PrimitiveIdTypeOps getKeyTypeOps(); + + /** + * Fast iterator over keys within this map, which doesn't allocate new + * element for each returned element. + * + * Object returned by next() is only valid until next() is called again, + * because it is reused. + * + * @return Iterator + */ + public abstract Iterator fastKeyIterator(); + + /** + * Serializes the object, given a writer for values. + * @param out DataOutput to serialize object into. + * @param writer Writer of values + * @throws IOException + */ + public abstract void write(DataOutput out, WritableWriter writer) + throws IOException; + /** + * Deserializes the object, given a writer for values. + * @param in DataInput to deserialize object from. + * @param writer Writer of values, used here to read them back + * @throws IOException + */ + public abstract void readFields(DataInput in, WritableWriter writer) + throws IOException; + + /** + * Iterator that reuses key object. 
+ * + * @param <Iter> Primitive key iterator type + */ + protected abstract class ReusableIterator> + implements Iterator { + /** Primitive Key iterator */ + protected final Iter iter; + /** Reusable key object */ + protected final K reusableKey = getKeyTypeOps().create(); + + /** + * Constructor + * @param iter Primitive Key iterator + */ + public ReusableIterator(Iter iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public void remove() { + iter.remove(); + } + } + + /** Basic2ObjectMap for IntWritable keys, backed by Int2ObjectOpenHashMap */ + public static final class BasicInt2ObjectOpenHashMap + extends Basic2ObjectMap { + /** Backing map, keyed by primitive int */ + private final Int2ObjectOpenHashMap map; + + /** + * Constructor + * @param capacity Capacity passed to the backing map + */ + public BasicInt2ObjectOpenHashMap(int capacity) { + this.map = new Int2ObjectOpenHashMap<>(capacity); + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean containsKey(IntWritable key) { + return map.containsKey(key.get()); + } + + @Override + public V put(IntWritable key, V value) { + return map.put(key.get(), value); + } + + @Override + public V get(IntWritable key) { + return map.get(key.get()); + } + + @Override + public V remove(IntWritable key) { + return map.remove(key.get()); + } + + @Override + public PrimitiveIdTypeOps getKeyTypeOps() { + return IntTypeOps.INSTANCE; + } + + @Override + public Iterator fastKeyIterator() { + return new ReusableIterator(map.keySet().iterator()) { + @Override + public IntWritable next() { + reusableKey.set(iter.nextInt()); + return reusableKey; + } + }; + } + + @Override + public void write(DataOutput out, WritableWriter writer) + throws IOException { + out.writeInt(map.size()); + ObjectIterator> iterator = + map.int2ObjectEntrySet().fastIterator(); + while 
(iterator.hasNext()) { + Int2ObjectMap.Entry entry = iterator.next(); + 
out.writeInt(entry.getIntKey()); + writer.write(out, entry.getValue()); + } + } + + @Override + public void readFields(DataInput in, WritableWriter writer) + throws IOException { + int size = in.readInt(); + map.clear(); + map.trim(size); + while (size-- > 0) { + int key = in.readInt(); + V value = writer.readFields(in); + map.put(key, value); + } + } + } + + /** Basic2ObjectMap for LongWritable keys, backed by Long2ObjectOpenHashMap */ + public static final class BasicLong2ObjectOpenHashMap + extends Basic2ObjectMap { + /** Backing map, keyed by primitive long */ + private final Long2ObjectOpenHashMap map; + + /** + * Constructor + * @param capacity Capacity passed to the backing map + */ + public BasicLong2ObjectOpenHashMap(int capacity) { + this.map = new Long2ObjectOpenHashMap<>(capacity); + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean containsKey(LongWritable key) { + return map.containsKey(key.get()); + } + + @Override + public V put(LongWritable key, V value) { + return map.put(key.get(), value); + } + + @Override + public V get(LongWritable key) { + return map.get(key.get()); + } + + @Override + public V remove(LongWritable key) { + return map.remove(key.get()); + } + + @Override + public PrimitiveIdTypeOps getKeyTypeOps() { + return LongTypeOps.INSTANCE; + } + + @Override + public Iterator fastKeyIterator() { + return new ReusableIterator(map.keySet().iterator()) { + @Override + public LongWritable next() { + reusableKey.set(iter.nextLong()); + return reusableKey; + } + }; + } + + @Override + public void write(DataOutput out, WritableWriter writer) + throws IOException { + out.writeInt(map.size()); + ObjectIterator> iterator = + map.long2ObjectEntrySet().fastIterator(); + while (iterator.hasNext()) { + Long2ObjectMap.Entry entry = iterator.next(); + out.writeLong(entry.getLongKey()); + writer.write(out, entry.getValue()); + } + } + + @Override + public void 
readFields(DataInput in, WritableWriter writer) + throws IOException { + int 
size = in.readInt(); + map.clear(); + map.trim(size); + while (size-- > 0) { + long key = in.readLong(); + V value = writer.readFields(in); + map.put(key, value); + } + } + } +}