accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1438312 [2/5] - in /accumulo/branches/ACCUMULO-259: ./ assemble/ core/ core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/mapred/...
Date Fri, 25 Jan 2013 03:56:16 GMT
Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/admin/TableOperationsHelperTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/admin/TableOperationsHelperTest.java?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/admin/TableOperationsHelperTest.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/admin/TableOperationsHelperTest.java
Fri Jan 25 03:56:14 2013
@@ -243,18 +243,18 @@ public class TableOperationsHelperTest {
     try {
       t.attachIterator("table", setting);
       Assert.fail();
-    } catch (IllegalArgumentException e) {}
+    } catch (AccumuloException e) {}
     setting.setName("thirdName");
     try {
       t.attachIterator("table", setting);
       Assert.fail();
-    } catch (IllegalArgumentException e) {}
+    } catch (AccumuloException e) {}
     setting.setPriority(10);
     t.setProperty("table", "table.iterator.minc.thirdName.opt.key", "value");
     try {
       t.attachIterator("table", setting);
       Assert.fail();
-    } catch (IllegalArgumentException e) {}
+    } catch (AccumuloException e) {}
     t.removeProperty("table", "table.iterator.minc.thirdName.opt.key");
     t.attachIterator("table", setting);
   }

Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
Fri Jan 25 03:56:14 2013
@@ -91,7 +91,7 @@ public class MockTableOperationsTest {
                 conn.tableOperations().attachIterator(t, settings);
                 Assert.fail();
             }
-            catch (IllegalArgumentException ex) {}
+            catch (AccumuloException ex) {}
             
             writeVersionable(conn, t, 3);
             assertVersionable(conn, t, 1);

Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/AggregatingIteratorTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/AggregatingIteratorTest.java?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/AggregatingIteratorTest.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/AggregatingIteratorTest.java
Fri Jan 25 03:56:14 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.Text;
 /**
  * @deprecated since 1.4
  */
+@Deprecated
 public class AggregatingIteratorTest extends TestCase {
   
   private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();

Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/aggregation/NumSummationTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/aggregation/NumSummationTest.java?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/aggregation/NumSummationTest.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/aggregation/NumSummationTest.java
Fri Jan 25 03:56:14 2013
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.data.Val
 /**
  * @deprecated since 1.4
  */
+@Deprecated
 public class NumSummationTest extends TestCase {
   public byte[] init(int n) {
     byte[] b = new byte[n];

Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorConfigurationTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorConfigurationTest.java?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorConfigurationTest.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorConfigurationTest.java
Fri Jan 25 03:56:14 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.Text;
 /**
  * @deprecated since 1.4
  */
+@Deprecated
 public class AggregatorConfigurationTest extends TestCase {
   public void testBinary() {
     Text colf = new Text();

Propchange: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
  Merged /accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1437608-1438309

Propchange: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
  Merged /accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1437608-1438309

Propchange: accumulo/branches/ACCUMULO-259/packages/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/packages:r1437608-1438309

Modified: accumulo/branches/ACCUMULO-259/proxy/examples/python/TestClient.py
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/proxy/examples/python/TestClient.py?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/proxy/examples/python/TestClient.py (original)
+++ accumulo/branches/ACCUMULO-259/proxy/examples/python/TestClient.py Fri Jan 25 03:56:14
2013
@@ -32,14 +32,15 @@ transport.open()
 
 userpass = UserPass("root","secret")
 
-print client.tableOperations_list(userpass)
+print client.listTables(userpass)
 
 testtable = "pythontest"
-if not client.tableOperations_exists(userpass,testtable):
-    client.tableOperations_create(userpass,testtable)
+if not client.tableExists(userpass, testtable):
+    client.createTable(userpass, testtable, True, TimeType.MILLIS)
 
-row1 = {'a':[PColumnUpdate('a','a',value='value1'), PColumnUpdate('b','b',value='value2')]}
-client.updateAndFlush(userpass,testtable,row1)
+row1 = {'a':[ColumnUpdate('a','a',value='value1'), ColumnUpdate('b','b',value='value2')]}
+client.updateAndFlush(userpass, testtable, row1)
 
-cookie = client.createBatchScanner(userpass,testtable,"",None,None)
-print client.scanner_next_k(cookie,10)
+cookie = client.createScanner(userpass, testtable, None)
+for entry in client.nextK(cookie, 10).results:
+   print entry

Modified: accumulo/branches/ACCUMULO-259/proxy/examples/ruby/test_client.rb
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/proxy/examples/ruby/test_client.rb?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/proxy/examples/ruby/test_client.rb (original)
+++ accumulo/branches/ACCUMULO-259/proxy/examples/ruby/test_client.rb Fri Jan 25 03:56:14
2013
@@ -33,17 +33,17 @@ transport.open()
 puts "Server is up? #{proxy.ping(us)}"
 
 # print out a table list
-puts "List of tables: #{proxy.tableOperations_list(us).inspect}"
+puts "List of tables: #{proxy.listTables(us).inspect}"
 
 testtable = "rubytest"
-proxy.tableOperations_create(us,testtable) unless proxy.tableOperations_exists(us,testtable)

+proxy.createTable(us,testtable) unless proxy.tableExists(us,testtable) 
 
-key1 = PColumnUpdate.new({'colFamily' => "cf1", 'colQualifier' => "cq1", 'value'=>
"a"})
-key2 = PColumnUpdate.new({'colFamily' => "cf2", 'colQualifier' => "cq2", 'value'=>
"b"})
-proxy.updateAndFlush(us,testtable,{'row1' => [key1,key2]},nil)
+update1 = PColumnUpdate.new({'colFamily' => "cf1", 'colQualifier' => "cq1", 'value'=>
"a"})
+update2 = PColumnUpdate.new({'colFamily' => "cf2", 'colQualifier' => "cq2", 'value'=>
"b"})
+proxy.updateAndFlush(us,testtable,{'row1' => [update1,update2]},nil)
 
-cookie = proxy.createBatchScanner(us,testtable,{},nil,nil)
-result = proxy.scanner_next_k(cookie,10)
+cookie = proxy.createScanner(us,testtable,nil)
+result = proxy.nextK(cookie,10)
 result.results.each{ |keyvalue| puts "Key: #{keyvalue.key.inspect} Value: #{keyvalue.value}"
}
 
 transport.close()

Modified: accumulo/branches/ACCUMULO-259/proxy/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/proxy/pom.xml?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/proxy/pom.xml (original)
+++ accumulo/branches/ACCUMULO-259/proxy/pom.xml Fri Jan 25 03:56:14 2013
@@ -101,6 +101,11 @@
     	<version>1.5.0-SNAPSHOT</version>
     	<scope>test</scope>
     </dependency>
+    <dependency>
+    	<groupId>org.apache.accumulo</groupId>
+    	<artifactId>examples-simple</artifactId>
+    	<scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

Modified: accumulo/branches/ACCUMULO-259/proxy/proxy.properties
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/proxy/proxy.properties?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/proxy/proxy.properties (original)
+++ accumulo/branches/ACCUMULO-259/proxy/proxy.properties Fri Jan 25 03:56:14 2013
@@ -1,8 +1,8 @@
-accumulo.proxy.apis=org.apache.accumulo.proxy.api.AccumuloProxy
+accumulo.proxy.apis=org.apache.accumulo.proxy.thrift.AccumuloProxy
 
 org.apache.accumulo.proxy.ProxyServer.useMockInstance=false
-org.apache.accumulo.proxy.api.AccumuloProxy.implementor=org.apache.accumulo.proxy.ProxyServer
-org.apache.accumulo.proxy.api.AccumuloProxy.port=42424
+org.apache.accumulo.proxy.thrift.AccumuloProxy.implementor=org.apache.accumulo.proxy.ProxyServer
+org.apache.accumulo.proxy.thrift.AccumuloProxy.port=42424
 
 org.apache.accumulo.proxy.ProxyServer.instancename=test
 org.apache.accumulo.proxy.ProxyServer.zookeepers=localhost:2181

Modified: accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
(original)
+++ accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
Fri Jan 25 03:56:14 2013
@@ -67,33 +67,65 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.proxy.thrift.AccumuloProxy;
+import org.apache.accumulo.proxy.thrift.BatchScanOptions;
 import org.apache.accumulo.proxy.thrift.KeyValueAndPeek;
 import org.apache.accumulo.proxy.thrift.ColumnUpdate;
 import org.apache.accumulo.proxy.thrift.CompactionReason;
 import org.apache.accumulo.proxy.thrift.CompactionType;
 import org.apache.accumulo.proxy.thrift.KeyValue;
 import org.apache.accumulo.proxy.thrift.NoMoreEntriesException;
+import org.apache.accumulo.proxy.thrift.ScanColumn;
+import org.apache.accumulo.proxy.thrift.ScanOptions;
 import org.apache.accumulo.proxy.thrift.ScanResult;
 import org.apache.accumulo.proxy.thrift.ScanState;
 import org.apache.accumulo.proxy.thrift.ScanType;
 import org.apache.accumulo.proxy.thrift.UnknownScanner;
 import org.apache.accumulo.proxy.thrift.UnknownWriter;
 import org.apache.accumulo.proxy.thrift.UserPass;
+import org.apache.accumulo.proxy.thrift.WriterOptions;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 
 public class ProxyServer implements AccumuloProxy.Iface {
   
+  public static final Logger logger = Logger.getLogger(ProxyServer.class);
   protected Instance instance;
   
-  protected class ScannerPlusIterator {
+  static protected class ScannerPlusIterator {
     public ScannerBase scanner;
     public Iterator<Map.Entry<Key,Value>> iterator;
   }
   
+  static class CloseWriter implements RemovalListener<UUID,BatchWriter> {
+    @Override
+    public void onRemoval(RemovalNotification<UUID,BatchWriter> notification) {
+      try {
+        notification.getValue().close();
+      } catch (MutationsRejectedException e) {
+        logger.warn(e, e);
+      }
+    }
+    public CloseWriter() {}
+  }
+  
+  static class CloseScanner implements RemovalListener<UUID,ScannerPlusIterator> {
+    @Override
+    public void onRemoval(RemovalNotification<UUID,ScannerPlusIterator> notification)
{
+      final ScannerBase base = notification.getValue().scanner;
+      if (base instanceof BatchScanner) {
+        final BatchScanner scanner = (BatchScanner)base;
+        scanner.close();
+      }
+    }
+    public CloseScanner() {}
+  }
+
   protected Cache<UUID,ScannerPlusIterator> scannerCache;
   protected Cache<UUID,BatchWriter> writerCache;
   
@@ -105,9 +137,9 @@ public class ProxyServer implements Accu
       instance = new ZooKeeperInstance(props.getProperty("org.apache.accumulo.proxy.ProxyServer.instancename"),
           props.getProperty("org.apache.accumulo.proxy.ProxyServer.zookeepers"));
     
-    scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).build();
+    scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).removalListener(new
CloseScanner()).build();
     
-    writerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).build();
+    writerCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).maximumSize(1000).removalListener(new
CloseWriter()).build();
   }
   
   protected Connector getConnector(UserPass userpass) throws Exception {
@@ -197,6 +229,9 @@ public class ProxyServer implements Accu
   @Override
   public void createTable(UserPass userpass, String tableName, boolean versioningIter, org.apache.accumulo.proxy.thrift.TimeType
timeType) throws TException {
     try {
+      if (timeType == null)
+        timeType = org.apache.accumulo.proxy.thrift.TimeType.MILLIS;
+      
       getConnector(userpass).tableOperations().create(tableName, versioningIter, TimeType.valueOf(timeType.toString()));
     } catch (Exception e) {
       throw translateException(e);
@@ -257,13 +292,19 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public ByteBuffer getMaxRow(UserPass userpass, String tableName, List<ByteBuffer>
auths, ByteBuffer startRow, boolean startinclusive,
+  public ByteBuffer getMaxRow(UserPass userpass, String tableName, Set<ByteBuffer>
auths, ByteBuffer startRow, boolean startinclusive,
       ByteBuffer endRow, boolean endinclusive) throws TException {
     try {
-      
+      Connector connector = getConnector(userpass);
       Text startText = ByteBufferUtil.toText(startRow);
       Text endText = ByteBufferUtil.toText(endRow);
-      Text max = getConnector(userpass).tableOperations().getMaxRow(tableName, new Authorizations(auths),
startText, startinclusive, endText, endinclusive);
+      Authorizations auth;
+      if (auths != null) {
+        auth = getAuthorizations(auths);
+      } else {
+        auth = connector.securityOperations().getUserAuthorizations(userpass.getUsername());
+      }
+      Text max = connector.tableOperations().getMaxRow(tableName, auth, startText, startinclusive,
endText, endinclusive);
       return TextUtil.getByteBuffer(max);
     } catch (Exception e) {
       throw translateException(e);
@@ -675,31 +716,44 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public String createScanner(UserPass userpass, String tableName, Set<ByteBuffer>
authorizations, List<org.apache.accumulo.proxy.thrift.IteratorSetting> iterators, org.apache.accumulo.proxy.thrift.Range
prange)
+  public String createScanner(UserPass userpass, String tableName, ScanOptions opts)
       throws TException {
     try {
       Connector connector = getConnector(userpass);
       
       Authorizations auth;
-      if (authorizations != null) {
-        auth = getAuthorizations(authorizations);
+      if (opts != null && opts.isSetAuthorizations()) {
+        auth = getAuthorizations(opts.authorizations);
       } else {
         auth = connector.securityOperations().getUserAuthorizations(userpass.getUsername());
       }
-      
       Scanner scanner = connector.createScanner(tableName, auth);
       
-      if (iterators != null) {
-        for (org.apache.accumulo.proxy.thrift.IteratorSetting iter : iterators) {
-          IteratorSetting is = new IteratorSetting(iter.getPriority(), iter.getName(), iter.getIteratorClass(),
iter.getProperties());
-          scanner.addScanIterator(is);
+      if (opts != null) {
+        if (opts.iterators != null) {
+          for (org.apache.accumulo.proxy.thrift.IteratorSetting iter : opts.iterators) {
+            IteratorSetting is = new IteratorSetting(iter.getPriority(), iter.getName(),
iter.getIteratorClass(), iter.getProperties());
+            scanner.addScanIterator(is);
+          }
+        }
+        org.apache.accumulo.proxy.thrift.Range prange = opts.range;
+        if (prange != null) {
+          Range range = new Range(
+              Util.fromThrift(prange.getStart()), prange.startInclusive, 
+              Util.fromThrift(prange.getStop()), prange.stopInclusive
+                  );
+          scanner.setRange(range);
+        }
+        if (opts.columns != null) {
+          for (ScanColumn col : opts.columns) {
+            if (col.isSetColQualifier())
+              scanner.fetchColumn(ByteBufferUtil.toText(col.colFamily), ByteBufferUtil.toText(col.colQualifier));
+            else
+              scanner.fetchColumnFamily(ByteBufferUtil.toText(col.colFamily));
+          }
         }
       }
       
-      Range range = prange == null ? new Range() : (new Range(prange.getStart() == null ?
null : Util.fromThrift(prange.getStart()), true,
-          prange.getStop() == null ? null : Util.fromThrift(prange.getStop()), false));
-      
-      scanner.setRange(range);
       UUID uuid = UUID.randomUUID();
       
       ScannerPlusIterator spi = new ScannerPlusIterator();
@@ -713,39 +767,44 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public String createBatchScanner(UserPass userpass, String tableName, Set<ByteBuffer>
authorizations, List<org.apache.accumulo.proxy.thrift.IteratorSetting> iterators, List<org.apache.accumulo.proxy.thrift.Range>
pranges)
+  public String createBatchScanner(UserPass userpass, String tableName, List<org.apache.accumulo.proxy.thrift.Range>
pranges, BatchScanOptions opts)
       throws TException {
     try {
       Connector connector = getConnector(userpass);
       
+      int threads = 10;
       Authorizations auth;
-      if (authorizations != null) {
-        auth = getAuthorizations(authorizations);
+      if (opts != null && opts.isSetAuthorizations()) {
+        auth = getAuthorizations(opts.authorizations);
       } else {
         auth = connector.securityOperations().getUserAuthorizations(userpass.getUsername());
       }
+      if (opts != null && opts.threads > 0)
+        threads = opts.threads;
+
+      BatchScanner scanner = connector.createBatchScanner(tableName, auth, threads);
       
-      BatchScanner scanner = connector.createBatchScanner(tableName, auth, 10);
-      
-      if (iterators != null) {
-        for (org.apache.accumulo.proxy.thrift.IteratorSetting iter : iterators) {
-          IteratorSetting is = new IteratorSetting(iter.getPriority(), iter.getName(), iter.getIteratorClass(),
iter.getProperties());
-          scanner.addScanIterator(is);
+      if (opts != null) {
+        if (opts.iterators != null) {
+          for (org.apache.accumulo.proxy.thrift.IteratorSetting iter : opts.iterators) {
+            IteratorSetting is = new IteratorSetting(iter.getPriority(), iter.getName(),
iter.getIteratorClass(), iter.getProperties());
+            scanner.addScanIterator(is);
+          }
         }
-      }
-      
-      ArrayList<Range> ranges = new ArrayList<Range>();
-      
-      if (pranges == null) {
-        ranges.add(new Range());
-      } else {
-        for (org.apache.accumulo.proxy.thrift.Range range : pranges) {
-          Range aRange = new Range(range.getStart() == null ? null : Util.fromThrift(range.getStart()),
true, range.getStop() == null ? null
-              : Util.fromThrift(range.getStop()), false);
-          ranges.add(aRange);
+        
+        ArrayList<Range> ranges = new ArrayList<Range>();
+        
+        if (pranges == null) {
+          ranges.add(new Range());
+        } else {
+          for (org.apache.accumulo.proxy.thrift.Range range : pranges) {
+            Range aRange = new Range(range.getStart() == null ? null : Util.fromThrift(range.getStart()),
true, range.getStop() == null ? null
+                : Util.fromThrift(range.getStop()), false);
+            ranges.add(aRange);
+          }
         }
+        scanner.setRanges(ranges);
       }
-      scanner.setRanges(ranges);
       UUID uuid = UUID.randomUUID();
       
       ScannerPlusIterator spi = new ScannerPlusIterator();
@@ -759,7 +818,7 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public boolean scanner_hasnext(String scanner) throws TException {
+  public boolean hasNext(String scanner) throws TException {
     ScannerPlusIterator spi = scannerCache.getIfPresent(UUID.fromString(scanner));
     if (spi == null) {
       throw new TException("Scanner never existed or no longer exists");
@@ -769,9 +828,9 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public KeyValueAndPeek scanner_next(String scanner) throws TException {
+  public KeyValueAndPeek nextEntry(String scanner) throws TException {
     
-    ScanResult scanResult = scanner_next_k(scanner, 1);
+    ScanResult scanResult = nextK(scanner, 1);
     if (scanResult.results.size() > 0) {
       return new KeyValueAndPeek(scanResult.results.get(0), scanResult.isMore());
     } else {
@@ -781,7 +840,7 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public ScanResult scanner_next_k(String scanner, int k) throws TException {
+  public ScanResult nextK(String scanner, int k) throws TException {
     
     // fetch the scanner
     ScannerPlusIterator spi = scannerCache.getIfPresent(UUID.fromString(scanner));
@@ -802,7 +861,7 @@ public class ProxyServer implements Accu
         }
         ret.setMore(numRead == k);
       } catch (Exception ex) {
-        close_scanner(scanner);
+        closeScanner(scanner);
         throw translateException(ex);
       }
       return ret;
@@ -810,14 +869,14 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public void close_scanner(String uuid) throws TException {
+  public void closeScanner(String uuid) throws TException {
     scannerCache.invalidate(uuid);
   }
   
   @Override
   public void updateAndFlush(UserPass userpass, String tableName, Map<ByteBuffer,List<ColumnUpdate>>
cells) throws TException {
     try {
-      BatchWriter writer = getWriter(userpass, tableName);
+      BatchWriter writer = getWriter(userpass, tableName, null);
       addCellsToWriter(cells, writer);
       writer.flush();
       writer.close();
@@ -865,9 +924,9 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public String createWriter(UserPass userpass, String tableName) throws TException {
+  public String createWriter(UserPass userpass, String tableName, WriterOptions opts) throws
TException {
     try {
-      BatchWriter writer = getWriter(userpass, tableName);
+      BatchWriter writer = getWriter(userpass, tableName, opts);
       UUID uuid = UUID.randomUUID();
       writerCache.put(uuid, writer);
       return uuid.toString();
@@ -877,7 +936,7 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public void writer_update(String writer, Map<ByteBuffer,List<ColumnUpdate>>
cells) throws TException {
+  public void update(String writer, Map<ByteBuffer,List<ColumnUpdate>> cells)
throws TException {
     try {
       BatchWriter batchwriter = writerCache.getIfPresent(UUID.fromString(writer));
       if (batchwriter == null) {
@@ -890,7 +949,7 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public void writer_flush(String writer) throws TException {
+  public void flush(String writer) throws TException {
     try {
       BatchWriter batchwriter = writerCache.getIfPresent(UUID.fromString(writer));
       if (batchwriter == null) {
@@ -903,7 +962,7 @@ public class ProxyServer implements Accu
   }
   
   @Override
-  public void writer_close(String writer) throws TException {
+  public void closeWriter(String writer) throws TException {
     try {
       BatchWriter batchwriter = writerCache.getIfPresent(UUID.fromString(writer));
       if (batchwriter == null) {
@@ -916,8 +975,19 @@ public class ProxyServer implements Accu
     }
   }
   
-  private BatchWriter getWriter(UserPass userpass, String tableName) throws Exception {
-    return getConnector(userpass).createBatchWriter(tableName, new BatchWriterConfig());
+  private BatchWriter getWriter(UserPass userpass, String tableName, WriterOptions opts)
throws Exception {
+    BatchWriterConfig cfg = new BatchWriterConfig();
+    if (opts != null) {
+      if (opts.maxMemory != 0)
+        cfg.setMaxMemory(opts.maxMemory);
+      if (opts.threads != 0)
+        cfg.setMaxWriteThreads(opts.threads);
+      if (opts.timeoutMs != 0)
+        cfg.setTimeout(opts.timeoutMs, TimeUnit.MILLISECONDS);
+      if (opts.latencyMs != 0)
+        cfg.setMaxLatency(opts.latencyMs, TimeUnit.MILLISECONDS);
+    }
+    return getConnector(userpass).createBatchWriter(tableName, cfg);
   }
   
   private IteratorSetting getIteratorSetting(org.apache.accumulo.proxy.thrift.IteratorSetting
setting) {
@@ -1051,13 +1121,7 @@ public class ProxyServer implements Accu
   }
   
   private Range getRange(org.apache.accumulo.proxy.thrift.Range range) {
-    return new Range(getKey(range.start), getKey(range.stop));
-  }
-  
-  private Key getKey(org.apache.accumulo.proxy.thrift.Key start) {
-    if (start == null)
-      return null;
-    return new Key(start.getRow(), start.getColFamily(), start.getColQualifier(), start.getColVisibility(),
0);
+    return new Range(Util.fromThrift(range.start), Util.fromThrift(range.stop));
   }
   
   @Override
@@ -1079,7 +1143,7 @@ public class ProxyServer implements Accu
   
   @Override
   public org.apache.accumulo.proxy.thrift.Key getFollowing(org.apache.accumulo.proxy.thrift.Key
key, org.apache.accumulo.proxy.thrift.PartialKey part) throws TException {
-    Key key_ = getKey(key);
+    Key key_ = Util.fromThrift(key);
     PartialKey part_ = PartialKey.valueOf(part.toString());
     Key followingKey = key_.followingKey(part_);
     return getProxyKey(followingKey);

Modified: accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/TestProxyClient.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/TestProxyClient.java?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/TestProxyClient.java
(original)
+++ accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/TestProxyClient.java
Fri Jan 25 03:56:14 2013
@@ -114,7 +114,7 @@ public class TestProxyClient {
     start = new Date();
     then = new Date();
     mutations.clear();
-    String writer = tpc.proxy().createWriter(userpass, testTable);
+    String writer = tpc.proxy().createWriter(userpass, testTable, null);
     for (int i = 0; i < maxInserts; i++) {
       String result = String.format(format, i);
       Key pkey = new Key();
@@ -122,7 +122,7 @@ public class TestProxyClient {
       ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(("cf" + i).getBytes()), ByteBuffer.wrap(("cq"
+ i).getBytes()));
       update.setValue(Util.randStringBuffer(10));
       mutations.put(ByteBuffer.wrap(result.getBytes()), Collections.singletonList(update));
-      tpc.proxy().writer_update(writer, mutations);
+      tpc.proxy().update(writer, mutations);
       mutations.clear();
     }
     
@@ -130,7 +130,7 @@ public class TestProxyClient {
     System.out.println(" End of writing: " + (end.getTime() - start.getTime()));
     start = end;
     System.out.println("Closing...");
-    tpc.proxy().writer_close(writer);
+    tpc.proxy().closeWriter(writer);
     end = new Date();
     System.out.println(" End of closing: " + (end.getTime() - start.getTime()));
     
@@ -141,9 +141,7 @@ public class TestProxyClient {
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, null, regex, null, null, false);
     
-    Key stop = new Key();
-    stop.setRow("5".getBytes());
-    String cookie = tpc.proxy().createBatchScanner(userpass, testTable, null, null, null);
+    String cookie = tpc.proxy().createScanner(userpass, testTable, null);
     
     int i = 0;
     start = new Date();
@@ -152,7 +150,7 @@ public class TestProxyClient {
     
     int k = 1000;
     while (hasNext) {
-      ScanResult kvList = tpc.proxy().scanner_next_k(cookie, k);
+      ScanResult kvList = tpc.proxy().nextK(cookie, k);
       
       Date now = new Date();
       System.out.println(i + " " + (now.getTime() - then.getTime()));

Modified: accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/Util.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/Util.java?rev=1438312&r1=1438311&r2=1438312&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/Util.java
(original)
+++ accumulo/branches/ACCUMULO-259/proxy/src/main/java/org/apache/accumulo/proxy/Util.java
Fri Jan 25 03:56:14 2013
@@ -47,6 +47,8 @@ public class Util {
   }
   
   public static org.apache.accumulo.core.data.Key fromThrift(Key pkey) {
+    if (pkey == null)
+      return null;
     return new org.apache.accumulo.core.data.Key(deNullify(pkey.getRow()), deNullify(pkey.getColFamily()),
deNullify(pkey.getColQualifier()), deNullify(pkey.getColVisibility()),
         pkey.getTimestamp());
   }



Mime
View raw message