accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [44/59] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date Sat, 07 Sep 2013 03:28:47 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
new file mode 100644
index 0000000..59142f7
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.start.Platform;
+import org.apache.log4j.Logger;
+
+/**
+ * This class stores data in a C++ map. Doing this allows us to store more in memory and
avoid pauses caused by Java GC.
+ * 
+ * The strategy for dealing with native memory allocated for the native map is that java
code using the native map should call delete() as soon as it is
+ * finished using the native map. When the NativeMap object is garbage collected its native
resources will be released if needed. However waiting for java GC
+ * would be a mistake for long lived NativeMaps. Long lived objects are not garbage collected
quickly, therefore a process could easily use too much memory.
+ * 
+ */
+
+public class NativeMap implements Iterable<Map.Entry<Key,Value>> {
+  
+  private static final Logger log = Logger.getLogger(NativeMap.class);
+  
+  private long nmPointer;
+  
+  private final ReadWriteLock rwLock;
+  private final Lock rlock;
+  private final Lock wlock;
+  
+  int modCount = 0;
+  
+  private static native long createNM();
+  
+  // private static native void putNM(long nmPointer, byte[] kd, int cfo, int cqo, int cvo,
int tl, long ts, boolean del, byte[] value);
+  
+  private static native void singleUpdate(long nmPointer, byte[] row, byte cf[], byte cq[],
byte cv[], long ts, boolean del, byte[] value, int mutationCount);
+  
+  private static native long startUpdate(long nmPointer, byte[] row);
+  
+  private static native void update(long nmPointer, long updateID, byte cf[], byte cq[],
byte cv[], long ts, boolean del, byte[] value, int mutationCount);
+  
+  private static native int sizeNM(long nmPointer);
+  
+  private static native long memoryUsedNM(long nmPointer);
+  
+  private static native long deleteNM(long nmPointer);
+  
+  private static boolean init = false;
+  private static long totalAllocations;
+  private static HashSet<Long> allocatedNativeMaps;
+  
+  private static synchronized long createNativeMap() {
+    
+    if (!init) {
+      allocatedNativeMaps = new HashSet<Long>();
+      
+      Runnable r = new Runnable() {
+        @Override
+        public void run() {
+          if (allocatedNativeMaps.size() > 0) {
+            // print to system err in case log4j is shutdown...
+            try {
+              log.warn("There are " + allocatedNativeMaps.size() + " allocated native maps");
+            } catch (Throwable t) {
+              log.error("There are " + allocatedNativeMaps.size() + " allocated native maps");
+            }
+          }
+          
+          log.debug(totalAllocations + " native maps were allocated");
+        }
+      };
+      
+      Runtime.getRuntime().addShutdownHook(new Thread(r));
+      
+      init = true;
+    }
+    
+    long nmPtr = createNM();
+    
+    if (allocatedNativeMaps.contains(nmPtr)) {
+      // something is really screwy, this should not happen
+      throw new RuntimeException(String.format("Duplicate native map pointer 0x%016x ", nmPtr));
+    }
+    
+    totalAllocations++;
+    allocatedNativeMaps.add(nmPtr);
+    
+    return nmPtr;
+  }
+  
+  private static synchronized void deleteNativeMap(long nmPtr) {
+    if (allocatedNativeMaps.contains(nmPtr)) {
+      deleteNM(nmPtr);
+      allocatedNativeMaps.remove(nmPtr);
+    } else {
+      throw new RuntimeException(String.format("Attempt to delete native map that is not
allocated 0x%016x ", nmPtr));
+    }
+  }
+  
+  private static boolean loadedNativeLibraries = false;
+  
+  public static String getNativeLibPath() {
+    return "lib/native/map/" + System.mapLibraryName("NativeMap-" + Platform.getPlatform());
+  }
+  
+  public static void loadNativeLib(String nativeLib) {
+    try {
+      System.load(nativeLib);
+      log.info("Loaded native map shared library " + nativeLib);
+      loadedNativeLibraries = true;
+    } catch (Throwable t) {
+      log.error("Failed to load native map library " + nativeLib, t);
+    }
+  }
+  
+  static {
+    String aHome = System.getenv("ACCUMULO_HOME");
+    if (aHome != null) {
+      String nativeLib = aHome + "/" + getNativeLibPath();
+      loadNativeLib(new File(nativeLib).getAbsolutePath());
+    }
+  }
+  
+  public static boolean loadedNativeLibraries() {
+    return loadedNativeLibraries;
+  }
+  
+  private static native long createNMI(long nmp, int fieldLens[]);
+  
+  private static native long createNMI(long nmp, byte[] row, byte cf[], byte cq[], byte cv[],
long ts, boolean del, int fieldLens[]);
+  
+  private static native boolean nmiNext(long nmiPointer, int fieldLens[]);
+  
+  private static native void nmiGetData(long nmiPointer, byte[] row, byte cf[], byte cq[],
byte cv[], byte[] valData);
+  
+  private static native long nmiGetTS(long nmiPointer);
+  
+  private static native void deleteNMI(long nmiPointer);
+  
+  private class ConcurrentIterator implements Iterator<Map.Entry<Key,Value>>
{
+    
+    // in order to get good performance when there are multiple threads reading, need to
read a lot while the
+    // the read lock is held..... lots of threads trying to get the read lock very often
causes serious slow
+    // downs.... also reading a lot of entries at once lessens the impact of concurrent writes...
if only
+    // one entry were read at a time and there were concurrent writes, then iteration could
be n*log(n)
+    
+    // increasing this number has a positive effect on concurrent read performance, but negatively
effects
+    // concurrent writers
+    private static final int MAX_READ_AHEAD_ENTRIES = 16;
+    private static final int READ_AHEAD_BYTES = 4096;
+    
+    private NMIterator source;
+    
+    private Entry<Key,Value> nextEntries[];
+    private int index;
+    private int end;
+    
+    ConcurrentIterator() {
+      this(new MemKey());
+    }
+    
+    @SuppressWarnings("unchecked")
+    ConcurrentIterator(Key key) {
+      // start off with a small read ahead
+      nextEntries = new Entry[1];
+      
+      rlock.lock();
+      try {
+        source = new NMIterator(key);
+        fill();
+      } finally {
+        rlock.unlock();
+      }
+    }
+    
+    // it is assumed the read lock is held when this method is called
+    @SuppressWarnings("unchecked")
+    private void fill() {
+      end = 0;
+      index = 0;
+      
+      if (source.hasNext())
+        source.doNextPreCheck();
+      
+      int amountRead = 0;
+      
+      // as we keep filling, increase the read ahead buffer
+      if (nextEntries.length < MAX_READ_AHEAD_ENTRIES)
+        nextEntries = new Entry[Math.min(nextEntries.length * 2, MAX_READ_AHEAD_ENTRIES)];
+      
+      while (source.hasNext() && end < nextEntries.length) {
+        Entry<Key,Value> ne = source.next();
+        nextEntries[end++] = ne;
+        amountRead += ne.getKey().getSize() + ne.getValue().getSize();
+        
+        if (amountRead > READ_AHEAD_BYTES)
+          break;
+      }
+    }
+    
+    @Override
+    public boolean hasNext() {
+      return end != 0;
+    }
+    
+    @Override
+    public Entry<Key,Value> next() {
+      if (end == 0) {
+        throw new NoSuchElementException();
+      }
+      
+      Entry<Key,Value> ret = nextEntries[index++];
+      
+      if (index == end) {
+        rlock.lock();
+        try {
+          fill();
+        } catch (ConcurrentModificationException cme) {
+          source.delete();
+          source = new NMIterator(ret.getKey());
+          fill();
+          if (0 < end && nextEntries[0].getKey().equals(ret.getKey())) {
+            index++;
+            if (index == end) {
+              fill();
+            }
+          }
+        } finally {
+          rlock.unlock();
+        }
+        
+      }
+      
+      return ret;
+    }
+    
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+    
+    public void delete() {
+      source.delete();
+    }
+  }
+  
+  private class NMIterator implements Iterator<Map.Entry<Key,Value>> {
+    
+    /**
+     * The strategy for dealing with native memory allocated for iterators is to simply delete
that memory when this Java Object is garbage collected.
+     * 
+     * These iterators are likely short lived object and therefore will be quickly garbage
collected. Even if the objects are long lived and therefore more
+     * slowly garbage collected they only hold a small amount of native memory.
+     * 
+     */
+    
+    private long nmiPointer;
+    private boolean hasNext;
+    private int expectedModCount;
+    private int[] fieldsLens = new int[7];
+    private byte lastRow[];
+    
+    // it is assumed the read lock is held when this method is called
+    NMIterator(Key key) {
+      
+      if (nmPointer == 0) {
+        throw new IllegalStateException();
+      }
+      
+      expectedModCount = modCount;
+      
+      nmiPointer = createNMI(nmPointer, key.getRowData().toArray(), key.getColumnFamilyData().toArray(),
key.getColumnQualifierData().toArray(), key
+          .getColumnVisibilityData().toArray(), key.getTimestamp(), key.isDeleted(), fieldsLens);
+      
+      hasNext = nmiPointer != 0;
+    }
+    
+    // delete is synchronized on a per iterator basis want to ensure only one
+    // thread deletes an iterator w/o acquiring the global write lock...
+    // there is no contention among concurrent readers for deleting their iterators
+    public synchronized void delete() {
+      if (nmiPointer == 0) {
+        return;
+      }
+      
+      // log.debug("Deleting native map iterator pointer");
+      
+      deleteNMI(nmiPointer);
+      nmiPointer = 0;
+    }
+    
+    @Override
+    public boolean hasNext() {
+      return hasNext;
+    }
+    
+    // it is assumed the read lock is held when this method is called
+    // this method only needs to be called once per read lock acquisition
+    private void doNextPreCheck() {
+      if (nmPointer == 0) {
+        throw new IllegalStateException();
+      }
+      
+      if (modCount != expectedModCount) {
+        throw new ConcurrentModificationException();
+      }
+    }
+    
+    @Override
+    // It is assumed that this method is called w/ the read lock held and
+    // that doNextPreCheck() is called prior to calling this method
+    // also this method is synchronized to ensure that a deleted iterator
+    // is not used
+    public synchronized Entry<Key,Value> next() {
+      if (!hasNext) {
+        throw new NoSuchElementException();
+      }
+      
+      if (nmiPointer == 0) {
+        throw new IllegalStateException("Native Map Iterator Deleted");
+      }
+      
+      byte[] row = null;
+      if (fieldsLens[0] >= 0) {
+        row = new byte[fieldsLens[0]];
+        lastRow = row;
+      }
+      
+      byte cf[] = new byte[fieldsLens[1]];
+      byte cq[] = new byte[fieldsLens[2]];
+      byte cv[] = new byte[fieldsLens[3]];
+      boolean deleted = fieldsLens[4] == 0 ? false : true;
+      byte val[] = new byte[fieldsLens[5]];
+      
+      nmiGetData(nmiPointer, row, cf, cq, cv, val);
+      long ts = nmiGetTS(nmiPointer);
+      
+      Key k = new MemKey(lastRow, cf, cq, cv, ts, deleted, false, fieldsLens[6]);
+      Value v = new Value(val, false);
+      
+      hasNext = nmiNext(nmiPointer, fieldsLens);
+      
+      return new NMEntry(k, v);
+    }
+    
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    protected void finalize() throws Throwable {
+      super.finalize();
+      if (nmiPointer != 0) {
+        // log.debug("Deleting native map iterator pointer in finalize");
+        deleteNMI(nmiPointer);
+      }
+    }
+    
+  }
+  
+  private static class NMEntry implements Map.Entry<Key,Value> {
+    
+    private Key key;
+    private Value val;
+    
+    NMEntry(Key k, Value v) {
+      this.key = k;
+      this.val = v;
+    }
+    
+    @Override
+    public Key getKey() {
+      return key;
+    }
+    
+    @Override
+    public Value getValue() {
+      return val;
+    }
+    
+    @Override
+    public Value setValue(Value value) {
+      throw new UnsupportedOperationException();
+    }
+    
+    public String toString() {
+      return key + "=" + val;
+    }
+  }
+  
+  public NativeMap() {
+    nmPointer = createNativeMap();
+    rwLock = new ReentrantReadWriteLock();
+    rlock = rwLock.readLock();
+    wlock = rwLock.writeLock();
+    log.debug(String.format("Allocated native map 0x%016x", nmPointer));
+  }
+  
+  @Override
+  protected void finalize() throws Throwable {
+    super.finalize();
+    if (nmPointer != 0) {
+      log.warn(String.format("Deallocating native map 0x%016x in finalize", nmPointer));
+      deleteNativeMap(nmPointer);
+    }
+  }
+  
+  private void _mutate(Mutation mutation, int mutationCount) {
+    
+    List<ColumnUpdate> updates = mutation.getUpdates();
+    if (updates.size() == 1) {
+      ColumnUpdate update = updates.get(0);
+      singleUpdate(nmPointer, mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(),
update.getColumnVisibility(), update.getTimestamp(),
+          update.isDeleted(), update.getValue(), mutationCount);
+    } else if (updates.size() > 1) {
+      long uid = startUpdate(nmPointer, mutation.getRow());
+      for (ColumnUpdate update : updates) {
+        update(nmPointer, uid, update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(),
update.getTimestamp(), update.isDeleted(),
+            update.getValue(), mutationCount);
+      }
+      
+    }
+  }
+  
+  public void mutate(Mutation mutation, int mutationCount) {
+    wlock.lock();
+    try {
+      if (nmPointer == 0) {
+        throw new IllegalStateException("Native Map Deleted");
+      }
+      
+      modCount++;
+      
+      _mutate(mutation, mutationCount);
+    } finally {
+      wlock.unlock();
+    }
+  }
+  
+  public void mutate(List<Mutation> mutations, int mutationCount) {
+    Iterator<Mutation> iter = mutations.iterator();
+    
+    while (iter.hasNext()) {
+      
+      wlock.lock();
+      try {
+        if (nmPointer == 0) {
+          throw new IllegalStateException("Native Map Deleted");
+        }
+        
+        modCount++;
+        
+        int count = 0;
+        while (iter.hasNext() && count < 10) {
+          Mutation mutation = iter.next();
+          _mutate(mutation, mutationCount);
+          mutationCount++;
+          count += mutation.size();
+        }
+      } finally {
+        wlock.unlock();
+      }
+    }
+  }
+  
+  public void put(Key key, Value value) {
+    wlock.lock();
+    try {
+      if (nmPointer == 0) {
+        throw new IllegalStateException("Native Map Deleted");
+      }
+      
+      modCount++;
+      
+      singleUpdate(nmPointer, key.getRowData().toArray(), key.getColumnFamilyData().toArray(),
key.getColumnQualifierData().toArray(), key
+          .getColumnVisibilityData().toArray(), key.getTimestamp(), key.isDeleted(), value.get(),
0);
+    } finally {
+      wlock.unlock();
+    }
+  }
+  
+  public Value get(Key key) {
+    rlock.lock();
+    try {
+      Value ret = null;
+      NMIterator nmi = new NMIterator(key);
+      if (nmi.hasNext()) {
+        Entry<Key,Value> entry = nmi.next();
+        if (entry.getKey().equals(key)) {
+          ret = entry.getValue();
+        }
+      }
+      
+      nmi.delete();
+      
+      return ret;
+    } finally {
+      rlock.unlock();
+    }
+  }
+  
+  public int size() {
+    rlock.lock();
+    try {
+      if (nmPointer == 0) {
+        throw new IllegalStateException("Native Map Deleted");
+      }
+      
+      return sizeNM(nmPointer);
+    } finally {
+      rlock.unlock();
+    }
+  }
+  
+  public long getMemoryUsed() {
+    rlock.lock();
+    try {
+      if (nmPointer == 0) {
+        throw new IllegalStateException("Native Map Deleted");
+      }
+      
+      return memoryUsedNM(nmPointer);
+    } finally {
+      rlock.unlock();
+    }
+  }
+  
+  public Iterator<Map.Entry<Key,Value>> iterator() {
+    rlock.lock();
+    try {
+      if (nmPointer == 0) {
+        throw new IllegalStateException("Native Map Deleted");
+      }
+      
+      return new ConcurrentIterator();
+    } finally {
+      rlock.unlock();
+    }
+  }
+  
+  public Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
+    rlock.lock();
+    try {
+      
+      if (nmPointer == 0) {
+        throw new IllegalStateException("Native Map Deleted");
+      }
+      
+      return new ConcurrentIterator(startKey);
+    } finally {
+      rlock.unlock();
+    }
+  }
+  
+  public void delete() {
+    wlock.lock();
+    try {
+      if (nmPointer == 0) {
+        throw new IllegalStateException("Native Map Deleted");
+      }
+      
+      log.debug(String.format("Deallocating native map 0x%016x", nmPointer));
+      deleteNativeMap(nmPointer);
+      nmPointer = 0;
+    } finally {
+      wlock.unlock();
+    }
+  }
+  
+  private static class NMSKVIter implements InterruptibleIterator {
+    
+    private ConcurrentIterator iter;
+    private Entry<Key,Value> entry;
+    
+    private NativeMap map;
+    private Range range;
+    private AtomicBoolean interruptFlag;
+    private int interruptCheckCount = 0;
+    
+    private NMSKVIter(NativeMap map, AtomicBoolean interruptFlag) {
+      this.map = map;
+      this.range = new Range();
+      iter = map.new ConcurrentIterator();
+      if (iter.hasNext())
+        entry = iter.next();
+      else
+        entry = null;
+      
+      this.interruptFlag = interruptFlag;
+    }
+    
+    public NMSKVIter(NativeMap map) {
+      this(map, null);
+    }
+    
+    @Override
+    public Key getTopKey() {
+      return entry.getKey();
+    }
+    
+    @Override
+    public Value getTopValue() {
+      return entry.getValue();
+    }
+    
+    @Override
+    public boolean hasTop() {
+      return entry != null;
+    }
+    
+    @Override
+    public void next() throws IOException {
+      
+      if (entry == null)
+        throw new IllegalStateException();
+      
+      // checking the interrupt flag for every call to next had bad a bad performance impact
+      // so check it every 100th time
+      if (interruptFlag != null && interruptCheckCount++ % 100 == 0 && interruptFlag.get())
+        throw new IterationInterruptedException();
+      
+      if (iter.hasNext()) {
+        entry = iter.next();
+        if (range.afterEndKey(entry.getKey())) {
+          entry = null;
+        }
+      } else
+        entry = null;
+      
+    }
+    
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean
inclusive) throws IOException {
+      
+      if (interruptFlag != null && interruptFlag.get())
+        throw new IterationInterruptedException();
+      
+      iter.delete();
+      
+      this.range = range;
+      
+      Key key = range.getStartKey();
+      if (key == null) {
+        key = new MemKey();
+      }
+      
+      iter = map.new ConcurrentIterator(key);
+      if (iter.hasNext()) {
+        entry = iter.next();
+        if (range.afterEndKey(entry.getKey())) {
+          entry = null;
+        }
+      } else
+        entry = null;
+      
+      while (hasTop() && range.beforeStartKey(getTopKey())) {
+        next();
+      }
+    }
+    
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options, IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      return new NMSKVIter(map, interruptFlag);
+    }
+    
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      this.interruptFlag = flag;
+    }
+  }
+  
+  public SortedKeyValueIterator<Key,Value> skvIterator() {
+    return new NMSKVIter(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java
new file mode 100644
index 0000000..b0ed9ee
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Rate.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+public class Rate {
+  private long lastCounter = -1;
+  private long lastTime = -1;
+  private double current = 0.0;
+  final double ratio;
+  
+  /**
+   * Turn a counter into an exponentially smoothed rate over time.
+   * 
+   * @param ratio
+   *          the rate at which each update influences the curve; must be (0., 1.0)
+   */
+  public Rate(double ratio) {
+    if (ratio <= 0. || ratio >= 1.0)
+      throw new IllegalArgumentException("ratio must be > 0. and < 1.0");
+    this.ratio = ratio;
+  }
+  
+  public double update(long counter) {
+    return update(System.currentTimeMillis(), counter);
+  }
+  
+  synchronized public double update(long when, long counter) {
+    if (lastCounter < 0) {
+      lastTime = when;
+      lastCounter = counter;
+      return current;
+    }
+    if (lastTime == when) {
+      throw new IllegalArgumentException("update time < last value");
+    }
+    double keep = 1. - ratio;
+    current = (keep * current + ratio * ((counter - lastCounter)) * 1000. / (when - lastTime));
+    lastTime = when;
+    lastCounter = counter;
+    return current;
+  }
+  
+  synchronized public double rate() {
+    return this.current;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java
new file mode 100644
index 0000000..1b22f05
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.tserver.ConditionalMutationSet.DeferFilter;
+import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+
+/**
+ * 
+ */
+class RowLocks {
+  
+  private Map<ByteSequence,RowLock> rowLocks = new HashMap<ByteSequence,RowLock>();
+  
+  static class RowLock {
+    ReentrantLock rlock;
+    int count;
+    ByteSequence rowSeq;
+    
+    RowLock(ReentrantLock rlock, ByteSequence rowSeq) {
+      this.rlock = rlock;
+      this.count = 0;
+      this.rowSeq = rowSeq;
+    }
+    
+    public boolean tryLock() {
+      return rlock.tryLock();
+    }
+    
+    public void lock() {
+      rlock.lock();
+    }
+    
+    public void unlock() {
+      rlock.unlock();
+    }
+  }
+  
+  private RowLock getRowLock(ArrayByteSequence rowSeq) {
+      RowLock lock = rowLocks.get(rowSeq);
+      if (lock == null) {
+        lock = new RowLock(new ReentrantLock(), rowSeq);
+        rowLocks.put(rowSeq, lock);
+      }
+      
+      lock.count++;
+      return lock;
+  }
+  
+  private void returnRowLock(RowLock lock) {
+      if (lock.count == 0)
+        throw new IllegalStateException();
+      lock.count--;
+      
+      if (lock.count == 0) {
+        rowLocks.remove(lock.rowSeq);
+      }
+  }
+  
+  List<RowLock> acquireRowlocks(Map<KeyExtent,List<ServerConditionalMutation>>
updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+    ArrayList<RowLock> locks = new ArrayList<RowLock>();
+    
+    // assume that mutations are in sorted order to avoid deadlock
+    synchronized (rowLocks) {
+      for (List<ServerConditionalMutation> scml : updates.values()) {
+        for (ServerConditionalMutation scm : scml) {
+          locks.add(getRowLock(new ArrayByteSequence(scm.getRow())));
+        }
+      }
+    }
+    
+    HashSet<ByteSequence> rowsNotLocked = null;
+
+    // acquire as many locks as possible, not blocking on rows that are already locked
+    if (locks.size() > 1) {
+      for (RowLock rowLock : locks) {
+        if (!rowLock.tryLock()) {
+          if (rowsNotLocked == null)
+            rowsNotLocked = new HashSet<ByteSequence>();
+          rowsNotLocked.add(rowLock.rowSeq);
+        }
+      }
+    } else {
+      // if there is only one lock, then wait for it
+      locks.get(0).lock();
+    }
+    
+    if (rowsNotLocked != null) {
+      
+      final HashSet<ByteSequence> rnlf = rowsNotLocked;
+      // assume will get locks needed, do something expensive otherwise
+      ConditionalMutationSet.defer(updates, deferred, new DeferFilter() {
+        @Override
+        public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation>
okMutations, List<ServerConditionalMutation> deferred) {
+          for (ServerConditionalMutation scm : scml) {
+            if (rnlf.contains(new ArrayByteSequence(scm.getRow())))
+              deferred.add(scm);
+            else
+              okMutations.add(scm);
+            
+          }
+        }
+      });
+      
+      ArrayList<RowLock> filteredLocks = new ArrayList<RowLock>();
+      ArrayList<RowLock> locksToReturn = new ArrayList<RowLock>();
+      for (RowLock rowLock : locks) {
+        if (rowsNotLocked.contains(rowLock.rowSeq)) {
+          locksToReturn.add(rowLock);
+        } else {
+          filteredLocks.add(rowLock);
+        }
+      }
+      
+      synchronized (rowLocks) {
+        for (RowLock rowLock : locksToReturn) {
+          returnRowLock(rowLock);
+        }
+      }
+
+      locks = filteredLocks;
+    }
+    return locks;
+  }
+  
+  void releaseRowLocks(List<RowLock> locks) {
+    for (RowLock rowLock : locks) {
+      rowLock.unlock();
+    }
+    
+    synchronized (rowLocks) {
+      for (RowLock rowLock : locks) {
+        returnRowLock(rowLock);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java
new file mode 100644
index 0000000..8bdf08b
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import org.apache.log4j.Level;
+
+public class TLevel extends Level {
+  
+  private static final long serialVersionUID = 1L;
+  public final static Level TABLET_HIST = new TLevel();
+  
+  protected TLevel() {
+    super(Level.DEBUG_INT + 100, "TABLET_HIST", Level.DEBUG_INT + 100);
+  }
+  
+  static public Level toLevel(int val) {
+    if (val == Level.DEBUG_INT + 100)
+      return Level.DEBUG;
+    return Level.toLevel(val);
+  }
+  
+}


Mime
View raw message