incubator-gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject svn commit: r1177960 - /incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Date Sat, 01 Oct 2011 10:59:49 GMT
Author: alexis
Date: Sat Oct  1 10:59:49 2011
New Revision: 1177960

URL: http://svn.apache.org/viewvc?rev=1177960&view=rev
Log:
ConcurrentModificationException while iterating over map keys

Modified:
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java

Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1177960&r1=1177959&r2=1177960&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
(original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Sat Oct  1 10:59:49 2011
@@ -20,9 +20,10 @@ package org.apache.gora.cassandra.store;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import me.prettyprint.hector.api.beans.ColumnSlice;
 import me.prettyprint.hector.api.beans.HColumn;
@@ -61,8 +62,11 @@ public class CassandraStore<K, T extends
 
   /**
    * The values are Avro fields pending to be stored.
+   *
+   * We want to iterate over the keys in insertion order.
+   * We don't want to lock the entire collection before iterating over the keys, since in
the meantime other threads are adding entries to the map.
    */
-  private Map<K, T> buffer = new HashMap<K, T>();
+  private Map<K, T> buffer = new LinkedHashMap<K, T>();
   
   public CassandraStore() throws Exception {
     this.cassandraClient.init();
@@ -189,8 +193,19 @@ public class CassandraStore<K, T extends
    */
   @Override
   public void flush() throws IOException {
-    for (K key: this.buffer.keySet()) {
+    
+    Set<K> keys = this.buffer.keySet();
+    
+    // this duplicates memory footprint
+    K[] keyArray = (K[]) keys.toArray();
+    
+    // iterating over the key set directly would throw ConcurrentModificationException with
java.util.HashMap and subclasses
+    for (K key: keyArray) {
       T value = this.buffer.get(key);
+      if (value == null) {
+        LOG.info("Value to update is null for key " + key);
+        continue;
+      }
       Schema schema = value.getSchema();
       for (Field field: schema.getFields()) {
         if (value.isDirty(field.pos())) {
@@ -199,7 +214,10 @@ public class CassandraStore<K, T extends
       }
     }
     
-    this.buffer.clear();
+    // remove flushed rows
+    for (K key: keyArray) {
+      this.buffer.remove(key);
+    }
   }
 
   @Override
@@ -263,6 +281,7 @@ public class CassandraStore<K, T extends
       }
     }
     
+    // this performs a structural modification of the map
     this.buffer.put(key, p);
  }
 



Mime
View raw message