accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1486202 [2/2] - in /accumulo/branches/ACCUMULO-118: core/src/main/java/org/apache/accumulo/core/conf/ server/src/main/java/org/apache/accumulo/server/ server/src/main/java/org/apache/accumulo/server/fs/ server/src/main/java/org/apache/accu...
Date Fri, 24 May 2013 19:59:11 GMT
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
(original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
Fri May 24 19:59:11 2013
@@ -19,10 +19,9 @@ package org.apache.accumulo.server.table
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.commons.collections.buffer.PriorityBuffer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -87,7 +86,7 @@ public class MultiReader {
   
   private PriorityBuffer heap = new PriorityBuffer();
   
-  public MultiReader(FileSystem fs, Configuration conf, String directory) throws IOException
{
+  public MultiReader(FileSystem fs, String directory) throws IOException {
     boolean foundFinish = false;
     for (FileStatus child : fs.listStatus(new Path(directory))) {
       if (child.getPath().getName().startsWith("_"))
@@ -96,7 +95,8 @@ public class MultiReader {
         foundFinish = true;
         continue;
       }
-      heap.add(new Index(new Reader(fs, child.getPath().toString(), conf)));
+      org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(child.getPath());
+      heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf())));
     }
     if (!foundFinish)
       throw new IOException("Sort \"finished\" flag not found in " + directory);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
(original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
Fri May 24 19:59:11 2013
@@ -29,14 +29,9 @@ import java.util.Set;
 
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
 /**
@@ -58,7 +53,11 @@ public class SortedLogRecovery {
     public UnusedException() { super(); }
   }
 
-  public SortedLogRecovery() {}
+  private FileSystem fs;
+
+  public SortedLogRecovery(FileSystem fs) {
+    this.fs = fs;
+  }
   
   private enum Status {
     INITIAL, LOOKING_FOR_FINISH, COMPLETE
@@ -90,14 +89,12 @@ public class SortedLogRecovery {
   }
   
   public void recover(KeyExtent extent, List<String> recoveryLogs, Set<String>
tabletFiles, MutationReceiver mr) throws IOException {
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
     int[] tids = new int[recoveryLogs.size()];
     LastStartToFinish lastStartToFinish = new LastStartToFinish();
     for (int i = 0; i < recoveryLogs.size(); i++) {
       String logfile = recoveryLogs.get(i);
       log.info("Looking at mutations from " + logfile + " for " + extent);
-      MultiReader reader = new MultiReader(fs, conf, logfile);
+      MultiReader reader = new MultiReader(fs, logfile);
       try {
         try {
           tids[i] = findLastStartToFinish(reader, i, extent, tabletFiles, lastStartToFinish);
@@ -123,7 +120,7 @@ public class SortedLogRecovery {
     
     for (int i = 0; i < recoveryLogs.size(); i++) {
       String logfile = recoveryLogs.get(i);
-      MultiReader reader = new MultiReader(fs, conf, logfile);
+      MultiReader reader = new MultiReader(fs, logfile);
       try {
         playbackMutations(reader, tids[i], lastStartToFinish, mr);
       } finally {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
(original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
Fri May 24 19:59:11 2013
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.tabletserver.Tablet;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.TabletMutations;
@@ -413,11 +414,11 @@ public class TabletServerLogger {
     return seq;
   }
   
-  public void recover(Tablet tablet, List<String> logs, Set<String> tabletFiles,
MutationReceiver mr) throws IOException {
+  public void recover(FileSystem fs, Tablet tablet, List<String> logs, Set<String>
tabletFiles, MutationReceiver mr) throws IOException {
     if (!enabled(tablet))
       return;
     try {
-      SortedLogRecovery recovery = new SortedLogRecovery();
+      SortedLogRecovery recovery = new SortedLogRecovery(fs);
       KeyExtent extent = tablet.getExtent();
       recovery.recover(extent, logs, tabletFiles, mr);
     } catch (Exception e) {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
(original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
Fri May 24 19:59:11 2013
@@ -36,17 +36,17 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.trace.TraceFormatter;
 import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
@@ -54,7 +54,6 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.thrift.RemoteSpan;
 import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
 import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TByteArrayOutputStream;
@@ -254,7 +253,7 @@ public class TraceServer implements Watc
     SecurityUtil.serverLogin();
     Instance instance = HdfsZooInstance.getInstance();
     ServerConfiguration conf = new ServerConfiguration(instance);
-    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf.getConfiguration());
+    FileSystem fs = FileSystemImpl.get();
     Accumulo.init(fs, conf, "tracer");
     String hostname = Accumulo.getLocalAddress(args);
     TraceServer server = new TraceServer(conf, hostname);

Modified: accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
(original)
+++ accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
Fri May 24 19:59:11 2013
@@ -34,8 +34,8 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.CredentialHelper;
 import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;
@@ -93,7 +93,7 @@ public class TestConfirmDeletes {
     TCredentials auth = CredentialHelper.create("root", new PasswordToken(new byte[0]), "instance");
     
     Instance instance = new MockInstance();
-    FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+    FileSystem fs = FileSystemImpl.getLocal();
     
     load(instance, metadata, deletes);
     

Modified: accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
(original)
+++ accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
Fri May 24 19:59:11 2013
@@ -16,14 +16,14 @@
  */
 package org.apache.accumulo.server.tabletserver.log;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.tabletserver.log.MultiReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -33,27 +33,31 @@ import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class MultiReaderTest {
   
-  Configuration conf = CachedConfiguration.getInstance();
   FileSystem fs;
+  TemporaryFolder root = new TemporaryFolder();
   
   @Before
   public void setUp() throws Exception {
     // quiet log messages about compress.CodecPool
     Logger.getRootLogger().setLevel(Level.ERROR);
-    fs = FileSystem.getLocal(conf);
-    Path root = new Path("manyMaps");
+    fs = FileSystemImpl.getLocal();
+    root.create();
+    String path = root.getRoot().getAbsolutePath();
+    Path root = new Path("file://" + path + "/manyMaps");
     fs.mkdirs(root);
     fs.create(new Path(root, "finished")).close();
-    Writer writer = new Writer(conf, fs, "manyMaps/odd", IntWritable.class, BytesWritable.class);
+    org.apache.hadoop.fs.FileSystem ns = fs.getDefaultNamespace();
+    Writer writer = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class,
BytesWritable.class);
     BytesWritable value = new BytesWritable("someValue".getBytes());
     for (int i = 1; i < 1000; i += 2) {
       writer.append(new IntWritable(i), value);
     }
     writer.close();
-    writer = new Writer(conf, fs, "manyMaps/even", IntWritable.class, BytesWritable.class);
+    writer = new Writer(ns.getConf(), ns, new Path(root, "even").toString(), IntWritable.class,
BytesWritable.class);
     for (int i = 0; i < 1000; i += 2) {
       if (i == 10)
         continue;
@@ -64,8 +68,7 @@ public class MultiReaderTest {
   
   @After
   public void tearDown() throws Exception {
-    if (fs != null)
-      fs.delete(new Path("manyMaps"), true);
+    root.create();
   }
   
   private void scan(MultiReader reader, int start) throws IOException {
@@ -92,7 +95,8 @@ public class MultiReaderTest {
   
   @Test
   public void testMultiReader() throws IOException {
-    MultiReader reader = new MultiReader(fs, conf, "manyMaps");
+    String manyMaps = new Path("file://" + root.getRoot().getAbsolutePath() + "/manyMaps").toString();
+    MultiReader reader = new MultiReader(fs, manyMaps);
     IntWritable key = new IntWritable();
     BytesWritable value = new BytesWritable();
     
@@ -121,8 +125,8 @@ public class MultiReaderTest {
     assertEquals(0, key.get());
     reader.close();
     
-    fs.delete(new Path("manyMaps/even"), true);
-    reader = new MultiReader(fs, conf, "manyMaps");
+    fs.deleteRecursively(new Path(manyMaps, "even"));
+    reader = new MultiReader(fs, manyMaps);
     key.set(501);
     assertTrue(reader.seek(key));
     scanOdd(reader, 501);

Modified: accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
(original)
+++ accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
Fri May 24 19:59:11 2013
@@ -29,28 +29,26 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.logger.LogEvents;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.SortedLogRecovery;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.MapFile.Writer;
+import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class SortedLogRecoveryTest {
   
@@ -115,29 +113,31 @@ public class SortedLogRecoveryTest {
   }
   
   private static List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String>
files, KeyExtent extent) throws IOException {
-    final String workdir = "workdir";
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem local = FileSystem.getLocal(conf).getRaw();
-    local.delete(new Path(workdir), true);
+    TemporaryFolder root = new TemporaryFolder();
+    root.create();
+    final String workdir = "file://" + root.getRoot().getAbsolutePath() + "/workdir";
+    FileSystem fs = FileSystemImpl.getLocal();
+    fs.deleteRecursively(new Path(workdir));
     ArrayList<String> dirs = new ArrayList<String>();
     try {
       for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
         String path = workdir + "/" + entry.getKey();
-        Writer map = new MapFile.Writer(conf, local, path + "/log1", LogFileKey.class, LogFileValue.class);
+        org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
+        Writer map = new MapFile.Writer(ns.getConf(), ns, path + "/log1", LogFileKey.class,
LogFileValue.class);
         for (KeyValue lfe : entry.getValue()) {
           map.append(lfe.key, lfe.value);
         }
         map.close();
-        local.create(new Path(path, "finished")).close();
+        ns.create(new Path(path, "finished")).close();
         dirs.add(path);
       }
       // Recover
-      SortedLogRecovery recovery = new SortedLogRecovery();
+      SortedLogRecovery recovery = new SortedLogRecovery(fs);
       CaptureMutations capture = new CaptureMutations();
       recovery.recover(extent, dirs, files, capture);
       return capture.result;
     } finally {
-      local.delete(new Path(workdir), true);
+      root.delete();
     }
   }
   

Modified: accumulo/branches/ACCUMULO-118/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java
(original)
+++ accumulo/branches/ACCUMULO-118/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java
Fri May 24 19:59:11 2013
@@ -161,7 +161,7 @@ public class ShellServerTest {
   public static void tearDownAfterClass() throws Exception {
     cluster.stop();
     traceProcess.destroy();
-    folder.delete();
+//    folder.delete();
   }
 
   @Test(timeout = 30000)
@@ -172,10 +172,10 @@ public class ShellServerTest {
     exec("addsplits row5", true);
     exec("config -t t -s table.split.threshold=345M", true);
     exec("offline t", true);
-    String export = folder.newFolder().toString();
+    String export = "file://" + folder.newFolder().toString();
     exec("exporttable -t t " + export, true);
     DistCp cp = newDistCp();
-    String import_ = folder.newFolder().toString();
+    String import_ = "file://" +folder.newFolder().toString();
     cp.run(new String[] {"-f", export + "/distcp.txt", import_});
     exec("importtable t2 " + import_, true);
     exec("config -t t2 -np", true, "345M", true);



Mime
View raw message