Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9184411AF1 for ; Thu, 28 Aug 2014 03:36:53 +0000 (UTC) Received: (qmail 97971 invoked by uid 500); 28 Aug 2014 03:36:53 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 97911 invoked by uid 500); 28 Aug 2014 03:36:53 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 97891 invoked by uid 99); 28 Aug 2014 03:36:53 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 03:36:53 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D6133A01C4A; Thu, 28 Aug 2014 03:36:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: toddnine@apache.org To: commits@usergrid.apache.org Date: Thu, 28 Aug 2014 03:36:54 -0000 Message-Id: In-Reply-To: <87c27cb82b9e408d9969874dd8b52f5d@git.apache.org> References: <87c27cb82b9e408d9969874dd8b52f5d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/5] Updated OrderedMerge to use a faster implementation at runtime. After initialization, it's an O(1) emit operation as long as our producers are fast enough. 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java deleted file mode 100644 index 2edea56..0000000 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. 
- * - */ - -package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; - - -import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer; -import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.netflix.astyanax.model.CompositeBuilder; -import com.netflix.astyanax.model.CompositeParser; - - -public class RowTypeSerializer implements CompositeFieldSerializer { - - private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get(); - - - @Override - public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) { - - //add the row id to the composite - ID_SER.toComposite( builder, keyType.nodeId ); - - builder.addString( keyType.edgeType ); - builder.addString( keyType.idType ); - - builder.addLong( keyType.shardId ); - } - - - @Override - public RowKeyType fromComposite( final CompositeParser composite ) { - - final Id id = ID_SER.fromComposite( composite ); - final String edgeType = composite.readString(); - final String idType = composite.readString(); - final long shard = composite.readLong(); - - return new RowKeyType( id, edgeType, idType, shard); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java index c8a884b..f1b5108 100644 --- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java @@ -6,32 +6,51 @@ import java.util.NoSuchElementException; import org.apache.commons.collections4.iterators.PushbackIterator; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction; import com.google.common.base.Preconditions; +import rx.schedulers.Schedulers; + /** - * Utility class that will take an iterator of all shards, and combine them into an iterator - * of ShardEntryGroups. These groups can then be used in a distributed system to handle concurrent reads and writes + * Utility class that will take an iterator of all shards, and combine them into an iterator of ShardEntryGroups. These + * groups can then be used in a distributed system to handle concurrent reads and writes */ public class ShardEntryGroupIterator implements Iterator { - private ShardEntryGroup next; + private final ShardGroupCompaction shardGroupCompaction; private final PushbackIterator sourceIterator; private final long minDelta; + private final ApplicationScope scope; + private final DirectedEdgeMeta directedEdgeMeta; + + + private ShardEntryGroup next; /** * Create a shard iterator - * @param shardIterator The iterator of all shards. Order is expected to be by the shard index from Long.MAX to Long.MIN + * + * @param shardIterator The iterator of all shards. 
Order is expected to be by the shard index from Long.MAX to + * Long.MIN * @param minDelta The minimum delta we allow to consider shards the same group */ - public ShardEntryGroupIterator( final Iterator shardIterator, final long minDelta ) { - Preconditions.checkArgument(shardIterator.hasNext(), "Shard iterator must have shards present"); + public ShardEntryGroupIterator( final Iterator shardIterator, final long minDelta, + final ShardGroupCompaction shardGroupCompaction, final ApplicationScope scope, + final DirectedEdgeMeta directedEdgeMeta ) { + + + Preconditions.checkArgument( shardIterator.hasNext(), "Shard iterator must have shards present" ); + this.scope = scope; + this.directedEdgeMeta = directedEdgeMeta; this.sourceIterator = new PushbackIterator( shardIterator ); + this.shardGroupCompaction = shardGroupCompaction; this.minDelta = minDelta; } @@ -78,7 +97,7 @@ public class ShardEntryGroupIterator implements Iterator { */ while ( sourceIterator.hasNext() ) { - if(next == null){ + if ( next == null ) { next = new ShardEntryGroup( minDelta ); } @@ -92,9 +111,13 @@ public class ShardEntryGroupIterator implements Iterator { sourceIterator.pushback( shard ); + break; } - + //now perform the audit (maybe) + if(next != null) { + shardGroupCompaction.evaluateShardGroup( scope, directedEdgeMeta, next ); + } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index c566d43..5076424 100644 --- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -23,21 +23,32 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.nio.charset.Charset; -import java.util.BitSet; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.consistency.TimeService; -import org.apache.usergrid.persistence.core.rx.ObservableIterator; +import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.MarkedEdge; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization; @@ -47,20 +58,22 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntry import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization; import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.google.common.base.Preconditions; -import com.google.common.hash.HashCode; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; +import com.google.common.hash.PrimitiveSink; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.google.inject.Singleton; - -import rx.Observable; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.schedulers.Schedulers; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; /** @@ -70,27 +83,33 @@ import rx.schedulers.Schedulers; public class ShardGroupCompactionImpl implements ShardGroupCompaction { + private static final Logger LOG = LoggerFactory.getLogger( ShardGroupCompactionImpl.class ); + + private static final Charset CHARSET = Charset.forName( "UTF-8" ); private static final HashFunction MURMUR_128 = Hashing.murmur3_128(); + + private final ListeningExecutorService executorService; private final TimeService timeService; private final GraphFig graphFig; private final NodeShardAllocation nodeShardAllocation; private final ShardedEdgeSerialization shardedEdgeSerialization; private final EdgeColumnFamilies edgeColumnFamilies; + private final Keyspace keyspace; private final EdgeShardSerialization edgeShardSerialization; - private final Random random; private final ShardCompactionTaskTracker 
shardCompactionTaskTracker; + private final ShardAuditTaskTracker shardAuditTaskTracker; @Inject public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig graphFig, final NodeShardAllocation nodeShardAllocation, final ShardedEdgeSerialization shardedEdgeSerialization, - final EdgeColumnFamilies edgeColumnFamilies, + final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace, final EdgeShardSerialization edgeShardSerialization ) { this.timeService = timeService; @@ -98,157 +117,343 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { this.nodeShardAllocation = nodeShardAllocation; this.shardedEdgeSerialization = shardedEdgeSerialization; this.edgeColumnFamilies = edgeColumnFamilies; + this.keyspace = keyspace; this.edgeShardSerialization = edgeShardSerialization; this.random = new Random(); this.shardCompactionTaskTracker = new ShardCompactionTaskTracker(); + this.shardAuditTaskTracker = new ShardAuditTaskTracker(); + + executorService = MoreExecutors.listeningDecorator( + new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() ) ); } - @Override - public Set compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, - final ShardEntryGroup group ) { + /** + * Execute the compaction task. 
Will return the status the operations performed + * + * @param group The shard entry group to compact + * + * @return The result of the compaction operation + */ + public CompactionResult compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, + final ShardEntryGroup group ) { + + final long startTime = timeService.getCurrentTime(); + Preconditions.checkNotNull( group, "group cannot be null" ); Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" ); - Preconditions.checkArgument( group.shouldCompact( startTime ), "Compaction can now be run" ); + Preconditions.checkArgument( group.shouldCompact( startTime ), + "Compaction cannot be run yet. Ignoring compaction." ); - /** - * It's already compacting, don't do anything - */ - if (!shardCompactionTaskTracker.shouldStartCompaction( scope, edgeMeta, group )){ - return Collections.emptySet(); - } + final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder(); final Shard targetShard = group.getCompactionTarget(); - final Collection sourceShards = group.getReadShards(); + final Set sourceShards = new HashSet<>( group.getReadShards() ); + //remove the target + sourceShards.remove( targetShard ); - Observable.create( new ObservableIterator( "Shard_Repair" ) { - @Override - protected Iterator getIterator() { - return edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, group.getReadShards(), Long.MAX_VALUE ); - } - } ).buffer( graphFig.getScanPageSize() ).doOnNext( new Action1>() { - @Override - public void call( final List markedEdges ) { + final UUID timestamp = UUIDGenerator.newTimeUUID(); - } - }).doOnNext( new Action1>() { - @Override - public void call( final List markedEdges ) { + final long newShardPivot = targetShard.getShardIndex(); - } - } ); + final int maxWorkSize = graphFig.getScanPageSize(); + final MutationBatch newRowBatch = keyspace.prepareMutationBatch(); + final MutationBatch deleteRowBatch = 
keyspace.prepareMutationBatch(); + /** + * As we move edges, we want to keep track of it + */ + long edgeCount = 0; - return null; - } + for ( Shard sourceShard : sourceShards ) { + Iterator edges = edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, + Collections.singleton( sourceShard ), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING ); + while ( edges.hasNext() ) { + final MarkedEdge edge = edges.next(); - @Override - public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, - final ShardEntryGroup group ) { + final long edgeTimestamp = edge.getTimestamp(); + /** + * The edge is within a different shard, break + */ + if ( edgeTimestamp < newShardPivot ) { + break; + } - final double repairChance = random.nextDouble(); - //don't repair - if ( repairChance > graphFig.getShardRepairChance() ) { - return AuditResult.NOT_CHECKED; + newRowBatch.mergeShallow( + edgeMeta.writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge, + timestamp ) ); + + deleteRowBatch.mergeShallow( + edgeMeta.deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge, + timestamp ) ); + + edgeCount++; + + //if we're at our count, execute the mutation of writing the edges to the new row, then remove them + //from the old rows + if ( edgeCount % maxWorkSize == 0 ) { + + try { + HystrixCassandra.async( newRowBatch ); + HystrixCassandra.async( deleteRowBatch ); + } + catch ( Throwable t ) { + LOG.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard ); + } + } + } } + try { + HystrixCassandra.async( newRowBatch ); + HystrixCassandra.async( deleteRowBatch ); + } + catch ( Throwable t ) { + LOG.error( "Unable to move edges to target shard {}", targetShard ); + } + + + LOG.info( "Finished compacting {} shards and moved {} edges", sourceShards, edgeCount ); + + resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( 
targetShard ); + /** - * We don't have a compaction pending. Run an audit on the shards + * We didn't move anything this pass, mark the shard as compacted. If we move something, it means that we missed it on the first pass + * or someone is still not writing to the target shard only. */ - if ( !group.isCompactionPending() ) { + if ( edgeCount == 0 ) { - /** - * Check if we should allocate, we may want to - */ + //now that we've marked our target as compacted, we can successfully remove any shards that are not + // compacted themselves in the sources + + final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch(); + + for ( Shard source : sourceShards ) { + + //if we can't safely delete it, don't do so + if ( !group.canBeDeleted( source ) ) { + continue; + } - final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta ); + LOG.info( "Source shards have been fully drained. Removing shard {}", source ); + final MutationBatch shardRemoval = edgeShardSerialization.removeShardMeta( scope, source, edgeMeta ); + shardRemovalRollup.mergeShallow( shardRemoval ); - if ( !created ) { - return AuditResult.CHECKED_NO_OP; + resultBuilder.withRemovedShard( source ); } - return AuditResult.CHECKED_CREATED; + HystrixCassandra.async( shardRemovalRollup ); + + + LOG.info( "Shard has been fully compacted. 
Marking shard {} as compacted in Cassandra", targetShard ); + + //Overwrite our shard index with a newly created one that has been marked as compacted + Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true ); + final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta ); + HystrixCassandra.async( updateMark ); + + resultBuilder.withCompactedShard( compactedShard ); } - //check our taskmanager + return resultBuilder.build(); + } + @Override + public ListenableFuture evaluateShardGroup( final ApplicationScope scope, + final DirectedEdgeMeta edgeMeta, + final ShardEntryGroup group ) { + + final double repairChance = random.nextDouble(); + + + //don't audit, we didn't hit our chance + if ( repairChance > graphFig.getShardRepairChance() ) { + return Futures.immediateFuture( AuditResult.NOT_CHECKED ); + } + /** - * Do the compaction + * Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to + * hose the system */ - if ( group.shouldCompact( timeService.getCurrentTime() ) ) { - compact( scope, edgeMeta, group ); - return AuditResult.COMPACTING; - } + ListenableFuture future = executorService.submit( new Callable() { + @Override + public AuditResult call() throws Exception { + + + /** + * We don't have a compaction pending. 
Run an audit on the shards + */ + if ( !group.isCompactionPending() ) { + + /** + * Check if we should allocate, we may want to + */ + + /** + * It's already compacting, don't do anything + */ + if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) { + return AuditResult.CHECKED_NO_OP; + } + + try { + + final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta ); + if ( !created ) { + return AuditResult.CHECKED_NO_OP; + } + } + finally { + shardAuditTaskTracker.complete( scope, edgeMeta, group ); + } + + + return AuditResult.CHECKED_CREATED; + } + + //check our taskmanager + + + /** + * Do the compaction + */ + if ( group.shouldCompact( timeService.getCurrentTime() ) ) { + /** + * It's already compacting, don't do anything + */ + if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) { + return AuditResult.COMPACTING; + } + + /** + * We use a finally b/c we always want to remove the task track + */ + try { + CompactionResult result = compact( scope, edgeMeta, group ); + LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} ); + } + finally { + shardCompactionTaskTracker.complete( scope, edgeMeta, group ); + } + return AuditResult.COMPACTED; + } + + //no op, there's nothing we need to do to this shard + return AuditResult.NOT_CHECKED; + } + } ); + + /** + * Log our success or failures for debugging purposes + */ + Futures.addCallback( future, new FutureCallback() { + @Override + public void onSuccess( @Nullable final AuditResult result ) { + LOG.debug( "Successfully completed audit of task {}", result ); + } - //no op, there's nothing we need to do to this shard - return AuditResult.NOT_CHECKED; + + @Override + public void onFailure( final Throwable t ) { + LOG.error( "Unable to perform audit. 
Exception is ", t ); + } + } ); + + return future; } - private static final class ShardCompactionTaskTracker { - private BitSet runningTasks = new BitSet(); + /** + * Inner class used to track running tasks per instance + */ + private static abstract class TaskTracker { + + private static final Boolean TRUE = true; + + private ConcurrentHashMap runningTasks = new ConcurrentHashMap<>(); /** * Sets this data into our scope to signal it's running to stop other threads from attempting to run - * @param scope - * @param edgeMeta - * @param group - * @return */ - public boolean shouldStartCompaction( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, + public boolean canStartTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, ShardEntryGroup group ) { - final int hash = doHash( scope, edgeMeta, group ).asInt(); + final Long hash = doHash( scope, edgeMeta, group ).hash().asLong(); - if(runningTasks.get( hash )){ - return false; - } - - runningTasks.set( hash ); + final Boolean returned = runningTasks.putIfAbsent( hash, TRUE ); - return true; + /** + * Someone already put the value + */ + return returned == null; } /** * Mark this entry group as complete - * @param scope - * @param edgeMeta - * @param group */ - public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, - ShardEntryGroup group ) { - final int hash = doHash( scope, edgeMeta, group ).asInt(); - runningTasks.clear( hash ); + public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, ShardEntryGroup group ) { + final long hash = doHash( scope, edgeMeta, group ).hash().asLong(); + runningTasks.remove( hash ); } + protected abstract Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta, + final ShardEntryGroup shardEntryGroup ); + } + + + /** + * Task tracker for shard compaction + */ + private static final class ShardCompactionTaskTracker extends ShardAuditTaskTracker { + /** * Hash our data into a 
consistent long - * @param scope - * @param directedEdgeMeta - * @param shardEntryGroup - * @return */ - private HashCode doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta, + protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta, + final ShardEntryGroup shardEntryGroup ) { + + final Hasher hasher = super.doHash( scope, directedEdgeMeta, shardEntryGroup ); + + //add our compaction target to the hash + final Shard compactionTarget = shardEntryGroup.getCompactionTarget(); + + hasher.putLong( compactionTarget.getShardIndex() ); + + + return hasher; + } + } + + + /** + * Task tracker for shard audit + */ + private static class ShardAuditTaskTracker extends TaskTracker { + + /** + * Hash our data into a consistent long + */ + protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta, final ShardEntryGroup shardEntryGroup ) { final Hasher hasher = MURMUR_128.newHasher(); @@ -273,35 +478,141 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } //add our compaction target to the hash - final Shard compactionTarget = shardEntryGroup.getCompactionTarget(); - hasher.putLong( compactionTarget.getShardIndex() ); - - return hasher.hash(); + return hasher; } - private void addToHash( final Hasher hasher, final Id id ) { + protected void addToHash( final PrimitiveSink into, final Id id ) { final UUID nodeUuid = id.getUuid(); final String nodeType = id.getType(); - hasher.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() ) - .putString( nodeType, CHARSET ); + into.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() ) + .putString( nodeType, CHARSET ); + } + } + + + /** + * Create a thread pool that will reject work if our audit tasks become overwhelmed + */ + private final class MaxSizeThreadPool extends ThreadPoolExecutor { + + public MaxSizeThreadPool( final int workerSize, 
final int queueLength ) { + super( 1, workerSize, 30, TimeUnit.SECONDS, new ArrayBlockingQueue( queueLength ), + new CompactionThreadFactory(), new RejectionLogger() ); + } + } + + + private final class CompactionThreadFactory implements ThreadFactory { + + private final AtomicLong threadCounter = new AtomicLong(); + + + @Override + public Thread newThread( final Runnable r ) { + final long newValue = threadCounter.incrementAndGet(); + + return new Thread( r, "Graph-Shard-Compaction-" + newValue ); } } - private enum StartResult{ - /** - * Returned if the compaction was started - */ - STARTED, + private final class RejectionLogger implements RejectedExecutionHandler { + + + @Override + public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) { + LOG.warn( "Audit queue full, rejecting audit task {}", r ); + } + } + + + + public static final class CompactionResult { + + public final long copiedEdges; + public final Shard targetShard; + public final Set sourceShards; + public final Set removedShards; + public final Shard compactedShard; + + + + private CompactionResult( final long copiedEdges, final Shard targetShard, final Set sourceShards, + final Set removedShards, final Shard compactedShard ) { + this.copiedEdges = copiedEdges; + this.targetShard = targetShard; + this.compactedShard = compactedShard; + this.sourceShards = Collections.unmodifiableSet( sourceShards ); + this.removedShards = Collections.unmodifiableSet( removedShards ); + } + /** - * Returned if we are running the compaction + * Create a builder to use to create the result */ - RUNNING; + public static CompactionBuilder builder() { + return new CompactionBuilder(); + } + + + @Override + public String toString() { + return "CompactionResult{" + + "copiedEdges=" + copiedEdges + + ", targetShard=" + targetShard + + ", sourceShards=" + sourceShards + + ", removedShards=" + removedShards + + ", compactedShard=" + compactedShard + + '}'; + } + + + public static final class 
CompactionBuilder { + private long copiedEdges; + private Shard targetShard; + private Set sourceShards; + private Set removedShards = new HashSet<>(); + private Shard compactedShard; + + + public CompactionBuilder withCopiedEdges( final long copiedEdges ) { + this.copiedEdges = copiedEdges; + return this; + } + + + public CompactionBuilder withTargetShard( final Shard targetShard ) { + this.targetShard = targetShard; + return this; + } + + + public CompactionBuilder withSourceShards( final Set sourceShards ) { + this.sourceShards = sourceShards; + return this; + } + + + public CompactionBuilder withRemovedShard( final Shard removedShard ) { + this.removedShards.add( removedShard ); + return this; + } + + + public CompactionBuilder withCompactedShard( final Shard compactedShard ) { + this.compactedShard = compactedShard; + return this; + } + + + public CompactionResult build() { + return new CompactionResult( copiedEdges, targetShard, sourceShards, removedShards, compactedShard ); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java index 030e4a7..b0523e1 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java @@ -22,9 +22,11 @@ package 
org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.UUID; +import javax.annotation.Nullable; import javax.inject.Inject; import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; @@ -49,9 +51,16 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey; import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType; import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators + .SourceDirectedEdgeDescendingComparator; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators + .TargetDirectedEdgeDescendingComparator; import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.inject.Singleton; import com.netflix.astyanax.Keyspace; @@ -256,6 +265,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final Shard shard, final boolean isDeleted ) { batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge ); + writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -278,6 +288,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope, 
rowKey ) ) .deleteColumn( edge ); + writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -297,6 +308,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final Shard shard, final boolean isDeleted ) { batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge ); + writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -319,6 +331,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) ) .deleteColumn( edge ); + writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -338,6 +351,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final boolean isDeleted ) { batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope, rowKey ) ) .deleteColumn( column ); + writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta ); } }.createBatch( scope, shards, timestamp ); } @@ -357,8 +371,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { columnFamilies.getGraphEdgeVersions(); final Serializer serializer = columnFamily.getColumnSerializer(); + + final OrderedComparator comparator = new OrderedComparator<>( DescendingTimestampComparator.INSTANCE, search.getOrder()); + + + + final EdgeSearcher searcher = - new EdgeSearcher( scope, maxTimestamp, search.last(), shards ) { + new EdgeSearcher( scope, shards, search.getOrder(), comparator, maxTimestamp, + search.last().transform( TRANSFORM ) ) { + @Override protected Serializer getSerializer() { @@ -367,11 +389,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override - public void setRange( final RangeBuilder 
builder ) { + public void buildRange( final RangeBuilder builder ) { if ( last.isPresent() ) { - super.setRange( builder ); + super.buildRange( builder ); return; } @@ -387,7 +409,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override - protected Long getStartColumn( final Edge last ) { + protected Long createColumn( final MarkedEdge last ) { return last.getTimestamp(); } @@ -398,10 +420,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { } - @Override - public int compare( final MarkedEdge o1, final MarkedEdge o2 ) { - return Long.compare( o1.getTimestamp(), o2.getTimestamp() ); - } + }; return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(), @@ -411,23 +430,27 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override public Iterator getEdgesFromSource( final EdgeColumnFamilies columnFamilies, - final ApplicationScope scope, final SearchByEdgeType edgeType, + final ApplicationScope scope, final SearchByEdgeType search, final Collection shards ) { ValidationUtils.validateApplicationScope( scope ); - GraphValidation.validateSearchByEdgeType( edgeType ); + GraphValidation.validateSearchByEdgeType( search ); - final Id sourceId = edgeType.getNode(); - final String type = edgeType.getType(); - final long maxTimestamp = edgeType.getMaxTimestamp(); + final Id sourceId = search.getNode(); + final String type = search.getType(); + final long maxTimestamp = search.getMaxTimestamp(); final MultiTennantColumnFamily columnFamily = columnFamilies.getSourceNodeCfName(); final Serializer serializer = columnFamily.getColumnSerializer(); - final SourceEdgeSearcher searcher = - new SourceEdgeSearcher( scope, maxTimestamp, edgeType.last(), - shards ) { + final OrderedComparator comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder()); + + + + final EdgeSearcher searcher = + new 
EdgeSearcher( scope, shards, search.getOrder(), comparator, maxTimestamp, + search.last().transform( TRANSFORM ) ) { @Override @@ -443,7 +466,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override - protected DirectedEdge getStartColumn( final Edge last ) { + protected DirectedEdge createColumn( final MarkedEdge last ) { return new DirectedEdge( last.getTargetNode(), last.getTimestamp() ); } @@ -463,24 +486,26 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override public Iterator getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, - final SearchByIdType edgeType, + final SearchByIdType search, final Collection shards ) { ValidationUtils.validateApplicationScope( scope ); - GraphValidation.validateSearchByEdgeType( edgeType ); + GraphValidation.validateSearchByEdgeType( search ); - final Id targetId = edgeType.getNode(); - final String type = edgeType.getType(); - final String targetType = edgeType.getIdType(); - final long maxTimestamp = edgeType.getMaxTimestamp(); + final Id targetId = search.getNode(); + final String type = search.getType(); + final String targetType = search.getIdType(); + final long maxTimestamp = search.getMaxTimestamp(); final MultiTennantColumnFamily columnFamily = columnFamilies.getSourceNodeTargetTypeCfName(); final Serializer serializer = columnFamily.getColumnSerializer(); + final OrderedComparator comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder()); + - final SourceEdgeSearcher searcher = - new SourceEdgeSearcher( scope, maxTimestamp, edgeType.last(), - shards ) { + final EdgeSearcher searcher = + new EdgeSearcher( scope, shards, search.getOrder(), comparator, maxTimestamp, + search.last().transform( TRANSFORM ) ) { @Override protected Serializer getSerializer() { @@ -495,7 +520,7 @@ public class ShardedEdgeSerializationImpl implements 
ShardedEdgeSerialization { @Override - protected DirectedEdge getStartColumn( final Edge last ) { + protected DirectedEdge createColumn( final MarkedEdge last ) { return new DirectedEdge( last.getTargetNode(), last.getTimestamp() ); } @@ -513,20 +538,22 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override public Iterator getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, - final SearchByEdgeType edgeType, final Collection shards ) { + final SearchByEdgeType search, final Collection shards ) { ValidationUtils.validateApplicationScope( scope ); - GraphValidation.validateSearchByEdgeType( edgeType ); + GraphValidation.validateSearchByEdgeType( search ); - final Id targetId = edgeType.getNode(); - final String type = edgeType.getType(); - final long maxTimestamp = edgeType.getMaxTimestamp(); + final Id targetId = search.getNode(); + final String type = search.getType(); + final long maxTimestamp = search.getMaxTimestamp(); final MultiTennantColumnFamily columnFamily = columnFamilies.getTargetNodeCfName(); final Serializer serializer = columnFamily.getColumnSerializer(); - final TargetEdgeSearcher searcher = - new TargetEdgeSearcher( scope, maxTimestamp, edgeType.last(), - shards ) { + final OrderedComparator comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder()); + + final EdgeSearcher searcher = + new EdgeSearcher( scope, shards, search.getOrder(),comparator, maxTimestamp, + search.last().transform( TRANSFORM ) ) { @Override protected Serializer getSerializer() { @@ -541,7 +568,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override - protected DirectedEdge getStartColumn( final Edge last ) { + protected DirectedEdge createColumn( final MarkedEdge last ) { return new DirectedEdge( last.getSourceNode(), last.getTimestamp() ); } @@ -561,24 +588,26 @@ public class ShardedEdgeSerializationImpl implements 
ShardedEdgeSerialization { @Override public Iterator getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, - final SearchByIdType edgeType, + final SearchByIdType search, final Collection shards ) { ValidationUtils.validateApplicationScope( scope ); - GraphValidation.validateSearchByEdgeType( edgeType ); + GraphValidation.validateSearchByEdgeType( search ); - final Id targetId = edgeType.getNode(); - final String sourceType = edgeType.getIdType(); - final String type = edgeType.getType(); - final long maxTimestamp = edgeType.getMaxTimestamp(); + final Id targetId = search.getNode(); + final String sourceType = search.getIdType(); + final String type = search.getType(); + final long maxTimestamp = search.getMaxTimestamp(); final MultiTennantColumnFamily columnFamily = columnFamilies.getTargetNodeSourceTypeCfName(); final Serializer serializer = columnFamily.getColumnSerializer(); + final OrderedComparator comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder()); + - final TargetEdgeSearcher searcher = - new TargetEdgeSearcher( scope, maxTimestamp, edgeType.last(), - shards ) { + final EdgeSearcher searcher = + new EdgeSearcher( scope, shards, search.getOrder(), comparator, maxTimestamp, + search.last().transform( TRANSFORM ) ) { @Override protected Serializer getSerializer() { return serializer; @@ -592,7 +621,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override - protected DirectedEdge getStartColumn( final Edge last ) { + protected DirectedEdge createColumn( final MarkedEdge last ) { return new DirectedEdge( last.getTargetNode(), last.getTimestamp() ); } @@ -608,51 +637,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { } - /** - * Class for performing searched on rows based on source id - */ - private static abstract class SourceEdgeSearcher extends EdgeSearcher { - - protected 
SourceEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional last, - final Collection shards ) { - super( scope, maxTimestamp, last, shards ); - } - public int compare( final T o1, final T o2 ) { - int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() ); - - if ( compare == 0 ) { - compare = o1.getTargetNode().compareTo( o2.getTargetNode() ); - } - - return compare; - } - } - - - /** - * Class for performing searched on rows based on target id - */ - private static abstract class TargetEdgeSearcher extends EdgeSearcher { - - protected TargetEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional last, - final Collection shards ) { - super( scope, maxTimestamp, last, shards ); - } - - - public int compare( final T o1, final T o2 ) { - int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() ); - - if ( compare == 0 ) { - compare = o1.getTargetNode().compareTo( o2.getTargetNode() ); - } - - return compare; - } - } - /** * Simple callback to perform puts and deletes with a common row setup code @@ -994,4 +980,27 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { return isDeleted; } } + + + + + + + private static final Function TRANSFORM = new Function() { + @Nullable + @Override + public MarkedEdge apply( @Nullable final Edge input ) { + + if ( input == null ) { + return null; + } + + if ( input instanceof MarkedEdge ) { + return ( MarkedEdge ) input; + } + + return new SimpleMarkedEdge( input.getSourceNode(), input.getType(), input.getTargetNode(), + input.getTimestamp(), false ); + } + }; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java index 0c7e5b5..d99d98b 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java @@ -8,6 +8,7 @@ import java.util.NoSuchElementException; import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator; import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator; +import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -86,6 +87,7 @@ public class ShardsColumnIterator implements Iterator { */ private void startIterator() { + /** * If the edge is present, we need to being seeking from this */ @@ -94,7 +96,9 @@ public class ShardsColumnIterator implements Iterator { //set the range into the search - searcher.setRange( rangeBuilder ); + searcher.buildRange( rangeBuilder ); + + /** * Get our list of slices @@ -102,27 +106,22 @@ public class ShardsColumnIterator implements Iterator { final List> rowKeys = searcher.getRowKeys(); - final List> columnNameIterators = new ArrayList<>( rowKeys.size() ); - - for(ScopedRowKey rowKey: rowKeys){ - + if(rowKeys.size() == 1){ + final RowQuery, C> query = + keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKeys.get( 0 ) ) + .autoPaginate( true ).withColumnRange( rangeBuilder.build() ); - final RowQuery, C> query = - keyspace.prepareQuery( cf 
).setConsistencyLevel( consistencyLevel ).getKey( rowKey ) - .autoPaginate( true ).withColumnRange( rangeBuilder.build() ); - - - final ColumnNameIterator columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() ); + currentColumnIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() ); + } - columnNameIterators.add( columnNameIterator ); + else{ + currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize); } - currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java index 9050b0a..ddd514b 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java @@ -37,6 +37,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey; import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey; import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType; +import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeRowKeySerializer; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeSerializer; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.RowSerializer; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.RowTypeSerializer; import com.netflix.astyanax.serializers.LongSerializer; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java new file mode 100644 index 0000000..6e52dd5 --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DescendingTimestampComparator.java @@ -0,0 +1,43 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. 
You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators; + + +import java.util.Comparator; + +import org.apache.usergrid.persistence.graph.MarkedEdge; + + +/** + * Sorts longs from high to low. High is "less than" the low values; + * + */ +public class DescendingTimestampComparator implements Comparator { + + public static final DescendingTimestampComparator INSTANCE = new DescendingTimestampComparator(); + + + @Override + public int compare( final MarkedEdge o1, final MarkedEdge o2 ) { + return Long.compare( o1.getTimestamp(), o2.getTimestamp() )*-1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java new file mode 100644 index 0000000..6ff65cb --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/DirectedEdgeDescendingComparator.java @@ -0,0 +1,69 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under 
one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators; + + +import java.util.Comparator; +import java.util.UUID; + +import org.apache.usergrid.persistence.graph.MarkedEdge; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.fasterxml.uuid.impl.UUIDUtil; + + +/** + * Comparator for comparing edges in descending order. The first comparison is the timestamp, + * highest value should be first, so is considered "less". If those are equal, the UUIId is compared. + * this will return the UUID to compare. 
It will first be descending UUID, then ascending name + * + */ +public abstract class DirectedEdgeDescendingComparator implements Comparator { + + @Override + public int compare( final MarkedEdge first, final MarkedEdge second ) { + + int compare = Long.compare( first.getTimestamp(), second.getTimestamp() ) * -1; + + if(compare == 0){ + final Id firstId = getId( first ); + final Id secondId = getId( second ); + + compare = UUIDComparator.staticCompare( firstId.getUuid(), secondId.getUuid() ) * -1; + + if(compare == 0){ + compare = firstId.getType().compareTo( secondId.getType() ); + } + } + + return compare; + } + + + /** + * Return the Id to be used in the comparison + * @param edge + * @return + */ + protected abstract Id getId(final MarkedEdge edge); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java new file mode 100644 index 0000000..003ed36 --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/OrderedComparator.java @@ -0,0 +1,52 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. 
You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators; + + +import java.util.Comparator; + +import org.apache.usergrid.persistence.graph.SearchByEdgeType; + + +/** + * Comparator that will compare in reverse or forward order based on the order type specified. + * + * Assumes descending uses the default order. If ASCENDING, the result of the comparator will be reversed + */ +public class OrderedComparator implements Comparator { + + + private final int invert; + private final Comparator delegate; + + + public OrderedComparator( final Comparator delegate, final SearchByEdgeType.Order order ) { + this.invert = order == SearchByEdgeType.Order.DESCENDING ? 
1 : -1; + this.delegate = delegate; + } + + + @Override + public int compare( final T o1, final T o2 ) { + return delegate.compare( o1, o2 ) * invert; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java new file mode 100644 index 0000000..f067006 --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparator.java @@ -0,0 +1,42 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. 
+ * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators; + + +import org.apache.usergrid.persistence.graph.MarkedEdge; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * + * Comparator that uses source Id for comparisons. Newer times will be "greater". Newer uuids will be first. + * + */ +public class SourceDirectedEdgeDescendingComparator extends DirectedEdgeDescendingComparator { + + public static final SourceDirectedEdgeDescendingComparator INSTANCE = new SourceDirectedEdgeDescendingComparator(); + + @Override + protected Id getId( final MarkedEdge edge ) { + return edge.getSourceNode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java new file mode 100644 index 0000000..115a874 --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparator.java @@ -0,0 +1,42 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. 
You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators; + + +import org.apache.usergrid.persistence.graph.MarkedEdge; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * + * Comparator that uses source Id for comparisons. Newer time uuids will be first. + * + */ +public class TargetDirectedEdgeDescendingComparator extends DirectedEdgeDescendingComparator { + + public static final TargetDirectedEdgeDescendingComparator INSTANCE = new TargetDirectedEdgeDescendingComparator(); + + @Override + protected Id getId( final MarkedEdge edge ) { + return edge.getTargetNode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java new file mode 100644 index 0000000..f1ae90b --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeRowKeySerializer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize; + + +import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer; +import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.netflix.astyanax.model.CompositeBuilder; +import com.netflix.astyanax.model.CompositeParser; + + +/** + * Class to perform serialization for row keys from edges + */ + +public class EdgeRowKeySerializer implements CompositeFieldSerializer { + + private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get(); + + + @Override + public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) { + + //add the row id to the composite + ID_SER.toComposite( builder, key.sourceId ); + builder.addString( key.edgeType ); + ID_SER.toComposite( builder, key.targetId ); + builder.addLong( key.shardId ); + } + + + @Override + public EdgeRowKey fromComposite( final CompositeParser composite ) { + + final Id sourceId = ID_SER.fromComposite( composite ); + final String edgeType = composite.readString(); + final Id targetId = 
ID_SER.fromComposite( composite ); + final long shard = composite.readLong(); + + return new EdgeRowKey( sourceId, edgeType, targetId, shard ); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java new file mode 100644 index 0000000..590cf35 --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java @@ -0,0 +1,77 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. 
+ * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize; + + +import java.nio.ByteBuffer; + +import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Preconditions; +import com.netflix.astyanax.model.DynamicComposite; +import com.netflix.astyanax.serializers.AbstractSerializer; +import com.netflix.astyanax.serializers.LongSerializer; + + +/** + * Serializes to a source->target edge Note that we cannot set the edge type on de-serialization. Only the target + * Id and version. + */ +public class EdgeSerializer extends AbstractSerializer { + + private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get(); + private static final LongSerializer LONG_SERIALIZER = LongSerializer.get(); + + + @Override + public ByteBuffer toByteBuffer( final DirectedEdge edge ) { + + DynamicComposite composite = new DynamicComposite(); + + composite.addComponent( edge.timestamp, LONG_SERIALIZER ); + + ID_COL_SERIALIZER.toComposite( composite, edge.id ); + + return composite.serialize(); + } + + + @Override + public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) { + DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer ); + + Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" ); + + + //return the version + final long timestamp = composite.get( 0, LONG_SERIALIZER ); + + + //parse our id + final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 ); + + + return new DirectedEdge( id, timestamp ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java new file mode 100644 index 0000000..6b1c4e9 --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeShardRowKeySerializer.java @@ -0,0 +1,103 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. 
+ * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize; + + +import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer; +import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.netflix.astyanax.model.CompositeBuilder; +import com.netflix.astyanax.model.CompositeParser; + + +public class EdgeShardRowKeySerializer implements CompositeFieldSerializer { + + private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get(); + + public static final EdgeShardRowKeySerializer INSTANCE = new EdgeShardRowKeySerializer(); + + + @Override + public void toComposite( final CompositeBuilder builder, final DirectedEdgeMeta meta ) { + + + final DirectedEdgeMeta.NodeMeta[] nodeMeta = meta.getNodes(); + + //add the stored value + builder.addInteger( meta.getType().getStorageValue() ); + + final int length = nodeMeta.length; + + builder.addInteger( length ); + + + for ( DirectedEdgeMeta.NodeMeta node : nodeMeta ) { + ID_SER.toComposite( builder, node.getId() ); + builder.addInteger( node.getNodeType().getStorageValue() ); + } + + final String[] edgeTypes = meta.getTypes(); + + builder.addInteger( edgeTypes.length ); + + for ( String type : edgeTypes ) { + builder.addString( type ); + } + } + + + @Override + public DirectedEdgeMeta fromComposite( final CompositeParser composite ) { + + + final int storageType = composite.readInteger(); + + final DirectedEdgeMeta.MetaType metaType = DirectedEdgeMeta.MetaType.fromStorage( storageType ); + + final int idLength = composite.readInteger(); + + final DirectedEdgeMeta.NodeMeta[] nodePairs = new DirectedEdgeMeta.NodeMeta[idLength]; + + + for ( int i = 0; i < idLength; i++ ) { + final Id sourceId = 
ID_SER.fromComposite( composite ); + + final NodeType type = NodeType.get( composite.readInteger() ); + + nodePairs[i] = new DirectedEdgeMeta.NodeMeta( sourceId, type ); + } + + + final int length = composite.readInteger(); + + String[] types = new String[length]; + + for ( int i = 0; i < length; i++ ) { + types[i] = composite.readString(); + } + + return DirectedEdgeMeta.fromStorage( metaType, nodePairs, types ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java new file mode 100644 index 0000000..8376ef1 --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowSerializer.java @@ -0,0 +1,63 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. 
See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize; + + +import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer; +import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.netflix.astyanax.model.CompositeBuilder; +import com.netflix.astyanax.model.CompositeParser; + + +/** + * Class to perform serialization for row keys from edges + */ +public class RowSerializer implements CompositeFieldSerializer { + + private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get(); + + + @Override + public void toComposite( final CompositeBuilder builder, final RowKey key ) { + + //add the row id to the composite + ID_SER.toComposite( builder, key.nodeId ); + + builder.addString( key.edgeType ); + builder.addLong( key.shardId ); + } + + + @Override + public RowKey fromComposite( final CompositeParser composite ) { + + final Id id = ID_SER.fromComposite( composite ); + final String edgeType = composite.readString(); + final long shard = composite.readLong(); + + + return new RowKey( id, edgeType, shard ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java new file mode 100644 
index 0000000..a67c469 --- /dev/null +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/RowTypeSerializer.java @@ -0,0 +1,62 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. 
+ * + */ + +package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize; + + +import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer; +import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.netflix.astyanax.model.CompositeBuilder; +import com.netflix.astyanax.model.CompositeParser; + + +public class RowTypeSerializer implements CompositeFieldSerializer { + + private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get(); + + + @Override + public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) { + + //add the row id to the composite + ID_SER.toComposite( builder, keyType.nodeId ); + + builder.addString( keyType.edgeType ); + builder.addString( keyType.idType ); + + builder.addLong( keyType.shardId ); + } + + + @Override + public RowKeyType fromComposite( final CompositeParser composite ) { + + final Id id = ID_SER.fromComposite( composite ); + final String edgeType = composite.readString(); + final String idType = composite.readString(); + final long shard = composite.readLong(); + + return new RowKeyType( id, edgeType, idType, shard); + } +}