ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [45/50] incubator-ignite git commit: ignite-1007 Race in data structures processor
Date Mon, 15 Jun 2015 07:32:38 GMT
ignite-1007 Race in data structures processor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/811872ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/811872ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/811872ce

Branch: refs/heads/ignite-1009-v4
Commit: 811872ce692666fe8c77235f175b7ec15f717d30
Parents: 5160088
Author: agura <agura@gridgain.com>
Authored: Fri Jun 12 18:36:58 2015 +0300
Committer: agura <agura@gridgain.com>
Committed: Fri Jun 12 18:36:58 2015 +0300

----------------------------------------------------------------------
 .../datastructures/DataStructuresProcessor.java | 67 +++++++++++++++++++-
 1 file changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/811872ce/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index aa3bfe2..473a2ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -67,6 +67,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** */
     private static final long RETRY_DELAY = 1;
 
+    /** Initialization latch. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
+    /** Initialization failed flag. */
+    private boolean initFailed;
+
     /** Cache contains only {@code GridCacheInternal,GridCacheInternal}. */
     private IgniteInternalCache<GridCacheInternal, GridCacheInternal> dsView;
 
@@ -145,6 +151,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
             dsCacheCtx = atomicsCache.context();
         }
+
+        initLatch.countDown();
     }
 
     /**
@@ -167,6 +175,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     @Override public void onKernalStop(boolean cancel) {
         super.onKernalStop(cancel);
 
+        if (initLatch.getCount() > 0) {
+            initFailed = true;
+            
+            initLatch.countDown();
+        }
+
         if (qryId != null)
             dsCacheCtx.continuousQueries().cancelInternalQuery(qryId);
     }
@@ -187,6 +201,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     {
         A.notNull(name, "name");
 
+        awaitInitialization();
+
         checkAtomicsConfiguration();
 
         startQuery();
@@ -277,6 +293,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     public final void removeSequence(final String name) throws IgniteCheckedException {
         assert name != null;
 
+        awaitInitialization();
+
         checkAtomicsConfiguration();
 
         removeDataStructure(new IgniteCallable<Void>() {
@@ -315,6 +333,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         final boolean create) throws IgniteCheckedException {
         A.notNull(name, "name");
 
+        awaitInitialization();
+
         checkAtomicsConfiguration();
 
         startQuery();
@@ -431,6 +451,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
+        awaitInitialization();
+
         removeDataStructure(new IgniteCallable<Void>() {
             @Override public Void call() throws Exception {
                 dsCacheCtx.gate().enter();
@@ -520,6 +542,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     {
         A.notNull(name, "name");
 
+        awaitInitialization();
+
         checkAtomicsConfiguration();
 
         startQuery();
@@ -585,6 +609,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
+        awaitInitialization();
+
         removeDataStructure(new IgniteCallable<Void>() {
             @Override public Void call() throws Exception {
                 dsCacheCtx.gate().enter();
@@ -623,6 +649,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         final S initStamp, final boolean create) throws IgniteCheckedException {
         A.notNull(name, "name");
 
+        awaitInitialization();
+
         checkAtomicsConfiguration();
 
         startQuery();
@@ -688,6 +716,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
+        awaitInitialization();
+
         removeDataStructure(new IgniteCallable<Void>() {
             @Override public Void call() throws Exception {
                 dsCacheCtx.gate().enter();
@@ -725,6 +755,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         throws IgniteCheckedException {
         A.notNull(name, "name");
 
+        awaitInitialization();
+
         String cacheName = null;
 
         if (cfg != null) {
@@ -801,6 +833,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         assert name != null;
         assert cctx != null;
 
+        awaitInitialization();
+
         IgniteCallable<GridCacheQueueHeader> rmv = new IgniteCallable<GridCacheQueueHeader>() {
             @Override public GridCacheQueueHeader call() throws Exception {
                 return (GridCacheQueueHeader)retryRemove(cctx.cache(), new GridCacheQueueHeaderKey(name));
@@ -837,6 +871,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         boolean create)
         throws IgniteCheckedException
     {
+        awaitInitialization();
+
         Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY);
 
         if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
@@ -887,6 +923,24 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Awaits for processor initialization.
+     */
+    private void awaitInitialization() {
+        if (initLatch.getCount() > 0) {
+            try {
+                U.await(initLatch);
+
+                if (initFailed)
+                    throw new IllegalStateException("Failed to initialize data structures processor.");
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IllegalStateException("Failed to initialize data structures processor " +
+                    "(thread has been interrupted).", e);
+            }
+        }
+    }
+
+    /**
      * @param dsMap Map with data structure information.
      * @param info New data structure information.
      * @param create Create flag.
@@ -930,6 +984,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     {
         A.notNull(name, "name");
 
+        awaitInitialization();
+
         if (create)
             A.ensure(cnt >= 0, "count can not be negative");
 
@@ -997,9 +1053,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
+        awaitInitialization();
+
         removeDataStructure(new IgniteCallable<Void>() {
-            @Override
-            public Void call() throws Exception {
+            @Override public Void call() throws Exception {
                 GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
                 dsCacheCtx.gate().enter();
@@ -1169,6 +1226,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         throws IgniteCheckedException {
         A.notNull(name, "name");
 
+        awaitInitialization();
+
         String cacheName = null;
 
         if (cfg != null)
@@ -1196,6 +1255,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         assert name != null;
         assert cctx != null;
 
+        awaitInitialization();
+
         IgniteCallable<GridCacheSetHeader> rmv = new IgniteCallable<GridCacheSetHeader>() {
             @Override public GridCacheSetHeader call() throws Exception {
                 return (GridCacheSetHeader)retryRemove(cctx.cache(), new GridCacheSetHeaderKey(name));
@@ -1326,7 +1387,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    static enum DataStructureType {
+    enum DataStructureType {
         /** */
         ATOMIC_LONG(IgniteAtomicLong.class.getSimpleName()),
 


Mime
View raw message