atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject incubator-atlas git commit: ATLAS-503 Lock exceptions occurring due to concurrent updates to backend stores (yhemanth)
Date Thu, 02 Jun 2016 09:36:06 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master 59268875d -> 83d053978


ATLAS-503 Lock exceptions occurring due to concurrent updates to backend stores (yhemanth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/83d05397
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/83d05397
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/83d05397

Branch: refs/heads/master
Commit: 83d053978873e988604de2f07021878b5b987764
Parents: 5926887
Author: Hemanth Yamijala <hyamijala@hortonworks.com>
Authored: Thu Jun 2 15:05:56 2016 +0530
Committer: Hemanth Yamijala <hyamijala@hortonworks.com>
Committed: Thu Jun 2 15:05:56 2016 +0530

----------------------------------------------------------------------
 distro/src/conf/atlas-application.properties    |   4 +
 docs/src/site/twiki/Configuration.twiki         |  15 ++
 release-log.txt                                 |   1 +
 .../hbase/HBaseKeyColumnValueStore.java         |  43 ++++--
 .../hbase/HBaseKeyColumnValueStoreTest.java     | 139 +++++++++++++++++++
 5 files changed, 194 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index bfa40e8..1cdd424 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -119,3 +119,7 @@ atlas.auth.policy.file=${sys:atlas.home}/conf/policy-store.txt
 
 #########authorizer impl class #########
 atlas.authorizer.impl=SIMPLE
+
+#########  Performance Configs  #########
+#atlas.graph.storage.lock.retries=10
+#atlas.graph.storage.cache.db-cache-time=120000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 2884f42..7150483 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -220,3 +220,18 @@ atlas.client.ha.sleep.interval.ms=5000
 # Set the following property to true, to enable the setup steps to run on each server start. Default = false.
 atlas.server.run.setup.on.start=false
 </verbatim>
+
+---++ Performance configuration items
+
+The following properties can be used to tune performance of Atlas under specific circumstances:
+
+<verbatim>
+# The number of times Atlas code tries to acquire a lock (to ensure consistency) while committing a transaction.
+# This should be related to the amount of concurrency expected to be supported by the server. For e.g. with retries set to 10, upto 100 threads can concurrently create types in the Atlas system.
+# If this is set to a low value (default is 3), concurrent operations might fail with a PermanentLockingException.
+atlas.graph.storage.lock.retries=10
+
+# Milliseconds to wait before evicting a cached entry. This should be > atlas.graph.storage.lock.wait-time x atlas.graph.storage.lock.retries
+# If this is set to a low value (default is 10000), warnings on transactions taking too long will occur in the Atlas application log.
+atlas.graph.storage.cache.db-cache-time=120000
+</verbatim>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8077741..86cff05 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-503 Lock exceptions occurring due to concurrent updates to backend stores (yhemanth)
 ATLAS-766 Atlas policy file does not honour standard hash as comment format ( saqeeb.s via sumasai )
 ATLAS-843 Atlas UI: Feature to search terms in left navigation. (Kalyanikashikar via sumasai)
 ATLAS-731 Remove dashboard module in Atlas, replaced by dashboardv2 (kevalbhatt18 via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
index b4dc12e..fc8f2c4 100644
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java
@@ -20,6 +20,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.thinkaurelius.titan.core.attribute.Duration;
 import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
 import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
 import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
@@ -50,8 +51,6 @@ import java.util.*;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static com.thinkaurelius.titan.diskstorage.EntryMetaData.*;
-
 /**
  * Here are some areas that might need work:
  * <p/>
@@ -85,7 +84,9 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
 
     private LocalLockMediator<StoreTransaction> localLockMediator;
 
-    private Duration lockExpiryTime;
+    private final Duration lockExpiryTimeMs;
+    private final Duration lockMaxWaitTimeMs;
+    private final Integer lockMaxRetries;
 
     HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) {
         this.storeManager = storeManager;
@@ -96,7 +97,10 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
         this.columnFamilyBytes = columnFamily.getBytes();
         this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
         this.localLockMediator = llm;
-        this.lockExpiryTime = storeManager.getStorageConfig().get(GraphDatabaseConfiguration.LOCK_EXPIRE);
+        Configuration storageConfig = storeManager.getStorageConfig();
+        this.lockExpiryTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE);
+        this.lockMaxWaitTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT);
+        this.lockMaxRetries = storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY);
     }
 
     @Override
@@ -128,14 +132,37 @@ public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
 
         KeyColumn lockID = new KeyColumn(key, column);
         logger.debug("Attempting to acquireLock on {} ", lockID);
-        final Timepoint lockStartTime = Timestamps.NANO.getTime(System.nanoTime(), TimeUnit.NANOSECONDS);
-        boolean locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTime));
-        if (!locked) {
-            throw new PermanentLockingException("Could not lock the keyColumn " + lockID +  " on CF {} " + Bytes.toString(columnFamilyBytes));
+        int trialCount = 0;
+        boolean locked;
+        while (trialCount < lockMaxRetries) {
+            final Timepoint lockStartTime = Timestamps.MILLI.getTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+            locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTimeMs));
+            trialCount++;
+            if (!locked) {
+                handleLockFailure(txh, lockID, trialCount);
+            } else {
+                logger.debug("Acquired lock on {}, {}", lockID, txh);
+                break;
+            }
         }
         ((HBaseTransaction) txh).updateLocks(lockID, expectedValue);
     }
 
+    void handleLockFailure(StoreTransaction txh, KeyColumn lockID, int trialCount) throws PermanentLockingException {
+        if (trialCount < lockMaxRetries) {
+            try {
+                Thread.sleep(lockMaxWaitTimeMs.getLength(TimeUnit.DAYS.MILLISECONDS));
+            } catch (InterruptedException e) {
+                throw new PermanentLockingException(
+                        "Interrupted while waiting for acquiring lock for transaction "
+                        + txh + " lockID " + lockID + " on retry " + trialCount, e);
+            }
+        } else {
+            throw new PermanentLockingException("Could not lock the keyColumn " +
+                    lockID + " on CF {} " + Bytes.toString(columnFamilyBytes));
+        }
+    }
+
     @Override
     public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
         return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY),

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/83d05397/titan/src/test/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStoreTest.java
----------------------------------------------------------------------
diff --git a/titan/src/test/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStoreTest.java b/titan/src/test/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStoreTest.java
new file mode 100644
index 0000000..7ed636a
--- /dev/null
+++ b/titan/src/test/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStoreTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.thinkaurelius.titan.diskstorage.hbase;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.EntryMetaData;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
+import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
+import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
+import com.thinkaurelius.titan.diskstorage.util.time.StandardDuration;
+import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.fail;
+
+public class HBaseKeyColumnValueStoreTest {
+
+    @Mock
+    HBaseStoreManager storeManager;
+
+    @Mock
+    ConnectionMask connectionMask;
+
+    @Mock
+    LocalLockMediator localLockMediator;
+
+    @Mock
+    StaticBuffer key;
+
+    @Mock
+    StaticBuffer column;
+
+    @Mock
+    StaticBuffer expectedValue;
+
+    @Mock
+    HBaseTransaction transaction;
+
+    @Mock
+    Configuration storageConfig;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void shouldSucceedInLockingIfLockMediatorSucceeds() throws BackendException {
+
+        when(storeManager.getMetaDataSchema("hbase")).thenReturn(new EntryMetaData[] {EntryMetaData.TIMESTAMP});
+        when(storeManager.getStorageConfig()).thenReturn(storageConfig);
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE)).thenReturn(
+                new StandardDuration(300L, TimeUnit.MILLISECONDS));
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT)).thenReturn(
+                new StandardDuration(10L, TimeUnit.MILLISECONDS));
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY)).thenReturn(3);
+        KeyColumn lockID = new KeyColumn(key, column);
+        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
+                thenReturn(true);
+
+        HBaseKeyColumnValueStore hBaseKeyColumnValueStore =
+                new HBaseKeyColumnValueStore(storeManager, connectionMask, "titan", "e", "hbase", localLockMediator);
+        hBaseKeyColumnValueStore.acquireLock(key, column, expectedValue, transaction);
+
+        verify(transaction).updateLocks(lockID, expectedValue);
+        verify(localLockMediator, times(1)).lock(eq(lockID), eq(transaction), any(Timepoint.class));
+    }
+
+    @Test
+    public void shouldRetryRightNumberOfTimesIfLockMediationFails() throws BackendException {
+        when(storeManager.getMetaDataSchema("hbase")).thenReturn(new EntryMetaData[] {EntryMetaData.TIMESTAMP});
+        when(storeManager.getStorageConfig()).thenReturn(storageConfig);
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE)).thenReturn(
+                new StandardDuration(300L, TimeUnit.MILLISECONDS));
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT)).thenReturn(
+                new StandardDuration(10L, TimeUnit.MILLISECONDS));
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY)).thenReturn(3);
+        KeyColumn lockID = new KeyColumn(key, column);
+        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
+                thenReturn(false).thenReturn(false).thenReturn(true);
+
+        HBaseKeyColumnValueStore hBaseKeyColumnValueStore =
+                new HBaseKeyColumnValueStore(storeManager, connectionMask, "titan", "e", "hbase", localLockMediator);
+        hBaseKeyColumnValueStore.acquireLock(key, column, expectedValue, transaction);
+
+        verify(transaction).updateLocks(lockID, expectedValue);
+        verify(localLockMediator, times(3)).lock(eq(lockID), eq(transaction), any(Timepoint.class));
+    }
+
+    @Test(expectedExceptions = PermanentLockingException.class)
+    public void shouldThrowExceptionAfterConfiguredRetriesIfLockMediationFails() throws BackendException {
+        when(storeManager.getMetaDataSchema("hbase")).thenReturn(new EntryMetaData[] {EntryMetaData.TIMESTAMP});
+        when(storeManager.getStorageConfig()).thenReturn(storageConfig);
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE)).thenReturn(
+                new StandardDuration(300L, TimeUnit.MILLISECONDS));
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT)).thenReturn(
+                new StandardDuration(10L, TimeUnit.MILLISECONDS));
+        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY)).thenReturn(3);
+        KeyColumn lockID = new KeyColumn(key, column);
+        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
+                thenReturn(false).thenReturn(false).thenReturn(false);
+
+        HBaseKeyColumnValueStore hBaseKeyColumnValueStore =
+                new HBaseKeyColumnValueStore(storeManager, connectionMask, "titan", "e", "hbase", localLockMediator);
+        hBaseKeyColumnValueStore.acquireLock(key, column, expectedValue, transaction);
+
+        fail("Should fail as lock could not be acquired after 3 retries.");
+    }
+}


Mime
View raw message