Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7E76E9176 for ; Fri, 20 Apr 2012 19:13:11 +0000 (UTC) Received: (qmail 73668 invoked by uid 500); 20 Apr 2012 19:13:11 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 73637 invoked by uid 500); 20 Apr 2012 19:13:11 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 73630 invoked by uid 99); 20 Apr 2012 19:13:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Apr 2012 19:13:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Apr 2012 19:13:07 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D13E42388860 for ; Fri, 20 Apr 2012 19:12:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1328480 - in /accumulo/branches/1.4/src: core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ core/src/main/thrift/ server/src/main/java/org/apache/accumulo/server/logger/ server/src/main/java/org/apache/accumulo/server/master/... Date: Fri, 20 Apr 2012 19:12:46 -0000 To: commits@accumulo.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120420191246.D13E42388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kturner Date: Fri Apr 20 19:12:45 2012 New Revision: 1328480 URL: http://svn.apache.org/viewvc?rev=1328480&view=rev Log: ACCUMULO-449 Made walog recovery more responsive in the face of logger restarts Added: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java Added: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java?rev=1328480&view=auto ============================================================================== --- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java (added) +++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java Fri Apr 20 19:12:45 2012 @@ -0,0 +1,408 @@ +/** + * Autogenerated by Thrift + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + */ +package org.apache.accumulo.core.tabletserver.thrift; + +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") public class LogCopyInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LogCopyInfo"); + + private static final org.apache.thrift.protocol.TField FILE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("fileSize", org.apache.thrift.protocol.TType.I64, (short)1); + private static final org.apache.thrift.protocol.TField LOGGER_ZNODE_FIELD_DESC = new org.apache.thrift.protocol.TField("loggerZNode", org.apache.thrift.protocol.TType.STRING, (short)2); + + public long fileSize; + public String loggerZNode; + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + FILE_SIZE((short)1, "fileSize"), + LOGGER_ZNODE((short)2, "loggerZNode"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // FILE_SIZE + return FILE_SIZE; + case 2: // LOGGER_ZNODE + return LOGGER_ZNODE; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __FILESIZE_ISSET_ID = 0; + private BitSet __isset_bit_vector = new BitSet(1); + + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.FILE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("fileSize", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.LOGGER_ZNODE, new org.apache.thrift.meta_data.FieldMetaData("loggerZNode", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LogCopyInfo.class, metaDataMap); + } + + public LogCopyInfo() { + } + + public LogCopyInfo( + long fileSize, + String loggerZNode) + { + this(); + this.fileSize = fileSize; + setFileSizeIsSet(true); + this.loggerZNode = loggerZNode; + } + + /** + * Performs a deep copy on other. + */ + public LogCopyInfo(LogCopyInfo other) { + __isset_bit_vector.clear(); + __isset_bit_vector.or(other.__isset_bit_vector); + this.fileSize = other.fileSize; + if (other.isSetLoggerZNode()) { + this.loggerZNode = other.loggerZNode; + } + } + + public LogCopyInfo deepCopy() { + return new LogCopyInfo(this); + } + + @Override + public void clear() { + setFileSizeIsSet(false); + this.fileSize = 0; + this.loggerZNode = null; + } + + public long getFileSize() { + return this.fileSize; + } + + public LogCopyInfo setFileSize(long fileSize) { + this.fileSize = fileSize; + setFileSizeIsSet(true); + return this; + } + + public void unsetFileSize() { + __isset_bit_vector.clear(__FILESIZE_ISSET_ID); + } + + /** Returns true if field fileSize is set (has been assigned a value) and false otherwise */ + public boolean isSetFileSize() { + return __isset_bit_vector.get(__FILESIZE_ISSET_ID); + } + + public void setFileSizeIsSet(boolean value) { + __isset_bit_vector.set(__FILESIZE_ISSET_ID, value); + } + + public String getLoggerZNode() { + return this.loggerZNode; + } + + public LogCopyInfo setLoggerZNode(String loggerZNode) { + this.loggerZNode = loggerZNode; + return this; + } + + public void unsetLoggerZNode() { + this.loggerZNode = null; + } + + /** Returns true if field loggerZNode is set (has been assigned a value) and false otherwise */ + public boolean isSetLoggerZNode() { + return this.loggerZNode != null; + } + + public void setLoggerZNodeIsSet(boolean value) { + if (!value) { + this.loggerZNode = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case FILE_SIZE: + if (value == null) { + unsetFileSize(); + } else { + setFileSize((Long)value); + } + break; + + case LOGGER_ZNODE: + if (value == null) { + unsetLoggerZNode(); + } else { + setLoggerZNode((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case FILE_SIZE: + return new Long(getFileSize()); + + case LOGGER_ZNODE: + return getLoggerZNode(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case FILE_SIZE: + return isSetFileSize(); + case LOGGER_ZNODE: + return isSetLoggerZNode(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof LogCopyInfo) + return this.equals((LogCopyInfo)that); + return false; + } + + public boolean equals(LogCopyInfo that) { + if (that == null) + return false; + + boolean this_present_fileSize = true; + boolean that_present_fileSize = true; + if (this_present_fileSize || that_present_fileSize) { + if (!(this_present_fileSize && that_present_fileSize)) + return false; + if (this.fileSize != that.fileSize) + return false; + } + + boolean this_present_loggerZNode = true && this.isSetLoggerZNode(); + boolean that_present_loggerZNode = true && that.isSetLoggerZNode(); + if (this_present_loggerZNode || that_present_loggerZNode) { + if (!(this_present_loggerZNode && that_present_loggerZNode)) + return false; + if (!this.loggerZNode.equals(that.loggerZNode)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(LogCopyInfo other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + LogCopyInfo typedOther = (LogCopyInfo)other; + + lastComparison = Boolean.valueOf(isSetFileSize()).compareTo(typedOther.isSetFileSize()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetFileSize()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fileSize, typedOther.fileSize); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetLoggerZNode()).compareTo(typedOther.isSetLoggerZNode()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetLoggerZNode()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.loggerZNode, typedOther.loggerZNode); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // FILE_SIZE + if (field.type == org.apache.thrift.protocol.TType.I64) { + this.fileSize = iprot.readI64(); + setFileSizeIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // LOGGER_ZNODE + if (field.type == org.apache.thrift.protocol.TType.STRING) { + this.loggerZNode = iprot.readString(); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldBegin(FILE_SIZE_FIELD_DESC); + oprot.writeI64(this.fileSize); + oprot.writeFieldEnd(); + if (this.loggerZNode != null) { + oprot.writeFieldBegin(LOGGER_ZNODE_FIELD_DESC); + oprot.writeString(this.loggerZNode); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("LogCopyInfo("); + boolean first = true; + + sb.append("fileSize:"); + sb.append(this.fileSize); + first = false; + if (!first) sb.append(", "); + sb.append("loggerZNode:"); + if (this.loggerZNode == null) { + sb.append("null"); + } else { + sb.append(this.loggerZNode); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + +} + Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java?rev=1328480&r1=1328479&r2=1328480&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java (original) +++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java Fri Apr 20 19:12:45 2012 @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; public void close(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, long id) throws NoSuchLogIDException, LoggerClosedException, org.apache.thrift.TException; - public long startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException; + public LogCopyInfo startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException; public List getClosedLogs(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo credentials) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException; @@ -412,7 +412,7 @@ import org.slf4j.LoggerFactory; return; } - public long startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException + public LogCopyInfo startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException { send_startCopy(tinfo, credentials, name, fullyQualifiedFileName, sort); return recv_startCopy(); @@ -432,7 +432,7 @@ import org.slf4j.LoggerFactory; oprot_.getTransport().flush(); } - public long recv_startCopy() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException + public LogCopyInfo recv_startCopy() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException { org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin(); if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) { @@ -904,7 +904,7 @@ import org.slf4j.LoggerFactory; prot.writeMessageEnd(); } - public long getResult() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException { + public LogCopyInfo getResult() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException, org.apache.thrift.TException { if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { throw new IllegalStateException("Method call not finished!"); } @@ -1400,7 +1400,6 @@ import org.slf4j.LoggerFactory; startCopy_result result = new startCopy_result(); try { result.success = iface_.startCopy(args.tinfo, args.credentials, args.name, args.fullyQualifiedFileName, args.sort); - result.setSuccessIsSet(true); } catch (org.apache.accumulo.core.security.thrift.ThriftSecurityException sec) { result.sec = sec; } catch (Throwable th) { @@ -8945,10 +8944,10 @@ import org.slf4j.LoggerFactory; public static class startCopy_result implements org.apache.thrift.TBase, java.io.Serializable, Cloneable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("startCopy_result"); - private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.I64, (short)0); + private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRUCT, (short)0); private static final org.apache.thrift.protocol.TField SEC_FIELD_DESC = new org.apache.thrift.protocol.TField("sec", org.apache.thrift.protocol.TType.STRUCT, (short)1); - public long success; + public LogCopyInfo success; public org.apache.accumulo.core.security.thrift.ThriftSecurityException sec; /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @@ -9013,14 +9012,12 @@ import org.slf4j.LoggerFactory; } // isset id assignments - private static final int __SUCCESS_ISSET_ID = 0; - private BitSet __isset_bit_vector = new BitSet(1); public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, LogCopyInfo.class))); tmpMap.put(_Fields.SEC, new org.apache.thrift.meta_data.FieldMetaData("sec", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT))); metaDataMap = Collections.unmodifiableMap(tmpMap); @@ -9031,12 +9028,11 @@ import org.slf4j.LoggerFactory; } public startCopy_result( - long success, + LogCopyInfo success, org.apache.accumulo.core.security.thrift.ThriftSecurityException sec) { this(); this.success = success; - setSuccessIsSet(true); this.sec = sec; } @@ -9044,9 +9040,9 @@ import org.slf4j.LoggerFactory; * Performs a deep copy on other. */ public startCopy_result(startCopy_result other) { - __isset_bit_vector.clear(); - __isset_bit_vector.or(other.__isset_bit_vector); - this.success = other.success; + if (other.isSetSuccess()) { + this.success = new LogCopyInfo(other.success); + } if (other.isSetSec()) { this.sec = new org.apache.accumulo.core.security.thrift.ThriftSecurityException(other.sec); } @@ -9058,32 +9054,32 @@ import org.slf4j.LoggerFactory; @Override public void clear() { - setSuccessIsSet(false); - this.success = 0; + this.success = null; this.sec = null; } - public long getSuccess() { + public LogCopyInfo getSuccess() { return this.success; } - public startCopy_result setSuccess(long success) { + public startCopy_result setSuccess(LogCopyInfo success) { this.success = success; - setSuccessIsSet(true); return this; } public void unsetSuccess() { - __isset_bit_vector.clear(__SUCCESS_ISSET_ID); + this.success = null; } /** Returns true if field success is set (has been assigned a value) and false otherwise */ public boolean isSetSuccess() { - return __isset_bit_vector.get(__SUCCESS_ISSET_ID); + return this.success != null; } public void setSuccessIsSet(boolean value) { - __isset_bit_vector.set(__SUCCESS_ISSET_ID, value); + if (!value) { + this.success = null; + } } public org.apache.accumulo.core.security.thrift.ThriftSecurityException getSec() { @@ -9116,7 +9112,7 @@ import org.slf4j.LoggerFactory; if (value == null) { unsetSuccess(); } else { - setSuccess((Long)value); + setSuccess((LogCopyInfo)value); } break; @@ -9134,7 +9130,7 @@ import org.slf4j.LoggerFactory; public Object getFieldValue(_Fields field) { switch (field) { case SUCCESS: - return new Long(getSuccess()); + return getSuccess(); case SEC: return getSec(); @@ -9171,12 +9167,12 @@ import org.slf4j.LoggerFactory; if (that == null) return false; - boolean this_present_success = true; - boolean that_present_success = true; + boolean this_present_success = true && this.isSetSuccess(); + boolean that_present_success = true && that.isSetSuccess(); if (this_present_success || that_present_success) { if (!(this_present_success && that_present_success)) return false; - if (this.success != that.success) + if (!this.success.equals(that.success)) return false; } @@ -9243,9 +9239,9 @@ import org.slf4j.LoggerFactory; } switch (field.id) { case 0: // SUCCESS - if (field.type == org.apache.thrift.protocol.TType.I64) { - this.success = iprot.readI64(); - setSuccessIsSet(true); + if (field.type == org.apache.thrift.protocol.TType.STRUCT) { + this.success = new LogCopyInfo(); + this.success.read(iprot); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } @@ -9274,7 +9270,7 @@ import org.slf4j.LoggerFactory; if (this.isSetSuccess()) { oprot.writeFieldBegin(SUCCESS_FIELD_DESC); - oprot.writeI64(this.success); + this.success.write(oprot); oprot.writeFieldEnd(); } else if (this.isSetSec()) { oprot.writeFieldBegin(SEC_FIELD_DESC); @@ -9291,7 +9287,11 @@ import org.slf4j.LoggerFactory; boolean first = true; sb.append("success:"); - sb.append(this.success); + if (this.success == null) { + sb.append("null"); + } else { + sb.append(this.success); + } first = false; if (!first) sb.append(", "); sb.append("sec:"); Modified: accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift?rev=1328480&r1=1328479&r2=1328480&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift (original) +++ accumulo/branches/1.4/src/core/src/main/thrift/tabletserver.thrift Fri Apr 20 19:12:45 2012 @@ -169,6 +169,10 @@ struct TabletMutations { 3:list mutations } +struct LogCopyInfo { + 1:i64 fileSize, + 2:string loggerZNode +} service MutationLogger { LogFile create(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tserverSession) throws (1:security.ThriftSecurityException sec, 2:LoggerClosedException lce), @@ -182,7 +186,7 @@ service MutationLogger { void close(2:cloudtrace.TInfo tinfo, 1:LogID id) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce), // close a log file and initiate the push to HDFS - i64 startCopy(4:cloudtrace.TInfo tinfo, + LogCopyInfo startCopy(4:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string name, 3:string fullyQualifiedFileName, Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java?rev=1328480&r1=1328479&r2=1328480&view=diff ============================================================================== --- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java (original) +++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogService.java Fri Apr 20 19:12:45 2012 @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; import org.apache.accumulo.cloudtrace.instrument.thrift.TraceWrap; import org.apache.accumulo.cloudtrace.thrift.TInfo; @@ -45,6 +46,7 @@ import org.apache.accumulo.core.security import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.security.thrift.SecurityErrorCode; import org.apache.accumulo.core.security.thrift.ThriftSecurityException; +import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo; import org.apache.accumulo.core.tabletserver.thrift.LogFile; import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException; import org.apache.accumulo.core.tabletserver.thrift.MutationLogger; @@ -55,6 +57,7 @@ import org.apache.accumulo.core.util.Cac import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.logger.LogWriter.LogWriteException; @@ -69,6 +72,7 @@ import org.apache.accumulo.server.zookee import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.server.TServer; @@ -78,7 +82,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; - /** * A Mutation logging service. * @@ -94,6 +97,7 @@ public class LogService implements Mutat private TServer service; private LogWriter writer_; private MutationLogger.Iface writer; + private String ephemeralNode; enum ShutdownState { STARTED, REGISTERED, WAITING_FOR_HALT, HALT @@ -189,6 +193,10 @@ public class LogService implements Mutat boolean archive = ServerConfiguration.getSystemConfiguration().getBoolean(Property.LOGGER_ARCHIVE); AccumuloConfiguration acuConf = ServerConfiguration.getSystemConfiguration(); writer_ = new LogWriter(acuConf, fs, rootDirs, HdfsZooInstance.getInstance().getInstanceID(), poolSize, archive); + + // call before putting this service online + removeIncompleteCopies(acuConf, fs, rootDirs); + InvocationHandler h = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { @@ -232,6 +240,40 @@ public class LogService implements Mutat Accumulo.enableTracing(address.getHostName(), "logger"); } + /** + * @param acuConf + * @param fs + * @param rootDirs + * @throws IOException + */ + private void removeIncompleteCopies(AccumuloConfiguration acuConf, FileSystem fs, Set rootDirs) throws IOException { + if (acuConf.getBoolean(Property.MASTER_RECOVERY_SORT_MAPREDUCE)) { + return; + } + + Set walogs = new HashSet(); + for (String root : rootDirs) { + File rootFile = new File(root); + for (File walog : rootFile.listFiles()) { + try { + UUID.fromString(walog.getName()); + walogs.add(walog.getName()); + } catch (IllegalArgumentException iea) { + LOG.debug("Ignoring " + walog.getName()); + } + } + } + + // look for .recovered that are not finished + for (String walog : walogs) { + Path path = new Path(ServerConstants.getRecoveryDir() + "/" + walog + ".recovered"); + if (fs.exists(path) && !fs.exists(new Path(path, "finished"))) { + LOG.debug("Incomplete copy/sort in dfs, deleting " + path); + fs.delete(path, true); + } + } + } + public void run() { try { while (!service.isServing()) { @@ -255,6 +297,7 @@ public class LogService implements Mutat String path = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + zooDir; path += "/logger-"; path = zoo.putEphemeralSequential(path, addressString.getBytes()); + ephemeralNode = path; zoo.exists(path, this); } catch (Exception ex) { throw new RuntimeException("Unexpected error creating zookeeper entry " + zooDir); @@ -280,10 +323,12 @@ public class LogService implements Mutat } @Override - public long startCopy(TInfo info, AuthInfo credentials, String localLog, String fullyQualifiedFileName, boolean sort) throws ThriftSecurityException, + public LogCopyInfo startCopy(TInfo info, AuthInfo credentials, String localLog, String fullyQualifiedFileName, boolean sort) throws ThriftSecurityException, TException { checkForSystemPrivs("copy", credentials); - return writer.startCopy(null, credentials, localLog, fullyQualifiedFileName, sort); + LogCopyInfo lci = writer.startCopy(null, credentials, localLog, fullyQualifiedFileName, sort); + lci.loggerZNode = ephemeralNode; + return lci; } @Override Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java?rev=1328480&r1=1328479&r2=1328480&view=diff ============================================================================== --- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java (original) +++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java Fri Apr 20 19:12:45 2012 @@ -52,6 +52,7 @@ import org.apache.accumulo.core.data.thr import org.apache.accumulo.core.data.thrift.TMutation; import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.security.thrift.ThriftSecurityException; +import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo; import org.apache.accumulo.core.tabletserver.thrift.LogFile; import org.apache.accumulo.core.tabletserver.thrift.MutationLogger; import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException; @@ -234,7 +235,7 @@ class LogWriter implements MutationLogge } @Override - public long startCopy(TInfo info, AuthInfo credentials, final String localLog, final String fullyQualifiedFileName, final boolean sort) { + public LogCopyInfo startCopy(TInfo info, AuthInfo credentials, final String localLog, final String fullyQualifiedFileName, final boolean sort) { log.info("Copying " + localLog + " to " + fullyQualifiedFileName); final long t1 = System.currentTimeMillis(); try { @@ -314,6 +315,7 @@ class LogWriter implements MutationLogge memorySize = 0; } } + if (!kv.isEmpty()) writeSortedEntries(dest, part++, kv); fs.create(new Path(dest, "finished")).close(); @@ -380,7 +382,7 @@ class LogWriter implements MutationLogge log.info("Copying " + localLog + " complete"); } }); - return result; + return new LogCopyInfo(result, null); } @Override Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java?rev=1328480&r1=1328479&r2=1328480&view=diff ============================================================================== --- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java (original) +++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java Fri Apr 20 19:12:45 2012 @@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Key import org.apache.accumulo.core.file.FileUtil; import org.apache.accumulo.core.master.thrift.RecoveryStatus; import org.apache.accumulo.core.security.thrift.AuthInfo; +import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.StringUtil; import org.apache.accumulo.core.util.UtilWaitThread; @@ -39,6 +40,7 @@ import org.apache.accumulo.server.client import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.tabletserver.log.RemoteLogger; import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; @@ -58,6 +60,8 @@ public class CoordinateRecoveryTask impl private boolean stop = false; + private ZooCache zcache; + private static String fullName(String name) { return ServerConstants.getRecoveryDir() + "/" + name; } @@ -118,6 +122,7 @@ public class CoordinateRecoveryTask impl Job sortJob = null; boolean useMapReduce = ServerConfiguration.getSystemConfiguration().getBoolean(Property.MASTER_RECOVERY_SORT_MAPREDUCE); JobComplete notify = null; + String loggerZNode; RecoveryJob(LogFile entry, JobComplete callback) throws Exception { logFile = entry; @@ -132,7 +137,9 @@ public class CoordinateRecoveryTask impl RemoteLogger logger = new RemoteLogger(logFile.server); String base = logFile.unsortedFileName(); log.debug("Starting to copy " + logFile.file + " from " + logFile.server); - copySize = logger.startCopy(logFile.file, base, !useMapReduce); + LogCopyInfo lci = logger.startCopy(logFile.file, base, !useMapReduce); + copySize = lci.fileSize; + loggerZNode = lci.loggerZNode; } catch (Throwable t) { log.warn("Unable to recover " + logFile + "(" + t + ")", t); fail(); @@ -162,6 +169,11 @@ public class CoordinateRecoveryTask impl return true; } + if (zcache.get(loggerZNode) == null) { + log.debug("zknode " + loggerZNode + " is gone, copy " + logFile.file + " from " + logFile.server + " assumed dead"); + return true; + } + if (elapsedMillis() > ServerConfiguration.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_MAXTIME)) { log.warn("Recovery taking too long, giving up"); if (sortJob != null) @@ -249,6 +261,7 @@ public class CoordinateRecoveryTask impl public CoordinateRecoveryTask(FileSystem fs) { this.fs = fs; + zcache = new ZooCache(); } public boolean recover(AuthInfo credentials, KeyExtent extent, Collection> entries, JobComplete notify) { Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java?rev=1328480&r1=1328479&r2=1328480&view=diff ============================================================================== --- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java (original) +++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java Fri Apr 20 19:12:45 2012 @@ -24,6 +24,7 @@ import org.apache.accumulo.core.conf.Pro import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.thrift.ThriftSecurityException; +import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo; import org.apache.accumulo.core.tabletserver.thrift.LogFile; import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException; import org.apache.accumulo.core.tabletserver.thrift.MutationLogger; @@ -152,7 +153,7 @@ public class RemoteLogger { client.minorCompactionStarted(null, logFile.id, seq, tid, fqfn); } - public synchronized long startCopy(String name, String fullyQualifiedFileName, boolean sort) throws ThriftSecurityException, TException { + public synchronized LogCopyInfo startCopy(String name, String fullyQualifiedFileName, boolean sort) throws ThriftSecurityException, TException { return client.startCopy(null, SecurityConstants.getSystemCredentials(), name, fullyQualifiedFileName, sort); }