accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1328486 - in /accumulo/trunk: ./ core/ core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ core/src/main/thrift/ docs/examples/ docs/src/user_manual/chapters/ server/ server/src/main/java/org/apache/accumulo/server/logger/ ser...
Date Fri, 20 Apr 2012 19:27:56 GMT
Author: kturner
Date: Fri Apr 20 19:27:55 2012
New Revision: 1328486

URL: http://svn.apache.org/viewvc?rev=1328486&view=rev
Log:
ACCUMULO-449 ACCUMULO- merged from 1.4

Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java
      - copied unchanged from r1328481, accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/LogCopyInfo.java
Modified:
    accumulo/trunk/   (props changed)
    accumulo/trunk/core/   (props changed)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
    accumulo/trunk/core/src/main/thrift/tabletserver.thrift
    accumulo/trunk/docs/examples/README
    accumulo/trunk/docs/src/user_manual/chapters/table_design.tex
    accumulo/trunk/server/   (props changed)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java

Propchange: accumulo/trunk/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1328082-1328481
  Merged /accumulo/branches/1.4:r1327193-1328079,1328081-1328481

Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/core:r1327193-1328079,1328081-1328481
  Merged /accumulo/branches/1.4/src/core:r1328082-1328481

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java?rev=1328486&r1=1328485&r2=1328486&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java Fri Apr 20 19:27:55 2012
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 
     public void close(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, long id) throws
NoSuchLogIDException, LoggerClosedException, org.apache.thrift.TException;
 
-    public long startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo
credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException;
+    public LogCopyInfo startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo
credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException;
 
     public List<String> getClosedLogs(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo,
org.apache.accumulo.core.security.thrift.AuthInfo credentials) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException;
 
@@ -412,7 +412,7 @@ import org.slf4j.LoggerFactory;
       return;
     }
 
-    public long startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo
credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException
+    public LogCopyInfo startCopy(org.apache.accumulo.cloudtrace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.AuthInfo
credentials, String name, String fullyQualifiedFileName, boolean sort) throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException
     {
       send_startCopy(tinfo, credentials, name, fullyQualifiedFileName, sort);
       return recv_startCopy();
@@ -432,7 +432,7 @@ import org.slf4j.LoggerFactory;
       oprot_.getTransport().flush();
     }
 
-    public long recv_startCopy() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException
+    public LogCopyInfo recv_startCopy() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException
     {
       org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
       if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
@@ -904,7 +904,7 @@ import org.slf4j.LoggerFactory;
         prot.writeMessageEnd();
       }
 
-      public long getResult() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException {
+      public LogCopyInfo getResult() throws org.apache.accumulo.core.security.thrift.ThriftSecurityException,
org.apache.thrift.TException {
         if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
           throw new IllegalStateException("Method call not finished!");
         }
@@ -1400,7 +1400,6 @@ import org.slf4j.LoggerFactory;
         startCopy_result result = new startCopy_result();
         try {
           result.success = iface_.startCopy(args.tinfo, args.credentials, args.name, args.fullyQualifiedFileName,
args.sort);
-          result.setSuccessIsSet(true);
         } catch (org.apache.accumulo.core.security.thrift.ThriftSecurityException sec) {
           result.sec = sec;
         } catch (Throwable th) {
@@ -8945,10 +8944,10 @@ import org.slf4j.LoggerFactory;
   public static class startCopy_result implements org.apache.thrift.TBase<startCopy_result,
startCopy_result._Fields>, java.io.Serializable, Cloneable   {
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("startCopy_result");
 
-    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success",
org.apache.thrift.protocol.TType.I64, (short)0);
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success",
org.apache.thrift.protocol.TType.STRUCT, (short)0);
     private static final org.apache.thrift.protocol.TField SEC_FIELD_DESC = new org.apache.thrift.protocol.TField("sec",
org.apache.thrift.protocol.TType.STRUCT, (short)1);
 
-    public long success;
+    public LogCopyInfo success;
     public org.apache.accumulo.core.security.thrift.ThriftSecurityException sec;
 
     /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
@@ -9013,14 +9012,12 @@ import org.slf4j.LoggerFactory;
     }
 
     // isset id assignments
-    private static final int __SUCCESS_ISSET_ID = 0;
-    private BitSet __isset_bit_vector = new BitSet(1);
 
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
       tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
-          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
LogCopyInfo.class)));
       tmpMap.put(_Fields.SEC, new org.apache.thrift.meta_data.FieldMetaData("sec", org.apache.thrift.TFieldRequirementType.DEFAULT,
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
@@ -9031,12 +9028,11 @@ import org.slf4j.LoggerFactory;
     }
 
     public startCopy_result(
-      long success,
+      LogCopyInfo success,
       org.apache.accumulo.core.security.thrift.ThriftSecurityException sec)
     {
       this();
       this.success = success;
-      setSuccessIsSet(true);
       this.sec = sec;
     }
 
@@ -9044,9 +9040,9 @@ import org.slf4j.LoggerFactory;
      * Performs a deep copy on <i>other</i>.
      */
     public startCopy_result(startCopy_result other) {
-      __isset_bit_vector.clear();
-      __isset_bit_vector.or(other.__isset_bit_vector);
-      this.success = other.success;
+      if (other.isSetSuccess()) {
+        this.success = new LogCopyInfo(other.success);
+      }
       if (other.isSetSec()) {
         this.sec = new org.apache.accumulo.core.security.thrift.ThriftSecurityException(other.sec);
       }
@@ -9058,32 +9054,32 @@ import org.slf4j.LoggerFactory;
 
     @Override
     public void clear() {
-      setSuccessIsSet(false);
-      this.success = 0;
+      this.success = null;
       this.sec = null;
     }
 
-    public long getSuccess() {
+    public LogCopyInfo getSuccess() {
       return this.success;
     }
 
-    public startCopy_result setSuccess(long success) {
+    public startCopy_result setSuccess(LogCopyInfo success) {
       this.success = success;
-      setSuccessIsSet(true);
       return this;
     }
 
     public void unsetSuccess() {
-      __isset_bit_vector.clear(__SUCCESS_ISSET_ID);
+      this.success = null;
     }
 
     /** Returns true if field success is set (has been assigned a value) and false otherwise */
     public boolean isSetSuccess() {
-      return __isset_bit_vector.get(__SUCCESS_ISSET_ID);
+      return this.success != null;
     }
 
     public void setSuccessIsSet(boolean value) {
-      __isset_bit_vector.set(__SUCCESS_ISSET_ID, value);
+      if (!value) {
+        this.success = null;
+      }
     }
 
     public org.apache.accumulo.core.security.thrift.ThriftSecurityException getSec() {
@@ -9116,7 +9112,7 @@ import org.slf4j.LoggerFactory;
         if (value == null) {
           unsetSuccess();
         } else {
-          setSuccess((Long)value);
+          setSuccess((LogCopyInfo)value);
         }
         break;
 
@@ -9134,7 +9130,7 @@ import org.slf4j.LoggerFactory;
     public Object getFieldValue(_Fields field) {
       switch (field) {
       case SUCCESS:
-        return new Long(getSuccess());
+        return getSuccess();
 
       case SEC:
         return getSec();
@@ -9171,12 +9167,12 @@ import org.slf4j.LoggerFactory;
       if (that == null)
         return false;
 
-      boolean this_present_success = true;
-      boolean that_present_success = true;
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
       if (this_present_success || that_present_success) {
         if (!(this_present_success && that_present_success))
           return false;
-        if (this.success != that.success)
+        if (!this.success.equals(that.success))
           return false;
       }
 
@@ -9243,9 +9239,9 @@ import org.slf4j.LoggerFactory;
         }
         switch (field.id) {
           case 0: // SUCCESS
-            if (field.type == org.apache.thrift.protocol.TType.I64) {
-              this.success = iprot.readI64();
-              setSuccessIsSet(true);
+            if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
+              this.success = new LogCopyInfo();
+              this.success.read(iprot);
             } else { 
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
             }
@@ -9274,7 +9270,7 @@ import org.slf4j.LoggerFactory;
 
       if (this.isSetSuccess()) {
         oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
-        oprot.writeI64(this.success);
+        this.success.write(oprot);
         oprot.writeFieldEnd();
       } else if (this.isSetSec()) {
         oprot.writeFieldBegin(SEC_FIELD_DESC);
@@ -9291,7 +9287,11 @@ import org.slf4j.LoggerFactory;
       boolean first = true;
 
       sb.append("success:");
-      sb.append(this.success);
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
       first = false;
       if (!first) sb.append(", ");
       sb.append("sec:");

Modified: accumulo/trunk/core/src/main/thrift/tabletserver.thrift
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/thrift/tabletserver.thrift?rev=1328486&r1=1328485&r2=1328486&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/thrift/tabletserver.thrift (original)
+++ accumulo/trunk/core/src/main/thrift/tabletserver.thrift Fri Apr 20 19:27:55 2012
@@ -169,6 +169,10 @@ struct TabletMutations {
 	3:list<data.TMutation> mutations
 }
 
+struct LogCopyInfo {
+	1:i64 fileSize,
+	2:string loggerZNode
+}
 
 service MutationLogger {
     LogFile create(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tserverSession)
throws (1:security.ThriftSecurityException sec, 2:LoggerClosedException lce),
@@ -182,7 +186,7 @@ service MutationLogger {
     void close(2:cloudtrace.TInfo tinfo, 1:LogID id) throws (1:NoSuchLogIDException nsli,
2:LoggerClosedException lce),
 
     // close a log file and initiate the push to HDFS
-    i64 startCopy(4:cloudtrace.TInfo tinfo,
+    LogCopyInfo startCopy(4:cloudtrace.TInfo tinfo,
                   1:security.AuthInfo credentials,
                   2:string name, 
                   3:string fullyQualifiedFileName,

Modified: accumulo/trunk/docs/examples/README
URL: http://svn.apache.org/viewvc/accumulo/trunk/docs/examples/README?rev=1328486&r1=1328485&r2=1328486&view=diff
==============================================================================
--- accumulo/trunk/docs/examples/README (original)
+++ accumulo/trunk/docs/examples/README Fri Apr 20 19:27:55 2012
@@ -34,4 +34,4 @@ Commands intended to be run in bash are 
 
 Commands intended to be run in the Accumulo shell are prefixed by '>'.
 
-[1]: /accumulo/user_manual_1.5/Accumulo_Shell.html#User_Administration
+[1]: /1.5/user_manual/Accumulo_Shell.html#User_Administration

Modified: accumulo/trunk/docs/src/user_manual/chapters/table_design.tex
URL: http://svn.apache.org/viewvc/accumulo/trunk/docs/src/user_manual/chapters/table_design.tex?rev=1328486&r1=1328485&r2=1328486&view=diff
==============================================================================
--- accumulo/trunk/docs/src/user_manual/chapters/table_design.tex (original)
+++ accumulo/trunk/docs/src/user_manual/chapters/table_design.tex Fri Apr 20 19:27:55 2012
@@ -158,7 +158,7 @@ indexScanner.setRange(new Range(term, te
 
 // we retrieve the matching rowIDs and create a set of ranges
 for(Entry<Key,Value> entry : indexScanner)
-    matchingRows.add(new Text(entry.getValue()));
+    matchingRows.add(new Text(entry.getKey().getColumnQualifier()));
 
 // now we pass the set of rowIDs to the batch scanner to retrieve them
 BatchScanner bscan = conn.createBatchScanner("table", auths, 10);
@@ -167,7 +167,7 @@ bscan.setRanges(matchingRows);
 bscan.fetchFamily("attributes");
 
 for(Entry<Key,Value> entry : scan)
-    System.out.println(e.getValue());
+    System.out.println(entry.getValue());
 \end{verbatim}
 \normalsize
 

Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/server:r1327193-1328079,1328081-1328481
  Merged /accumulo/branches/1.4/src/server:r1328082-1328481

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java?rev=1328486&r1=1328485&r2=1328486&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java Fri Apr 20 19:27:55 2012
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.accumulo.cloudtrace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.cloudtrace.thrift.TInfo;
@@ -46,6 +47,7 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
 import org.apache.accumulo.core.tabletserver.thrift.LogFile;
 import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
 import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
@@ -56,6 +58,7 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.logger.LogWriter.LogWriteException;
@@ -69,6 +72,7 @@ import org.apache.accumulo.server.util.T
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
@@ -78,7 +82,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
-
 /**
  * A Mutation logging service.
  * 
@@ -97,6 +100,7 @@ public class LogService implements Mutat
   private ShutdownState shutdownState = ShutdownState.STARTED;
   private final List<FileLock> fileLocks = new ArrayList<FileLock>();
   private final String addressString;
+  private String ephemeralNode;
   
   enum ShutdownState {
     STARTED, REGISTERED, WAITING_FOR_HALT, HALT
@@ -179,6 +183,10 @@ public class LogService implements Mutat
     int poolSize = acuConf.getCount(Property.LOGGER_COPY_THREADPOOL_SIZE);
     boolean archive = acuConf.getBoolean(Property.LOGGER_ARCHIVE);
     writer_ = new LogWriter(acuConf, fs, rootDirs, instance.getInstanceID(), poolSize, archive);
+    
+    // call before putting this service online
+    removeIncompleteCopies(acuConf, fs, rootDirs);
+
     InvocationHandler h = new InvocationHandler() {
       @Override
       public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
@@ -221,6 +229,37 @@ public class LogService implements Mutat
     this.switchState(ShutdownState.REGISTERED);
   }
   
+  /**
+   * @param acuConf
+   * @param fs
+   * @param rootDirs
+   * @throws IOException
+   */
+  private void removeIncompleteCopies(AccumuloConfiguration acuConf, FileSystem fs, Set<String>
rootDirs) throws IOException {
+
+    Set<String> walogs = new HashSet<String>();
+    for (String root : rootDirs) {
+      File rootFile = new File(root);
+      for (File walog : rootFile.listFiles()) {
+        try {
+          UUID.fromString(walog.getName());
+          walogs.add(walog.getName());
+        } catch (IllegalArgumentException iea) {
+          LOG.debug("Ignoring " + walog.getName());
+        }
+      }
+    }
+    
+    // look for .recovered that are not finished
+    for (String walog : walogs) {
+      Path path = new Path(ServerConstants.getRecoveryDir() + "/" + walog + ".recovered");
+      if (fs.exists(path) && !fs.exists(new Path(path, "finished"))) {
+        LOG.debug("Incomplete copy/sort in dfs, deleting " + path);
+        fs.delete(path, true);
+      }
+    }
+  }
+
   public void run() {
     try {
       while (!service.isServing()) {
@@ -244,6 +283,7 @@ public class LogService implements Mutat
       String path = ZooUtil.getRoot(instance) + zooDir;
       path += "/logger-";
       path = zoo.putEphemeralSequential(path, addressString.getBytes());
+      ephemeralNode = path;
       zoo.exists(path, this);
     } catch (Exception ex) {
       throw new RuntimeException("Unexpected error creating zookeeper entry " + zooDir);
@@ -269,10 +309,12 @@ public class LogService implements Mutat
   }
   
   @Override
-  public long startCopy(TInfo info, AuthInfo credentials, String localLog, String fullyQualifiedFileName,
boolean sort) throws ThriftSecurityException,
+  public LogCopyInfo startCopy(TInfo info, AuthInfo credentials, String localLog, String
fullyQualifiedFileName, boolean sort) throws ThriftSecurityException,
       TException {
     checkForSystemPrivs("copy", credentials);
-    return writer.startCopy(null, credentials, localLog, fullyQualifiedFileName, sort);
+    LogCopyInfo lci = writer.startCopy(null, credentials, localLog, fullyQualifiedFileName,
sort);
+    lci.loggerZNode = ephemeralNode;
+    return lci;
   }
   
   @Override

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java?rev=1328486&r1=1328485&r2=1328486&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogWriter.java Fri Apr 20 19:27:55 2012
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
 import org.apache.accumulo.core.tabletserver.thrift.LogFile;
 import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException;
@@ -234,7 +235,7 @@ class LogWriter implements MutationLogge
   }
   
   @Override
-  public long startCopy(TInfo info, AuthInfo credentials, final String localLog, final String
fullyQualifiedFileName, final boolean sort) {
+  public LogCopyInfo startCopy(TInfo info, AuthInfo credentials, final String localLog, final
String fullyQualifiedFileName, final boolean sort) {
     log.info("Copying " + localLog + " to " + fullyQualifiedFileName);
     final long t1 = System.currentTimeMillis();
     try {
@@ -314,6 +315,7 @@ class LogWriter implements MutationLogge
               memorySize = 0;
             }
           }
+
           if (!kv.isEmpty())
             writeSortedEntries(dest, part++, kv);
           fs.create(new Path(dest, "finished")).close();
@@ -380,7 +382,7 @@ class LogWriter implements MutationLogge
         log.info("Copying " + localLog + " complete");
       }
     });
-    return result;
+    return new LogCopyInfo(result, null);
   }
   
   @Override

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java?rev=1328486&r1=1328485&r2=1328486&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java Fri Apr 20 19:27:55 2012
@@ -29,9 +29,15 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +52,8 @@ public class CoordinateRecoveryTask impl
   
   private boolean stop = false;
   
+  private ZooCache zcache;
+  
   private static String fullName(String name) {
     return ServerConstants.getRecoveryDir() + "/" + name;
   }
@@ -105,6 +113,7 @@ public class CoordinateRecoveryTask impl
     long copySize = 0;
     JobComplete notify = null;
     final AccumuloConfiguration config;
+    String loggerZNode;
     
     RecoveryJob(LogFile entry, JobComplete callback, AccumuloConfiguration conf) throws Exception
{
       logFile = entry;
@@ -120,7 +129,9 @@ public class CoordinateRecoveryTask impl
         RemoteLogger logger = new RemoteLogger(logFile.server, config);
         String base = logFile.unsortedFileName();
         log.debug("Starting to copy " + logFile.file + " from " + logFile.server);
-        copySize = logger.startCopy(logFile.file, base);
+        LogCopyInfo lci = logger.startCopy(logFile.file, base);
+        copySize = lci.fileSize;
+        loggerZNode = lci.loggerZNode;
       } catch (Throwable t) {
         log.warn("Unable to recover " + logFile + "(" + t + ")", t);
         fail();
@@ -136,6 +147,11 @@ public class CoordinateRecoveryTask impl
         return true;
       }
       
+      if (zcache.get(loggerZNode) == null) {
+        log.debug("zknode " + loggerZNode + " is gone, copy " + logFile.file + " from " +
logFile.server + " assumed dead");
+        return true;
+      }
+
       if (elapsedMillis() > config.getTimeInMillis(Property.MASTER_RECOVERY_MAXTIME)) {
         log.warn("Recovery taking too long, giving up");
         return true;
@@ -196,6 +212,7 @@ public class CoordinateRecoveryTask impl
   public CoordinateRecoveryTask(FileSystem fs, AccumuloConfiguration conf) {
     this.fs = fs;
     this.config = conf;
+    zcache = new ZooCache();
   }
   
   public boolean recover(AuthInfo credentials, KeyExtent extent, Collection<Collection<String>>
entries, JobComplete notify) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java?rev=1328486&r1=1328485&r2=1328486&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java Fri Apr 20 19:27:55 2012
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.LogCopyInfo;
 import org.apache.accumulo.core.tabletserver.thrift.LogFile;
 import org.apache.accumulo.core.tabletserver.thrift.LoggerClosedException;
 import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
@@ -277,7 +278,7 @@ public class RemoteLogger {
     client.minorCompactionStarted(null, logFile.id, seq, tid, fqfn);
   }
   
-  public synchronized long startCopy(String name, String fullyQualifiedFileName) throws ThriftSecurityException,
TException {
+  public synchronized LogCopyInfo startCopy(String name, String fullyQualifiedFileName) throws
ThriftSecurityException, TException {
     return client.startCopy(null, SecurityConstants.getSystemCredentials(), name, fullyQualifiedFileName,
true);
   }
   



Mime
View raw message