Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id ABED4200CAF for ; Thu, 22 Jun 2017 20:13:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AA4AE160BE7; Thu, 22 Jun 2017 18:13:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 72251160BD3 for ; Thu, 22 Jun 2017 20:13:39 +0200 (CEST) Received: (qmail 57034 invoked by uid 500); 22 Jun 2017 18:13:38 -0000 Mailing-List: contact commits-help@atlas.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.apache.org Delivered-To: mailing list commits@atlas.apache.org Received: (qmail 57025 invoked by uid 99); 22 Jun 2017 18:13:37 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Jun 2017 18:13:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id BD1D1C0311 for ; Thu, 22 Jun 2017 18:13:36 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.221 X-Spam-Level: X-Spam-Status: No, score=-4.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id f5v1XsCs3x6r for ; Thu, 22 Jun 2017 18:13:33 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 182135F60D for ; Thu, 22 Jun 2017 18:13:30 +0000 (UTC) Received: (qmail 57009 invoked by uid 99); 22 Jun 2017 18:13:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Jun 2017 18:13:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0C6E0DFC25; Thu, 22 Jun 2017 18:13:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: madhan@apache.org To: commits@atlas.incubator.apache.org Message-Id: <09f9e24d5fb64d7a9aed9321670b3176@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-atlas git commit: ATLAS-1889: fix to handle concurrent calls to update tags for an entity Date: Thu, 22 Jun 2017 18:13:30 +0000 (UTC) archived-at: Thu, 22 Jun 2017 18:13:40 -0000 Repository: incubator-atlas Updated Branches: refs/heads/0.8-incubating 9855b12e9 -> ab7ebacd7 ATLAS-1889: fix to handle concurrent calls to update tags for an entity Signed-off-by: Madhan Neethiraj (cherry picked from commit bcb128af3283432a1da3ab4a524cd200f647f862) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ab7ebacd Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ab7ebacd Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ab7ebacd Branch: refs/heads/0.8-incubating Commit: ab7ebacd7231c345aeb55524a6b84a838d98739f Parents: 9855b12 Author: ashutoshm Authored: Thu Jun 22 07:35:04 2017 -0700 Committer: Madhan Neethiraj Committed: Thu Jun 22 10:58:33 2017 -0700 ---------------------------------------------------------------------- .../atlas/GraphTransactionInterceptor.java | 120 ++++++++++ .../graph/GraphBackedMetadataRepository.java | 28 ++- .../store/graph/v1/AtlasEntityStoreV1.java | 7 + .../utils/ObjectUpdateSynchronizerTest.java | 218 +++++++++++++++++++ 4 files changed, 364 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ab7ebacd/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index 7d3bdf7..c6a4bbe 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -17,6 +17,7 @@ package org.apache.atlas; +import com.google.common.annotations.VisibleForTesting; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.atlas.exception.AtlasBaseException; @@ -29,12 +30,18 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import javax.ws.rs.core.Response; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; @Component public class GraphTransactionInterceptor implements MethodInterceptor { private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class); + @VisibleForTesting + private static final ObjectUpdateSynchronizer OBJECT_UPDATE_SYNCHRONIZER = new ObjectUpdateSynchronizer(); private static final ThreadLocal> postTransactionHooks = new ThreadLocal<>(); private final AtlasGraph graph; @@ -82,9 +89,19 @@ public class GraphTransactionInterceptor implements MethodInterceptor { } } } + + OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects(); } } + public static void lockObjectAndReleasePostCommit(final String guid) { + OBJECT_UPDATE_SYNCHRONIZER.lockObject(guid); + } + + public static void lockObjectAndReleasePostCommit(final List guids) { + OBJECT_UPDATE_SYNCHRONIZER.lockObject(guids); + } + boolean logException(Throwable t) { if (t instanceof AtlasBaseException) { Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode(); @@ -110,4 +127,107 @@ public class GraphTransactionInterceptor implements MethodInterceptor { public abstract void onComplete(boolean isSuccess); } + + private static class RefCountedReentrantLock extends ReentrantLock { + private int refCount; + + public RefCountedReentrantLock() { + this.refCount = 0; + } + + public int increment() { + return ++refCount; + } + + public int decrement() { + return --refCount; + } + + public int getRefCount() { return refCount; } + } + + + public static class ObjectUpdateSynchronizer { + private final Map guidLockMap = new ConcurrentHashMap<>(); + private final ThreadLocal> lockedGuids = new ThreadLocal>() { + @Override + protected List initialValue() { + return new ArrayList<>(); + } + }; + + public void lockObject(final List guids) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> lockObject(): guids: {}", guids); + } + + Collections.sort(guids); + for (String g : guids) { + lockObject(g); + } + } + + private void lockObject(final String guid) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size()); + } + + ReentrantLock lock = getOrCreateObjectLock(guid); + lock.lock(); + + lockedGuids.get().add(guid); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== lockObject(): guid: {}, guidLockMap.size: {}", guid, guidLockMap.size()); + } + } + + public void releaseLockedObjects() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size()); + } + + for (String guid : lockedGuids.get()) { + releaseObjectLock(guid); + } + + lockedGuids.get().clear(); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== releaseLockedObjects(): lockedGuids.size: {}", lockedGuids.get().size()); + } + } + + private RefCountedReentrantLock getOrCreateObjectLock(String guid) { + synchronized (guidLockMap) { + RefCountedReentrantLock ret = guidLockMap.get(guid); + if (ret == null) { + ret = new RefCountedReentrantLock(); + guidLockMap.put(guid, ret); + } + + ret.increment(); + return ret; + } + } + + private RefCountedReentrantLock releaseObjectLock(String guid) { + synchronized (guidLockMap) { + RefCountedReentrantLock lock = guidLockMap.get(guid); + if (lock != null && lock.isHeldByCurrentThread()) { + int refCount = lock.decrement(); + + if (refCount == 0) { + guidLockMap.remove(guid); + } + + lock.unlock(); + } else { + LOG.warn("releaseLockedObjects: {} Attempting to release a lock not held by current thread.", guid); + } + + return lock; + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ab7ebacd/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java index 5bec8fa..0f3b06b 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java @@ -21,6 +21,7 @@ package org.apache.atlas.repository.graph; import com.google.common.base.Preconditions; import org.apache.atlas.AtlasException; import org.apache.atlas.CreateUpdateEntitiesResult; +import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.model.instance.GuidMapping; @@ -49,7 +50,16 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import javax.inject.Singleton; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * An implementation backed by a Graph database provided @@ -303,6 +313,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { LOG.debug("Adding a new trait={} for entities={}", traitInstance.getTypeName(), entityGuids); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuids); for (String entityGuid : entityGuids) { addTraitImpl(entityGuid, traitInstance); } @@ -321,12 +332,12 @@ public class GraphBackedMetadataRepository implements MetadataRepository { Preconditions.checkNotNull(guid, "guid cannot be null"); Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null"); + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); addTraitImpl(guid, traitInstance); } private void addTraitImpl(String guid, ITypedStruct traitInstance) throws RepositoryException { final String traitName = traitInstance.getTypeName(); - if (LOG.isDebugEnabled()) { LOG.debug("Adding a new trait={} for entity={}", traitName, guid); } @@ -365,9 +376,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction public void deleteTrait(String guid, String traitNameToBeDeleted) throws TraitNotFoundException, EntityNotFoundException, RepositoryException { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid); - } + LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid); + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); AtlasVertex instanceVertex = graphHelper.getVertexForGUID(guid); @@ -383,11 +393,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository { AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); if(edge != null) { deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true); - - // update the traits in entity once trait removal is successful - traitNames.remove(traitNameToBeDeleted); - updateTraits(instanceVertex, traitNames); } + + // update the traits in entity once trait removal is successful + traitNames.remove(traitNameToBeDeleted); + updateTraits(instanceVertex, traitNames); } catch (Exception e) { throw new RepositoryException(e); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ab7ebacd/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 75e9132..5ea4ff2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContextV1; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; @@ -456,6 +457,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { LOG.debug("Adding classifications={} to entity={}", classifications, guid); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); for (AtlasClassification classification : classifications) { validateAndNormalize(classification); } @@ -484,6 +486,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); List updatedClassifications = new ArrayList<>(); for (AtlasClassification newClassification : newClassifications) { @@ -527,6 +530,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { LOG.debug("Adding classification={} to entities={}", classification, guids); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids); + validateAndNormalize(classification); List classifications = Collections.singletonList(classification); @@ -557,6 +562,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid); } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); + entityGraphMapper.deleteClassifications(guid, classificationNames); // notify listeners on classification deletion http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ab7ebacd/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java b/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java new file mode 100644 index 0000000..03ebae4 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/utils/ObjectUpdateSynchronizerTest.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.utils; + +import org.apache.atlas.GraphTransactionInterceptor; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.springframework.util.CollectionUtils; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +public class ObjectUpdateSynchronizerTest { + private static final GraphTransactionInterceptor.ObjectUpdateSynchronizer objectUpdateSynchronizer = new GraphTransactionInterceptor.ObjectUpdateSynchronizer(); + + private final List outputList = new ArrayList<>(); + private final int MAX_COUNT = 10; + + class CounterThread extends Thread { + String ids[]; + public CounterThread(String id) { + this.ids = new String[1]; + this.ids[0] = id; + } + + public void setIds(String... ids) { + this.ids = ids; + } + + public void run() { + objectUpdateSynchronizer.lockObject(CollectionUtils.arrayToList(ids)); + for (int i = 0; i < MAX_COUNT; i++) { + outputList.add(i); + RandomStringUtils.randomAlphabetic(20); + } + + objectUpdateSynchronizer.releaseLockedObjects(); + } + } + + @BeforeMethod + public void clearOutputList() { + outputList.clear(); + } + + @Test + public void singleThreadRun() throws InterruptedException { + verifyMultipleThreadRun(1); + } + + @Test + public void twoThreadsAccessingDifferntGuids_DoNotSerialize() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 2); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayNotEquals(populateExpectedArrayOutput(2)); + } + + @Test + public void twoThreadsAccessingSameGuid_Serialize() throws InterruptedException { + verifyMultipleThreadRun(2); + } + + @Test + public void severalThreadsAccessingSameGuid_Serialize() throws InterruptedException { + verifyMultipleThreadRun(10); + } + + @Test + public void severalThreadsSequentialAccessingListOfGuids() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 10); + int i = 0; + th[i++].setIds("1", "2", "3", "4", "5"); + th[i++].setIds("1", "2", "3", "4"); + th[i++].setIds("1", "2", "3"); + th[i++].setIds("1", "2"); + th[i++].setIds("1"); + th[i++].setIds("1", "2"); + th[i++].setIds("1", "2", "3"); + th[i++].setIds("1", "2", "3", "4"); + th[i++].setIds("1", "2", "3", "4", "5"); + th[i++].setIds("1"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayEquals(populateExpectedArrayOutput(th.length)); + } + + @Test + public void severalThreadsNonSequentialAccessingListOfGuids() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 5); + int i = 0; + th[i++].setIds("2", "1", "3", "4", "5"); + th[i++].setIds("3", "2", "4", "1"); + th[i++].setIds("2", "3", "1"); + th[i++].setIds("1", "2"); + th[i++].setIds("1"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayEquals(populateExpectedArrayOutput(th.length)); + } + + @Test + public void severalThreadsAccessingOverlappingListOfGuids() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 5); + int i = 0; + th[i++].setIds("1", "2", "3", "4", "5"); + th[i++].setIds("3", "4", "5", "6"); + th[i++].setIds("5", "6", "7"); + th[i++].setIds("7", "8"); + th[i++].setIds("8"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayNotEquals(populateExpectedArrayOutput(th.length)); + } + + + @Test + public void severalThreadsAccessingOverlappingListOfGuids2() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 3); + int i = 0; + th[i++].setIds("1", "2", "3", "4", "5"); + th[i++].setIds("6", "7", "8", "9"); + th[i++].setIds("4", "5", "6"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayNotEquals(populateExpectedArrayOutput(th.length)); + } + + @Test + public void severalThreadsAccessingOverlappingListOfGuidsEnsuringSerialOutput() throws InterruptedException { + CounterThread th[] = getCounterThreads(false, 5); + int i = 0; + th[i++].setIds("1", "2", "3", "4", "7"); + th[i++].setIds("3", "4", "5", "7"); + th[i++].setIds("5", "6", "7"); + th[i++].setIds("7", "8"); + th[i++].setIds("7"); + + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayEquals(populateExpectedArrayOutput(th.length)); + } + + private void verifyMultipleThreadRun(int limit) throws InterruptedException { + CounterThread[] th = getCounterThreads(limit); + startCounterThreads(th); + waitForThreadsToEnd(th); + assertArrayEquals(populateExpectedArrayOutput(limit)); + } + + private void startCounterThreads(CounterThread[] th) { + for (int i = 0; i < th.length; i++) { + th[i].start(); + } + } + private CounterThread[] getCounterThreads(int limit) { + return getCounterThreads(true, limit); + } + + private CounterThread[] getCounterThreads(boolean sameId, int limit) { + CounterThread th[] = new CounterThread[limit]; + for (Integer i = 0; i < limit; i++) { + th[i] = new CounterThread(sameId ? "1" : i.toString()); + } + return th; + } + + + private void assertArrayEquals(List expected) { + assertEquals(outputList.toArray(), expected.toArray()); + } + + private void assertArrayNotEquals(List expected) { + assertFalse(ArrayUtils.isEquals(outputList.toArray(), expected)); + } + + private void waitForThreadsToEnd(CounterThread... threads) throws InterruptedException { + for (Thread t : threads) { + t.join(); + } + } + + private List populateExpectedArrayOutput(int limit) { + List list = new ArrayList<>(); + for (int i = 0; i < limit*MAX_COUNT; i+=MAX_COUNT) { + for (int j = 0; j < MAX_COUNT; j++) { + list.add(j); + } + } + + return list; + } +}