accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject git commit: ACCUMULO-1000 added conditional writer error handling and test for error handling
Date Wed, 17 Jul 2013 19:38:34 GMT
Updated Branches:
  refs/heads/ACCUMULO-1000 fbcbd8746 -> 1fadfd22d


ACCUMULO-1000 added conditional writer error handling and test for error handling


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1fadfd22
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1fadfd22
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1fadfd22

Branch: refs/heads/ACCUMULO-1000
Commit: 1fadfd22df958165f4038f09c465637ad9930b83
Parents: fbcbd87
Author: Keith Turner <kturner@apache.org>
Authored: Wed Jul 17 15:38:20 2013 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Wed Jul 17 15:38:20 2013 -0400

----------------------------------------------------------------------
 .../accumulo/core/client/ConditionalWriter.java |  32 +++-
 .../core/client/impl/ConditionalWriterImpl.java |  74 +++++----
 .../thrift/TabletClientService.java             | 127 +++++++++++++--
 core/src/main/thrift/tabletserver.thrift        |   3 +-
 .../server/security/SecurityOperation.java      |  23 +++
 .../server/tabletserver/TabletServer.java       |  13 +-
 .../accumulo/test/FaultyConditionalWriter.java  |   4 +-
 .../accumulo/test/functional/BadIterator.java   |   5 +
 .../accumulo/test/ConditionalWriterTest.java    | 160 ++++++++++++++++++-
 9 files changed, 390 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 068e3e7..b434463 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -30,19 +30,47 @@ public interface ConditionalWriter {
     
     private Status status;
     private ConditionalMutation mutation;
+    private String server;
+    private Exception exception;
     
-    public Result(Status s, ConditionalMutation m) {
+    public Result(Status s, ConditionalMutation m, String server) {
       this.status = s;
       this.mutation = m;
+      this.server = server;
     }
     
-    public Status getStatus() {
+    public Result(Exception e, ConditionalMutation cm, String server) {
+      this.exception = e;
+      this.mutation = cm;
+      this.server = server;
+    }
+
+    public Status getStatus() throws AccumuloException, AccumuloSecurityException {
+      if (status == null) {
+        if (exception instanceof AccumuloException)
+          throw (AccumuloException) exception;
+        if (exception instanceof AccumuloSecurityException)
+          throw (AccumuloSecurityException) exception;
+        if (exception instanceof RuntimeException)
+          throw (RuntimeException) exception;
+        else
+          throw new AccumuloException(exception);
+      }
+
       return status;
     }
     
     public ConditionalMutation getMutation() {
       return mutation;
     }
+    
+    /**
+     * 
+     * @return The server this mutation was sent to. Returns null if was not sent to a server.
+     */
+    public String getTabletServer() {
+      return server;
+    }
   }
   
   public static enum Status {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 3b6f8a5..f0d6108 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -34,12 +34,13 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Condition;
 import org.apache.accumulo.core.data.ConditionalMutation;
@@ -50,6 +51,7 @@ import org.apache.accumulo.core.data.thrift.TCondition;
 import org.apache.accumulo.core.data.thrift.TConditionalMutation;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.VisibilityEvaluator;
@@ -58,12 +60,14 @@ import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.BadArgumentException;
 import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 import org.apache.thrift.TServiceClient;
@@ -72,13 +76,16 @@ import org.apache.thrift.transport.TTransportException;
 
 class ConditionalWriterImpl implements ConditionalWriter {
   
+  private static final Logger log = Logger.getLogger(ConditionalWriterImpl.class);
+
   private Authorizations auths;
   private VisibilityEvaluator ve;
   @SuppressWarnings("unchecked")
-  private Map<Text,Boolean> cache = Collections.synchronizedMap(new LRUMap(1000));;
+  private Map<Text,Boolean> cache = Collections.synchronizedMap(new LRUMap(1000));
   private Instance instance;
   private TCredentials credentials;
   private TabletLocator locator;
+  private String tableId;
 
 
   private Map<String,BlockingQueue<TabletServerMutations<QCMutation>>>
serverQueues;
@@ -106,7 +113,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
         throw new NoSuchElementException();
 
       try {
-        // TODO maybe call drainTo after take to get a batch efficiently
+        // TODO maybe call drainTo after take() to get a batch efficiently
         Result result = rq.poll(1, TimeUnit.SECONDS);
         while (result == null) {
           
@@ -184,15 +191,20 @@ class ConditionalWriterImpl implements ConditionalWriter {
     
     try {
       locator.binMutations(mutations, binnedMutations, failures, credentials);
-    } catch (AccumuloException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (AccumuloSecurityException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (TableNotFoundException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      
+      if (failures.size() == mutations.size())
+        if (!Tables.exists(instance, tableId))
+          throw new TableDeletedException(tableId);
+        else if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+          throw new TableOfflineException(instance, tableId);
+
+    } catch (Exception e) {
+      for (QCMutation qcm : mutations)
+        qcm.resultQueue.add(new Result(e, qcm, null));
+      
+      // do not want to queue anything that was put in before binMutations() failed
+      failures.clear();
+      binnedMutations.clear();
     }
     
     if (failures.size() > 0)
@@ -202,6 +214,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
       queue(entry.getKey(), entry.getValue());
     }
 
+
   }
 
   private void queue(String location, TabletServerMutations<QCMutation> mutations)
{
@@ -209,7 +222,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     BlockingQueue<TabletServerMutations<QCMutation>> queue = getServerQueue(location);
     
     queue.add(mutations);
-    threadPool.execute(new SendTask(location));
+    threadPool.execute(new LoggingRunnable(log, new SendTask(location)));
   }
 
   private TabletServerMutations<QCMutation> dequeue(String location) {
@@ -253,24 +266,21 @@ class ConditionalWriterImpl implements ConditionalWriter {
     this.threadPool.setMaximumPoolSize(3);
     this.locator = TabletLocator.getLocator(instance, new Text(tableId));
     this.serverQueues = new HashMap<String,BlockingQueue<TabletServerMutations<QCMutation>>>();
-    
+    this.tableId = tableId;
+
     Runnable failureHandler = new Runnable() {
       
       @Override
       public void run() {
-        try {
           List<QCMutation> mutations = new ArrayList<QCMutation>();
           failedMutations.drainTo(mutations);
           queue(mutations);
-        } catch (Exception e) {
-          // TODO log
-          e.printStackTrace();
-        }
-        
       }
     };
     
-    threadPool.scheduleAtFixedRate(failureHandler, 100, 100, TimeUnit.MILLISECONDS);
+    failureHandler = new LoggingRunnable(log, failureHandler);
+    
+    threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS);
   }
 
   public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
@@ -288,7 +298,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
       for (Condition cond : mut.getConditions()) {
         if (!isVisible(cond.getVisibility())) {
-          resultQueue.add(new Result(Status.INVISIBLE_VISIBILITY, mut));
+          resultQueue.add(new Result(Status.INVISIBLE_VISIBILITY, mut, null));
           continue mloop;
         }
       }
@@ -362,7 +372,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
           extentsToInvalidate.add(cmk.ke);
         } else {
           QCMutation qcm = cmidToCm.get(tcmResult.cmid).cm;
-          qcm.resultQueue.add(new Result(fromThrift(tcmResult.status), qcm));
+          qcm.resultQueue.add(new Result(fromThrift(tcmResult.status), qcm, location));
         }
       }
 
@@ -375,19 +385,25 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
       queueFailed(ignored);
 
+    } catch (ThriftSecurityException tse) {
+      AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(),
tse.getCode(), Tables.getPrintableTableInfoFromId(instance,
+          tableId), tse);
+      for (CMK cmk : cmidToCm.values())
+        cmk.cm.resultQueue.add(new Result(ase, cmk.cm, location));
     } catch (TTransportException e) {
       locator.invalidateCache(location);
       for (CMK cmk : cmidToCm.values())
-        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm));
+        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location));
     } catch (TApplicationException tae) {
       for (CMK cmk : cmidToCm.values())
-        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm));
-      // TODO should another status be used?
-      // TODO need to get server where error occurred back to client
+        cmk.cm.resultQueue.add(new Result(new AccumuloServerException(location, tae), cmk.cm,
location));
     } catch (TException e) {
       locator.invalidateCache(location);
       for (CMK cmk : cmidToCm.values())
-        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm));
+        cmk.cm.resultQueue.add(new Result(Status.UNKNOWN, cmk.cm, location));
+    } catch (Exception e) {
+      for (CMK cmk : cmidToCm.values())
+        cmk.cm.resultQueue.add(new Result(e, cmk.cm, location));
     } finally {
       ThriftUtil.returnClient((TServiceClient) client);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
index 7ddfd98..94744b5 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
 
     public void update(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent keyExtent, org.apache.accumulo.core.data.thrift.TMutation
mutation) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException,
ConstraintViolationException, org.apache.thrift.TException;
 
-    public List<org.apache.accumulo.core.data.thrift.TCMResult> conditionalUpdate(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<ByteBuffer>
authorizations, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TConditionalMutation>>
mutations, List<String> symbols) throws org.apache.thrift.TException;
+    public List<org.apache.accumulo.core.data.thrift.TCMResult> conditionalUpdate(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<ByteBuffer>
authorizations, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TConditionalMutation>>
mutations, List<String> symbols) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
org.apache.thrift.TException;
 
     public List<org.apache.accumulo.core.data.thrift.TKeyExtent> bulkImport(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long tid, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>
files, boolean setTime) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
org.apache.thrift.TException;
 
@@ -457,7 +457,7 @@ import org.slf4j.LoggerFactory;
       return;
     }
 
-    public List<org.apache.accumulo.core.data.thrift.TCMResult> conditionalUpdate(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<ByteBuffer>
authorizations, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TConditionalMutation>>
mutations, List<String> symbols) throws org.apache.thrift.TException
+    public List<org.apache.accumulo.core.data.thrift.TCMResult> conditionalUpdate(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<ByteBuffer>
authorizations, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TConditionalMutation>>
mutations, List<String> symbols) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
org.apache.thrift.TException
     {
       send_conditionalUpdate(tinfo, credentials, authorizations, mutations, symbols);
       return recv_conditionalUpdate();
@@ -474,13 +474,16 @@ import org.slf4j.LoggerFactory;
       sendBase("conditionalUpdate", args);
     }
 
-    public List<org.apache.accumulo.core.data.thrift.TCMResult> recv_conditionalUpdate()
throws org.apache.thrift.TException
+    public List<org.apache.accumulo.core.data.thrift.TCMResult> recv_conditionalUpdate()
throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException
     {
       conditionalUpdate_result result = new conditionalUpdate_result();
       receiveBase(result, "conditionalUpdate");
       if (result.isSetSuccess()) {
         return result.success;
       }
+      if (result.sec != null) {
+        throw result.sec;
+      }
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT,
"conditionalUpdate failed: unknown result");
     }
 
@@ -1284,7 +1287,7 @@ import org.slf4j.LoggerFactory;
         prot.writeMessageEnd();
       }
 
-      public List<org.apache.accumulo.core.data.thrift.TCMResult> getResult() throws
org.apache.thrift.TException {
+      public List<org.apache.accumulo.core.data.thrift.TCMResult> getResult() throws
org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException
{
         if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
           throw new IllegalStateException("Method call not finished!");
         }
@@ -2225,7 +2228,11 @@ import org.slf4j.LoggerFactory;
 
       public conditionalUpdate_result getResult(I iface, conditionalUpdate_args args) throws
org.apache.thrift.TException {
         conditionalUpdate_result result = new conditionalUpdate_result();
-        result.success = iface.conditionalUpdate(args.tinfo, args.credentials, args.authorizations,
args.mutations, args.symbols);
+        try {
+          result.success = iface.conditionalUpdate(args.tinfo, args.credentials, args.authorizations,
args.mutations, args.symbols);
+        } catch (org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec)
{
+          result.sec = sec;
+        }
         return result;
       }
     }
@@ -14820,6 +14827,7 @@ import org.slf4j.LoggerFactory;
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("conditionalUpdate_result");
 
     private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success",
org.apache.thrift.protocol.TType.LIST, (short)0);
+    private static final org.apache.thrift.protocol.TField SEC_FIELD_DESC = new org.apache.thrift.protocol.TField("sec",
org.apache.thrift.protocol.TType.STRUCT, (short)1);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -14828,10 +14836,12 @@ import org.slf4j.LoggerFactory;
     }
 
     public List<org.apache.accumulo.core.data.thrift.TCMResult> success; // required
+    public org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
     @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum
{
-      SUCCESS((short)0, "success");
+      SUCCESS((short)0, "success"),
+      SEC((short)1, "sec");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -14848,6 +14858,8 @@ import org.slf4j.LoggerFactory;
         switch(fieldId) {
           case 0: // SUCCESS
             return SUCCESS;
+          case 1: // SEC
+            return SEC;
           default:
             return null;
         }
@@ -14894,6 +14906,8 @@ import org.slf4j.LoggerFactory;
       tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,

               new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
org.apache.accumulo.core.data.thrift.TCMResult.class))));
+      tmpMap.put(_Fields.SEC, new org.apache.thrift.meta_data.FieldMetaData("sec", org.apache.thrift.TFieldRequirementType.DEFAULT,

+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(conditionalUpdate_result.class,
metaDataMap);
     }
@@ -14902,10 +14916,12 @@ import org.slf4j.LoggerFactory;
     }
 
     public conditionalUpdate_result(
-      List<org.apache.accumulo.core.data.thrift.TCMResult> success)
+      List<org.apache.accumulo.core.data.thrift.TCMResult> success,
+      org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec)
     {
       this();
       this.success = success;
+      this.sec = sec;
     }
 
     /**
@@ -14919,6 +14935,9 @@ import org.slf4j.LoggerFactory;
         }
         this.success = __this__success;
       }
+      if (other.isSetSec()) {
+        this.sec = new org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException(other.sec);
+      }
     }
 
     public conditionalUpdate_result deepCopy() {
@@ -14928,6 +14947,7 @@ import org.slf4j.LoggerFactory;
     @Override
     public void clear() {
       this.success = null;
+      this.sec = null;
     }
 
     public int getSuccessSize() {
@@ -14969,6 +14989,30 @@ import org.slf4j.LoggerFactory;
       }
     }
 
+    public org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException getSec() {
+      return this.sec;
+    }
+
+    public conditionalUpdate_result setSec(org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException
sec) {
+      this.sec = sec;
+      return this;
+    }
+
+    public void unsetSec() {
+      this.sec = null;
+    }
+
+    /** Returns true if field sec is set (has been assigned a value) and false otherwise
*/
+    public boolean isSetSec() {
+      return this.sec != null;
+    }
+
+    public void setSecIsSet(boolean value) {
+      if (!value) {
+        this.sec = null;
+      }
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case SUCCESS:
@@ -14979,6 +15023,14 @@ import org.slf4j.LoggerFactory;
         }
         break;
 
+      case SEC:
+        if (value == null) {
+          unsetSec();
+        } else {
+          setSec((org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException)value);
+        }
+        break;
+
       }
     }
 
@@ -14987,6 +15039,9 @@ import org.slf4j.LoggerFactory;
       case SUCCESS:
         return getSuccess();
 
+      case SEC:
+        return getSec();
+
       }
       throw new IllegalStateException();
     }
@@ -15000,6 +15055,8 @@ import org.slf4j.LoggerFactory;
       switch (field) {
       case SUCCESS:
         return isSetSuccess();
+      case SEC:
+        return isSetSec();
       }
       throw new IllegalStateException();
     }
@@ -15026,6 +15083,15 @@ import org.slf4j.LoggerFactory;
           return false;
       }
 
+      boolean this_present_sec = true && this.isSetSec();
+      boolean that_present_sec = true && that.isSetSec();
+      if (this_present_sec || that_present_sec) {
+        if (!(this_present_sec && that_present_sec))
+          return false;
+        if (!this.sec.equals(that.sec))
+          return false;
+      }
+
       return true;
     }
 
@@ -15052,6 +15118,16 @@ import org.slf4j.LoggerFactory;
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(isSetSec()).compareTo(typedOther.isSetSec());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetSec()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.sec, typedOther.sec);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -15079,6 +15155,14 @@ import org.slf4j.LoggerFactory;
         sb.append(this.success);
       }
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("sec:");
+      if (this.sec == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.sec);
+      }
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -15141,6 +15225,15 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 1: // SEC
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.sec = new org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException();
+                struct.sec.read(iprot);
+                struct.setSecIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -15168,6 +15261,11 @@ import org.slf4j.LoggerFactory;
           }
           oprot.writeFieldEnd();
         }
+        if (struct.sec != null) {
+          oprot.writeFieldBegin(SEC_FIELD_DESC);
+          struct.sec.write(oprot);
+          oprot.writeFieldEnd();
+        }
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -15189,7 +15287,10 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetSuccess()) {
           optionals.set(0);
         }
-        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetSec()) {
+          optionals.set(1);
+        }
+        oprot.writeBitSet(optionals, 2);
         if (struct.isSetSuccess()) {
           {
             oprot.writeI32(struct.success.size());
@@ -15199,12 +15300,15 @@ import org.slf4j.LoggerFactory;
             }
           }
         }
+        if (struct.isSetSec()) {
+          struct.sec.write(oprot);
+        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, conditionalUpdate_result
struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(1);
+        BitSet incoming = iprot.readBitSet(2);
         if (incoming.get(0)) {
           {
             org.apache.thrift.protocol.TList _list259 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT,
iprot.readI32());
@@ -15219,6 +15323,11 @@ import org.slf4j.LoggerFactory;
           }
           struct.setSuccessIsSet(true);
         }
+        if (incoming.get(1)) {
+          struct.sec = new org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException();
+          struct.sec.read(iprot);
+          struct.setSecIsSet(true);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 8887202..5dfcb4b 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -167,7 +167,8 @@ service TabletClientService extends client.ClientService {
             2:NotServingTabletException nste, 
             3:ConstraintViolationException cve),
   
-  list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials
credentials, 3:list<binary> authorizations, 4:data.CMBatch mutations, 5:list<string>
symbols);
+  list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials
credentials, 3:list<binary> authorizations, 4:data.CMBatch mutations, 5:list<string>
symbols)
+     throws (1:client.ThriftSecurityException sec);
 
   // on success, returns an empty list
   list<data.TKeyExtent> bulkImport(3:trace.TInfo tinfo, 1:security.TCredentials credentials,
4:i64 tid, 2:data.TabletFiles files, 5:bool setTime) throws (1:client.ThriftSecurityException
sec),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index e948894..465eb8e 100644
--- a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.server.security;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,6 +32,7 @@ import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.data.thrift.TRange;
 import org.apache.accumulo.core.master.thrift.TableOperation;
@@ -311,6 +313,27 @@ public class SecurityOperation {
     return hasTablePermission(credentials.getPrincipal(), table, TablePermission.WRITE, true);
   }
   
+  public boolean canConditionallyUpdate(TCredentials credentials, Map<TKeyExtent,List<TConditionalMutation>>
mutations, List<String> symbols,
+      List<ByteBuffer> authorizations) throws ThriftSecurityException {
+    Set<TKeyExtent> ks = mutations.keySet();
+    
+    byte[] table = null;
+    
+    for (TKeyExtent tke : ks) {
+      if (table == null)
+        table = tke.getTable();
+      else if (!Arrays.equals(table, tke.getTable()))
+        return false;
+    }
+    
+    authenticate(credentials);
+    
+    String tableID = new String(table);
+    
+    return hasTablePermission(credentials.getPrincipal(), tableID, TablePermission.WRITE,
true)
+        && hasTablePermission(credentials.getPrincipal(), tableID, TablePermission.READ,
true);
+  }
+
   public boolean canSplitTablet(TCredentials credentials, String table) throws ThriftSecurityException
{
     authenticate(credentials);
     return hasSystemPermission(credentials.getPrincipal(), SystemPermission.ALTER_TABLE,
false)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 43dbdd6..4b905ce 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -1879,8 +1879,17 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     
     @Override
     public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials,
List<ByteBuffer> authorizations,
-        Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String>
symbols) throws TException {
-      // TODO check credentials, permissions, and authorizations
+        Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String>
symbols) throws ThriftSecurityException {
+      
+      Authorizations userauths = null;
+      if (!security.canConditionallyUpdate(credentials, mutations, symbols, authorizations))
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      
+      userauths = security.getUserAuthorizations(credentials);
+      for (ByteBuffer auth : authorizations)
+        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
+          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+
       // TODO sessions, should show up in list scans
       // TODO timeout like scans do
       

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
index 61c033a..de56218 100644
--- a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
+++ b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
@@ -51,7 +51,7 @@ public class FaultyConditionalWriter implements ConditionalWriter {
     while (mutations.hasNext()) {
       ConditionalMutation cm = mutations.next();
       if (rand.nextDouble() <= up && rand.nextDouble() > wp)
-        resultList.add(new Result(Status.UNKNOWN, cm));
+        resultList.add(new Result(Status.UNKNOWN, cm, null));
       else
         writes.add(cm);
     }
@@ -63,7 +63,7 @@ public class FaultyConditionalWriter implements ConditionalWriter {
         Result result = results.next();
         
         if (rand.nextDouble() <= up && rand.nextDouble() <= wp)
-          result = new Result(Status.UNKNOWN, result.getMutation());
+          result = new Result(Status.UNKNOWN, result.getMutation(), result.getTabletServer());
         resultList.add(result);
       }
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
index e2db273..1c62720 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIterator.java
@@ -30,6 +30,11 @@ public class BadIterator extends WrappingIterator {
   }
   
   @Override
+  public boolean hasTop() {
+    throw new NullPointerException();
+  }
+  
+  @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fadfd22/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 33a7f4e..33dc458 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -34,6 +34,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ConditionalWriter;
@@ -44,7 +46,12 @@ import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -59,11 +66,14 @@ import org.apache.accumulo.core.iterators.user.SummingCombiner;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.CredentialHelper;
+import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.FastFormat;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.accumulo.test.functional.BadIterator;
 import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -993,23 +1003,161 @@ public class ConditionalWriterTest {
   }
 
   @Test
-  public void testSecurity() {
+  public void testSecurity() throws Exception {
     // test against table user does not have read and/or write permissions for
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.securityOperations().createLocalUser("user1", new PasswordToken("u1p"));
+    
+    conn.tableOperations().create("sect1");
+    conn.tableOperations().create("sect2");
+    conn.tableOperations().create("sect3");
+    
+    conn.securityOperations().grantTablePermission("user1", "sect1", TablePermission.READ);
+    conn.securityOperations().grantTablePermission("user1", "sect2", TablePermission.WRITE);
+    conn.securityOperations().grantTablePermission("user1", "sect3", TablePermission.READ);
+    conn.securityOperations().grantTablePermission("user1", "sect3", TablePermission.WRITE);
+    
+    Connector conn2 = zki.getConnector("user1", new PasswordToken("u1p"));
+    
+    ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+    cm1.put("tx", "seq", "1");
+    cm1.put("data", "x", "a");
+    
+    ConditionalWriter cw1 = conn2.createConditionalWriter("sect1", Authorizations.EMPTY);
+    ConditionalWriter cw2 = conn2.createConditionalWriter("sect2", Authorizations.EMPTY);
+    ConditionalWriter cw3 = conn2.createConditionalWriter("sect3", Authorizations.EMPTY);
+    
+    Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus());
+    
+    try {
+      cw1.write(cm1).getStatus();
+      Assert.assertFalse(true);
+    } catch (AccumuloSecurityException ase) {
+      
+    }
+    
+    try {
+      cw2.write(cm1).getStatus();
+      Assert.assertFalse(true);
+    } catch (AccumuloSecurityException ase) {
+      
+    }
+
   }
 
+
   @Test
   public void testTimeout() {
-    
+    // TODO
   }
 
   @Test
-  public void testOffline() {
-    // TODO test against a offline table
+  public void testDeleteTable() throws Exception {
+    String table = "foo12";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    try {
+      conn.createConditionalWriter(table, Authorizations.EMPTY);
+      Assert.assertFalse(true);
+    } catch (TableNotFoundException e) {}
+    
+    conn.tableOperations().create(table);
+    
+    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    
+    conn.tableOperations().delete(table);
+    
+    ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+    cm1.put("tx", "seq", "1");
+    cm1.put("data", "x", "a");
+    
+    Result result = cw.write(cm1);
+    
+    try {
+      result.getStatus();
+      Assert.assertFalse(true);
+    } catch (TableDeletedException ae) {
+      
+    }
+    
+  }
+  
+  @Test
+  public void testOffline() throws Exception {
+    String table = "foo11";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    
+    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    
+    conn.tableOperations().offline(table);
+
+    waitForSingleTabletTableToGoOffline(table, zki);
+    
+    ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+    cm1.put("tx", "seq", "1");
+    cm1.put("data", "x", "a");
+    
+    Result result = cw.write(cm1);
+    
+    try {
+      result.getStatus();
+      Assert.assertFalse(true);
+    } catch (TableOfflineException ae) {
+      
+    }
+    
+    cw.close();
+    
+    try {
+      conn.createConditionalWriter(table, Authorizations.EMPTY);
+      Assert.assertFalse(true);
+    } catch (TableOfflineException e) {}
+  }
+
+  void waitForSingleTabletTableToGoOffline(String table, ZooKeeperInstance zki) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
+    TabletLocator locator = TabletLocator.getLocator(zki, new Text(Tables.getNameToIdMap(zki).get(table)));
+    while (locator.locateTablet(new Text("a"), false, false, CredentialHelper.create("root",
new PasswordToken(secret), zki.getInstanceID())) != null) {
+      UtilWaitThread.sleep(50);
+    }
   }
 
   @Test
-  public void testError() {
-    // test an iterator that throws an exception
+  public void testError() throws Exception {
+    String table = "foo10";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    
+    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    
+    IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class);
+    
+    ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq").setIterators(iterSetting));
+    cm1.put("tx", "seq", "1");
+    cm1.put("data", "x", "a");
+    
+    Result result = cw.write(cm1);
+    
+    try {
+      result.getStatus();
+      Assert.assertFalse(true);
+    } catch (AccumuloException ae) {
+      
+    }
+    
+    cw.close();
+
   }
 
   @AfterClass


Mime
View raw message