accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1328480 - in /accumulo/branches/1.4/src: core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ core/src/main/thrift/ server/src/main/java/org/apache/accumulo/server/logger/ server/src/main/java/org/apache/accumulo/server/master/...
Date Fri, 20 Apr 2012 19:12:46 GMT
Author: kturner
Date: Fri Apr 20 19:12:45 2012
New Revision: 1328480

URL: http://svn.apache.org/viewvc?rev=1328480&view=rev
Log:
ACCUMULO-449 Made walog recovery more responsive in the face of logger restarts

Added:
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java
Modified:
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
    accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java

Added: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java?rev=1328480&view=auto
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java
(added)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java
Fri Apr 20 19:12:45 2012
@@ -0,0 +1,408 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.accumulo.core.tabletserver.thrift;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class LogCopyInfo implements org.apache.thrift.TBase<LogCopyInfo,
LogCopyInfo._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LogCopyInfo");
+
+  private static final org.apache.thrift.protocol.TField FILE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("fileSize",
org.apache.thrift.protocol.TType.I64, (short)1);
+  private static final org.apache.thrift.protocol.TField LOGGER_ZNODE_FIELD_DESC = new org.apache.thrift.protocol.TField("loggerZNode",
org.apache.thrift.protocol.TType.STRING, (short)2);
+
+  public long fileSize;
+  public String loggerZNode;
+
+  /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    FILE_SIZE((short)1, "fileSize"),
+    LOGGER_ZNODE((short)2, "loggerZNode");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // FILE_SIZE
+          return FILE_SIZE;
+        case 2: // LOGGER_ZNODE
+          return LOGGER_ZNODE;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't
exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __FILESIZE_ISSET_ID = 0;
+  private BitSet __isset_bit_vector = new BitSet(1);
+
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.FILE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("fileSize",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.LOGGER_ZNODE, new org.apache.thrift.meta_data.FieldMetaData("loggerZNode",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LogCopyInfo.class, metaDataMap);
+  }
+
+  public LogCopyInfo() {
+  }
+
+  public LogCopyInfo(
+    long fileSize,
+    String loggerZNode)
+  {
+    this();
+    this.fileSize = fileSize;
+    setFileSizeIsSet(true);
+    this.loggerZNode = loggerZNode;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public LogCopyInfo(LogCopyInfo other) {
+    __isset_bit_vector.clear();
+    __isset_bit_vector.or(other.__isset_bit_vector);
+    this.fileSize = other.fileSize;
+    if (other.isSetLoggerZNode()) {
+      this.loggerZNode = other.loggerZNode;
+    }
+  }
+
+  public LogCopyInfo deepCopy() {
+    return new LogCopyInfo(this);
+  }
+
+  @Override
+  public void clear() {
+    setFileSizeIsSet(false);
+    this.fileSize = 0;
+    this.loggerZNode = null;
+  }
+
+  public long getFileSize() {
+    return this.fileSize;
+  }
+
+  public LogCopyInfo setFileSize(long fileSize) {
+    this.fileSize = fileSize;
+    setFileSizeIsSet(true);
+    return this;
+  }
+
+  public void unsetFileSize() {
+    __isset_bit_vector.clear(__FILESIZE_ISSET_ID);
+  }
+
+  /** Returns true if field fileSize is set (has been assigned a value) and false otherwise
*/
+  public boolean isSetFileSize() {
+    return __isset_bit_vector.get(__FILESIZE_ISSET_ID);
+  }
+
+  public void setFileSizeIsSet(boolean value) {
+    __isset_bit_vector.set(__FILESIZE_ISSET_ID, value);
+  }
+
+  public String getLoggerZNode() {
+    return this.loggerZNode;
+  }
+
+  public LogCopyInfo setLoggerZNode(String loggerZNode) {
+    this.loggerZNode = loggerZNode;
+    return this;
+  }
+
+  public void unsetLoggerZNode() {
+    this.loggerZNode = null;
+  }
+
+  /** Returns true if field loggerZNode is set (has been assigned a value) and false otherwise
*/
+  public boolean isSetLoggerZNode() {
+    return this.loggerZNode != null;
+  }
+
+  public void setLoggerZNodeIsSet(boolean value) {
+    if (!value) {
+      this.loggerZNode = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case FILE_SIZE:
+      if (value == null) {
+        unsetFileSize();
+      } else {
+        setFileSize((Long)value);
+      }
+      break;
+
+    case LOGGER_ZNODE:
+      if (value == null) {
+        unsetLoggerZNode();
+      } else {
+        setLoggerZNode((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case FILE_SIZE:
+      return new Long(getFileSize());
+
+    case LOGGER_ZNODE:
+      return getLoggerZNode();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and
false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case FILE_SIZE:
+      return isSetFileSize();
+    case LOGGER_ZNODE:
+      return isSetLoggerZNode();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof LogCopyInfo)
+      return this.equals((LogCopyInfo)that);
+    return false;
+  }
+
+  public boolean equals(LogCopyInfo that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_fileSize = true;
+    boolean that_present_fileSize = true;
+    if (this_present_fileSize || that_present_fileSize) {
+      if (!(this_present_fileSize && that_present_fileSize))
+        return false;
+      if (this.fileSize != that.fileSize)
+        return false;
+    }
+
+    boolean this_present_loggerZNode = true && this.isSetLoggerZNode();
+    boolean that_present_loggerZNode = true && that.isSetLoggerZNode();
+    if (this_present_loggerZNode || that_present_loggerZNode) {
+      if (!(this_present_loggerZNode && that_present_loggerZNode))
+        return false;
+      if (!this.loggerZNode.equals(that.loggerZNode))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  public int compareTo(LogCopyInfo other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    LogCopyInfo typedOther = (LogCopyInfo)other;
+
+    lastComparison = Boolean.valueOf(isSetFileSize()).compareTo(typedOther.isSetFileSize());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetFileSize()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fileSize, typedOther.fileSize);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetLoggerZNode()).compareTo(typedOther.isSetLoggerZNode());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetLoggerZNode()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.loggerZNode, typedOther.loggerZNode);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException
{
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) { 
+        break;
+      }
+      switch (field.id) {
+        case 1: // FILE_SIZE
+          if (field.type == org.apache.thrift.protocol.TType.I64) {
+            this.fileSize = iprot.readI64();
+            setFileSizeIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2: // LOGGER_ZNODE
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.loggerZNode = iprot.readString();
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+
+    // check for required fields of primitive type, which can't be checked in the validate
method
+    validate();
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
{
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    oprot.writeFieldBegin(FILE_SIZE_FIELD_DESC);
+    oprot.writeI64(this.fileSize);
+    oprot.writeFieldEnd();
+    if (this.loggerZNode != null) {
+      oprot.writeFieldBegin(LOGGER_ZNODE_FIELD_DESC);
+      oprot.writeString(this.loggerZNode);
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("LogCopyInfo(");
+    boolean first = true;
+
+    sb.append("fileSize:");
+    sb.append(this.fileSize);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("loggerZNode:");
+    if (this.loggerZNode == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.loggerZNode);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException
{
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky,
and doesn't call the default constructor.
+      __isset_bit_vector = new BitSet(1);
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java?rev=1328480&r1=1328479&r2=1328480&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
(original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
Fri Apr 20 19:12:45 2012
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 
     public void close(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, long id) throws
NoSuchLogIDException, LoggerClosedException, org.apache.thrift.TException;
 
-    public long startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo
credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException;
+    public LogCopyInfo startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo
credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException;
 
     public List<String> getClosedLogs(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo,
org.apache.accumulo.core.security.thrift.AuthInfo credentials) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException;
 
@@ -412,7 +412,7 @@ import org.slf4j.LoggerFactory;
       return;
     }
 
-    public long startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo
credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException
+    public LogCopyInfo startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo
credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException
     {
       send_startCopy(tinfo, credentials, name, fullyQualifiedFileName, sort);
       return recv_startCopy();
@@ -432,7 +432,7 @@ import org.slf4j.LoggerFactory;
       oprot_.getTransport().flush();
     }
 
-    public long recv_startCopy() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException
+    public LogCopyInfo recv_startCopy() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException
     {
       org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
       if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
@@ -904,7 +904,7 @@ import org.slf4j.LoggerFactory;
         prot.writeMessageEnd();
       }
 
-      public long getResult() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException {
+      public LogCopyInfo getResult() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException {
         if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
           throw new IllegalStateException("Method call not finished!");
         }
@@ -1400,7 +1400,6 @@ import org.slf4j.LoggerFactory;
         startCopy_result result = new startCopy_result();
         try {
           result.success = iface_.startCopy(args.tinfo, args.credentials, args.name, args.fullyQualifiedFileName,
args.sort);
-          result.setSuccessIsSet(true);
         } catch (org.apache.accumulo.core.security.thrift.ThriftSecurityException sec) {
           result.sec = sec;
         } catch (Throwable th) {
@@ -8945,10 +8944,10 @@ import org.slf4j.LoggerFactory;
   public static class startCopy_result implements org.apache.thrift.TBase<startCopy_result,
startCopy_result._Fields>, java.io.Serializable, Cloneable   {
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("startCopy_result");
 
-    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success",
org.apache.thrift.protocol.TType.I64, (short)0);
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success",
org.apache.thrift.protocol.TType.STRUCT, (short)0);
     private static final org.apache.thrift.protocol.TField SEC_FIELD_DESC = new org.apache.thrift.protocol.TField("sec",
org.apache.thrift.protocol.TType.STRUCT, (short)1);
 
-    public long success;
+    public LogCopyInfo success;
     public org.apache.accumulo.core.security.thrift.ThriftSecurityException sec;
 
     /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
@@ -9013,14 +9012,12 @@ import org.slf4j.LoggerFactory;
     }
 
     // isset id assignments
-    private static final int __SUCCESS_ISSET_ID = 0;
-    private BitSet __isset_bit_vector = new BitSet(1);
 
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
       tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
-          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
LogCopyInfo.class)));
       tmpMap.put(_Fields.SEC, new org.apache.thrift.meta_data.FieldMetaData("sec", org.apache.thrift.TFieldRequirementType.DEFAULT,

           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
@@ -9031,12 +9028,11 @@ import org.slf4j.LoggerFactory;
     }
 
     public startCopy_result(
-      long success,
+      LogCopyInfo success,
       org.apache.accumulo.core.security.thrift.ThriftSecurityException sec)
     {
       this();
       this.success = success;
-      setSuccessIsSet(true);
       this.sec = sec;
     }
 
@@ -9044,9 +9040,9 @@ import org.slf4j.LoggerFactory;
      * Performs a deep copy on <i>other</i>.
      */
     public startCopy_result(startCopy_result other) {
-      __isset_bit_vector.clear();
-      __isset_bit_vector.or(other.__isset_bit_vector);
-      this.success = other.success;
+      if (other.isSetSuccess()) {
+        this.success = new LogCopyInfo(other.success);
+      }
       if (other.isSetSec()) {
         this.sec = new org.apache.accumulo.core.security.thrift.ThriftSecurityException(other.sec);
       }
@@ -9058,32 +9054,32 @@ import org.slf4j.LoggerFactory;
 
     @Override
     public void clear() {
-      setSuccessIsSet(false);
-      this.success = 0;
+      this.success = null;
       this.sec = null;
     }
 
-    public long getSuccess() {
+    public LogCopyInfo getSuccess() {
       return this.success;
     }
 
-    public startCopy_result setSuccess(long success) {
+    public startCopy_result setSuccess(LogCopyInfo success) {
       this.success = success;
-      setSuccessIsSet(true);
       return this;
     }
 
     public void unsetSuccess() {
-      __isset_bit_vector.clear(__SUCCESS_ISSET_ID);
+      this.success = null;
     }
 
     /** Returns true if field success is set (has been assigned a value) and false otherwise
*/
     public boolean isSetSuccess() {
-      return __isset_bit_vector.get(__SUCCESS_ISSET_ID);
+      return this.success != null;
     }
 
     public void setSuccessIsSet(boolean value) {
-      __isset_bit_vector.set(__SUCCESS_ISSET_ID, value);
+      if (!value) {
+        this.success = null;
+      }
     }
 
     public org.apache.accumulo.core.security.thrift.ThriftSecurityException getSec() {
@@ -9116,7 +9112,7 @@ import org.slf4j.LoggerFactory;
         if (value == null) {
           unsetSuccess();
         } else {
-          setSuccess((Long)value);
+          setSuccess((LogCopyInfo)value);
         }
         break;
 
@@ -9134,7 +9130,7 @@ import org.slf4j.LoggerFactory;
     public Object getFieldValue(_Fields field) {
       switch (field) {
       case SUCCESS:
-        return new Long(getSuccess());
+        return getSuccess();
 
       case SEC:
         return getSec();
@@ -9171,12 +9167,12 @@ import org.slf4j.LoggerFactory;
       if (that == null)
         return false;
 
-      boolean this_present_success = true;
-      boolean that_present_success = true;
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
       if (this_present_success || that_present_success) {
         if (!(this_present_success && that_present_success))
           return false;
-        if (this.success != that.success)
+        if (!this.success.equals(that.success))
           return false;
       }
 
@@ -9243,9 +9239,9 @@ import org.slf4j.LoggerFactory;
         }
         switch (field.id) {
           case 0: // SUCCESS
-            if (field.type == org.apache.thrift.protocol.TType.I64) {
-              this.success = iprot.readI64();
-              setSuccessIsSet(true);
+            if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
+              this.success = new LogCopyInfo();
+              this.success.read(iprot);
             } else { 
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
             }
@@ -9274,7 +9270,7 @@ import org.slf4j.LoggerFactory;
 
       if (this.isSetSuccess()) {
         oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
-        oprot.writeI64(this.success);
+        this.success.write(oprot);
         oprot.writeFieldEnd();
       } else if (this.isSetSec()) {
         oprot.writeFieldBegin(SEC_FIELD_DESC);
@@ -9291,7 +9287,11 @@ import org.slf4j.LoggerFactory;
       boolean first = true;
 
       sb.append("success:");
-      sb.append(this.success);
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
       first = false;
       if (!first) sb.append(", ");
       sb.append("sec:");

Modified: accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift?rev=1328480&r1=1328479&r2=1328480&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift (original)
+++ accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift Fri Apr 20 19:12:45
2012
@@ -169,6 +169,10 @@ struct TabletMutations {
 	3:list<data.TMutation> mutations
 }
 
+struct LogCopyInfo {
+	1:i64 fileSize,
+	2:string loggerZNode
+}
 
 service MutationLogger {
     LogFile create(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tserverSession)
throws (1:security.ThriftSecurityException sec, 2:LoggerClosedException lce),
@@ -182,7 +186,7 @@ service MutationLogger {
     void close(2:cloudtrace.TInfo tinfo, 1:LogID id) throws (1:NoSuchLogIDException nsli,
2:LoggerClosedException lce),
 
     // close a log file and initiate the push to HDFS
-    i64 startCopy(4:cloudtrace.TInfo tinfo,
+    LogCopyInfo startCopy(4:cloudtrace.TInfo tinfo,
                   1:security.AuthInfo credentials,
                   2:string name, 
                   3:string fullyQualifiedFileName,

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java?rev=1328480&r1=1328479&r2=1328480&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
Fri Apr 20 19:12:45 2012
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.accumulo.cloudtrace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.cloudtrace.thrift.TInfo;
@@ -45,6 +46,7 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
 import org.apache.accumulo.core.tabletserver.thrift.LogFile;
 import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
 import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
@@ -55,6 +57,7 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.logger.LogWriter.LogWriteException;
@@ -69,6 +72,7 @@ import org.apache.accumulo.server.zookee
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
@@ -78,7 +82,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
-
 /**
  * A Mutation logging service.
  * 
@@ -94,6 +97,7 @@ public class LogService implements Mutat
   private TServer service;
   private LogWriter writer_;
   private MutationLogger.Iface writer;
+  private String ephemeralNode;
   
   enum ShutdownState {
     STARTED, REGISTERED, WAITING_FOR_HALT, HALT
@@ -189,6 +193,10 @@ public class LogService implements Mutat
     boolean archive = ServerConfiguration.getSystemConfiguration().getBoolean(Property.LOGGER_ARCHIVE);
     AccumuloConfiguration acuConf = ServerConfiguration.getSystemConfiguration();
     writer_ = new LogWriter(acuConf, fs, rootDirs, HdfsZooInstance.getInstance().getInstanceID(),
poolSize, archive);
+    
+    // call before putting this service online
+    removeIncompleteCopies(acuConf, fs, rootDirs);
+
     InvocationHandler h = new InvocationHandler() {
       @Override
       public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
@@ -232,6 +240,40 @@ public class LogService implements Mutat
     Accumulo.enableTracing(address.getHostName(), "logger");
   }
   
+  /**
+   * @param acuConf
+   * @param fs
+   * @param rootDirs
+   * @throws IOException
+   */
+  private void removeIncompleteCopies(AccumuloConfiguration acuConf, FileSystem fs, Set<String>
rootDirs) throws IOException {
+    if (acuConf.getBoolean(Property.MASTER_RECOVERY_SORT_MAPREDUCE)) {
+      return;
+    }
+
+    Set<String> walogs = new HashSet<String>();
+    for (String root : rootDirs) {
+      File rootFile = new File(root);
+      for (File walog : rootFile.listFiles()) {
+        try {
+          UUID.fromString(walog.getName());
+          walogs.add(walog.getName());
+        } catch (IllegalArgumentException iea) {
+          LOG.debug("Ignoring " + walog.getName());
+        }
+      }
+    }
+    
+    // look for .recovered that are not finished
+    for (String walog : walogs) {
+      Path path = new Path(ServerConstants.getRecoveryDir() + "/" + walog + ".recovered");
+      if (fs.exists(path) && !fs.exists(new Path(path, "finished"))) {
+        LOG.debug("Incomplete copy/sort in dfs, deleting " + path);
+        fs.delete(path, true);
+      }
+    }
+  }
+
   public void run() {
     try {
       while (!service.isServing()) {
@@ -255,6 +297,7 @@ public class LogService implements Mutat
       String path = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + zooDir;
       path += "/logger-";
       path = zoo.putEphemeralSequential(path, addressString.getBytes());
+      ephemeralNode = path;
       zoo.exists(path, this);
     } catch (Exception ex) {
       throw new RuntimeException("Unexpected error creating zookeeper entry " + zooDir);
@@ -280,10 +323,12 @@ public class LogService implements Mutat
   }
   
   @Override
-  public long startCopy(TInfo info, AuthInfo credentials, String localLog, String fullyQualifiedFileName,
boolean sort) throws ThriftSecurityException,
+  public LogCopyInfo startCopy(TInfo info, AuthInfo credentials, String localLog, String
fullyQualifiedFileName, boolean sort) throws ThriftSecurityException,
       TException {
     checkForSystemPrivs("copy", credentials);
-    return writer.startCopy(null, credentials, localLog, fullyQualifiedFileName, sort);
+    LogCopyInfo lci = writer.startCopy(null, credentials, localLog, fullyQualifiedFileName,
sort);
+    lci.loggerZNode = ephemeralNode;
+    return lci;
   }
   
   @Override

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java?rev=1328480&r1=1328479&r2=1328480&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java
Fri Apr 20 19:12:45 2012
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
 import org.apache.accumulo.core.tabletserver.thrift.LogFile;
 import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException;
@@ -234,7 +235,7 @@ class LogWriter implements MutationLogge
   }
   
   @Override
-  public long startCopy(TInfo info, AuthInfo credentials, final String localLog, final String
fullyQualifiedFileName, final boolean sort) {
+  public LogCopyInfo startCopy(TInfo info, AuthInfo credentials, final String localLog, final
String fullyQualifiedFileName, final boolean sort) {
     log.info("Copying " + localLog + " to " + fullyQualifiedFileName);
     final long t1 = System.currentTimeMillis();
     try {
@@ -314,6 +315,7 @@ class LogWriter implements MutationLogge
               memorySize = 0;
             }
           }
+
           if (!kv.isEmpty())
             writeSortedEntries(dest, part++, kv);
           fs.create(new Path(dest, "finished")).close();
@@ -380,7 +382,7 @@ class LogWriter implements MutationLogge
         log.info("Copying " + localLog + " complete");
       }
     });
-    return result;
+    return new LogCopyInfo(result, null);
   }
   
   @Override

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java?rev=1328480&r1=1328479&r2=1328480&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
Fri Apr 20 19:12:45 2012
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -39,6 +40,7 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
@@ -58,6 +60,8 @@ public class CoordinateRecoveryTask impl
   
   private boolean stop = false;
   
+  private ZooCache zcache;
+  
   private static String fullName(String name) {
     return ServerConstants.getRecoveryDir() + "/" + name;
   }
@@ -118,6 +122,7 @@ public class CoordinateRecoveryTask impl
     Job sortJob = null;
     boolean useMapReduce = ServerConfiguration.getSystemConfiguration().getBoolean(Property.MASTER_RECOVERY_SORT_MAPREDUCE);
     JobComplete notify = null;
+    String loggerZNode;
     
     RecoveryJob(LogFile entry, JobComplete callback) throws Exception {
       logFile = entry;
@@ -132,7 +137,9 @@ public class CoordinateRecoveryTask impl
         RemoteLogger logger = new RemoteLogger(logFile.server);
         String base = logFile.unsortedFileName();
         log.debug("Starting to copy " + logFile.file + " from " + logFile.server);
-        copySize = logger.startCopy(logFile.file, base, !useMapReduce);
+        LogCopyInfo lci = logger.startCopy(logFile.file, base, !useMapReduce);
+        copySize = lci.fileSize;
+        loggerZNode = lci.loggerZNode;
       } catch (Throwable t) {
         log.warn("Unable to recover " + logFile + "(" + t + ")", t);
         fail();
@@ -162,6 +169,11 @@ public class CoordinateRecoveryTask impl
         return true;
       }
       
+      if (zcache.get(loggerZNode) == null) {
+        log.debug("zknode " + loggerZNode + " is gone, copy " + logFile.file + " from " +
logFile.server + " assumed dead");
+        return true;
+      }
+
       if (elapsedMillis() > ServerConfiguration.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_MAXTIME))
{
         log.warn("Recovery taking too long, giving up");
         if (sortJob != null)
@@ -249,6 +261,7 @@ public class CoordinateRecoveryTask impl
   
   public CoordinateRecoveryTask(FileSystem fs) {
     this.fs = fs;
+    zcache = new ZooCache();
   }
   
   public boolean recover(AuthInfo credentials, KeyExtent extent, Collection<Collection<String>>
entries, JobComplete notify) {

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java?rev=1328480&r1=1328479&r2=1328480&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
Fri Apr 20 19:12:45 2012
@@ -24,6 +24,7 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
 import org.apache.accumulo.core.tabletserver.thrift.LogFile;
 import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
 import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
@@ -152,7 +153,7 @@ public class RemoteLogger {
     client.minorCompactionStarted(null, logFile.id, seq, tid, fqfn);
   }
   
-  public synchronized long startCopy(String name, String fullyQualifiedFileName, boolean
sort) throws ThriftSecurityException, TException {
+  public synchronized LogCopyInfo startCopy(String name, String fullyQualifiedFileName, boolean
sort) throws ThriftSecurityException, TException {
     return client.startCopy(null, SecurityConstants.getSystemCredentials(), name, fullyQualifiedFileName,
sort);
   }
   



Mime
View raw message