accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1370925 [2/2] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ core/src/main/java/org/apache/accumu...
Date Wed, 08 Aug 2012 20:02:59 GMT
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1370925&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
(added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
Wed Aug  8 20:02:58 2012
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.tableOps;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.admin.TableOperationsImpl;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.master.state.tables.TableManager;
+import org.apache.accumulo.server.security.Authenticator;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.security.ZKAuthenticator;
+import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
+import org.apache.accumulo.server.test.FastFormat;
+import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.accumulo.server.util.TablePropUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * 
+ */
+class ImportedTableInfo implements Serializable {
+  
+  private static final long serialVersionUID = 1L;
+
+  public String exportDir;
+  public String user;
+  public String tableName;
+  public String tableId;
+  public String importDir;
+}
+
+class FinishImportTable extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private ImportedTableInfo tableInfo;
+  
+  public FinishImportTable(ImportedTableInfo ti) {
+    this.tableInfo = ti;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return 0;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master env) throws Exception {
+    
+    env.getFileSystem().delete(new Path(tableInfo.importDir, "mappings.txt"), true);
+
+    TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
+    
+    Utils.unreserveTable(tableInfo.tableId, tid, true);
+    
+    Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+
+    env.getEventCoordinator().event("Imported table %s ", tableInfo.tableName);
+    
+    Logger.getLogger(FinishImportTable.class).debug("Imported table " + tableInfo.tableId
+ " " + tableInfo.tableName);
+    
+    return null;
+  }
+  
+  @Override
+  public String getReturn() {
+    return tableInfo.tableId;
+  }
+  
+  @Override
+  public void undo(long tid, Master env) throws Exception {}
+  
+}
+
+class MoveExportedFiles extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private ImportedTableInfo tableInfo;
+  
+  MoveExportedFiles(ImportedTableInfo ti) {
+    this.tableInfo = ti;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    try {
+      FileSystem fs = environment.getFileSystem();
+      
+      Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs,
tableInfo);
+      
+      for (String oldFileName : fileNameMappings.keySet()) {
+        if (!fs.exists(new Path(tableInfo.exportDir, oldFileName))) {
+          throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName,
TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+              "File referenced by exported table does not exists " + oldFileName);
+        }
+      }
+      
+      FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
+      
+      for (FileStatus fileStatus : files) {
+        String newName = fileNameMappings.get(fileStatus.getPath().getName());
+        
+        if (newName != null)
+          fs.rename(fileStatus.getPath(), new Path(tableInfo.importDir, newName));
+      }
+      
+      return new FinishImportTable(tableInfo);
+    } catch (IOException ioe) {
+      log.warn(ioe.getMessage(), ioe);
+      throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER,
+          "Error renaming files " + ioe.getMessage());
+    }
+  }
+}
+
+class PopulateMetadataTable extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private ImportedTableInfo tableInfo;
+  
+  PopulateMetadataTable(ImportedTableInfo ti) {
+    this.tableInfo = ti;
+  }
+  
+  static Map<String,String> readMappingFile(FileSystem fs, ImportedTableInfo tableInfo)
throws Exception {
+    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir,
"mappings.txt"))));
+    
+    try {
+      Map<String,String> map = new HashMap<String,String>();
+
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        String sa[] = line.split(":", 2);
+        map.put(sa[0], sa[1]);
+      }
+      
+      return map;
+    } finally {
+      in.close();
+    }
+
+  }
+
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    
+    Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
+    
+    BatchWriter mbw = null;
+    ZipInputStream zis = null;
+
+    try {
+      FileSystem fs = environment.getFileSystem();
+      
+      mbw = HdfsZooInstance.getInstance().getConnector(SecurityConstants.getSystemCredentials())
+          .createBatchWriter(Constants.METADATA_TABLE_NAME, 1000000, 60000l, 2);
+
+      zis = new ZipInputStream(fs.open(path));
+      
+      Map<String,String> fileNameMappings = readMappingFile(fs, tableInfo);
+      
+      String bulkDir = new Path(tableInfo.importDir).getName();
+      
+      ZipEntry zipEntry;
+      while ((zipEntry = zis.getNextEntry()) != null) {
+        if (zipEntry.getName().equals(Constants.EXPORT_METADATA_FILE)) {
+          DataInputStream in = new DataInputStream(new BufferedInputStream(zis));
+          
+          Key key = new Key();
+          Value val = new Value();
+          
+          Mutation m = null;
+          Text currentRow = null;
+          int dirCount = 0;
+          
+          while (true) {
+            key.readFields(in);
+            val.readFields(in);
+            
+            Text endRow = new KeyExtent(key.getRow(), (Text) null).getEndRow();
+            Text metadataRow = new KeyExtent(new Text(tableInfo.tableId), endRow, null).getMetadataEntry();
+            
+            Text cq;
+            
+            if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY))
{
+              String oldName = new Path(key.getColumnQualifier().toString()).getName();
+              String newName = fileNameMappings.get(oldName);
+              
+              cq = new Text("/" + bulkDir + "/" + newName);
+            } else {
+              cq = key.getColumnQualifier();
+            }
+            
+            if (m == null) {
+              m = new Mutation(metadataRow);
+              Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value(FastFormat.toZeroPaddedString(dirCount++,
8, 16, "/c-".getBytes())));
+              currentRow = metadataRow;
+            }
+            
+            if (!currentRow.equals(metadataRow)) {
+              mbw.addMutation(m);
+              m = new Mutation(metadataRow);
+              Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value(FastFormat.toZeroPaddedString(dirCount++,
8, 16, "/c-".getBytes())));
+            }
+            
+            m.put(key.getColumnFamily(), cq, val);
+            
+            if (endRow == null && Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key))
{
+              mbw.addMutation(m);
+              break; // its the last column in the last row
+            }
+          }
+          
+          break;
+        }
+      }
+      
+      return new MoveExportedFiles(tableInfo);
+    } catch (IOException ioe) {
+      log.warn(ioe.getMessage(), ioe);
+      throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER,
+          "Error reading " + path + " " + ioe.getMessage());
+    } finally {
+      if (zis != null) {
+        try {
+          zis.close();
+        } catch (IOException ioe) {
+          log.warn("Failed to close zip file ", ioe);
+        }
+      }
+      
+      if (mbw != null) {
+        mbw.close();
+      }
+    }
+  }
+  
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    MetadataTable.deleteTable(tableInfo.tableId, false, SecurityConstants.getSystemCredentials(),
environment.getMasterLock());
+  }
+}
+
+class MapImportFileNames extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private ImportedTableInfo tableInfo;
+  
+  MapImportFileNames(ImportedTableInfo ti) {
+    this.tableInfo = ti;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    
+    Path path = new Path(tableInfo.importDir, "mappings.txt");
+
+    BufferedWriter mappingsWriter = null;
+
+    try {
+      FileSystem fs = environment.getFileSystem();
+      
+      fs.mkdirs(new Path(tableInfo.importDir));
+      
+      FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
+      
+      UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+      
+      mappingsWriter = new BufferedWriter(new OutputStreamWriter(fs.create(path)));
+      
+      for (FileStatus fileStatus : files) {
+        String fileName = fileStatus.getPath().getName();
+        
+        String sa[] = fileName.split("\\.");
+        String extension = "";
+        if (sa.length > 1) {
+          extension = sa[sa.length - 1];
+          
+          if (!FileOperations.getValidExtensions().contains(extension)) {
+            continue;
+          }
+        } else {
+          // assume it is a map file
+          extension = Constants.MAPFILE_EXTENSION;
+        }
+        
+        String newName = "I" + namer.getNextName() + "." + extension;
+        
+        mappingsWriter.append(fileName);
+        mappingsWriter.append(':');
+        mappingsWriter.append(newName);
+        mappingsWriter.newLine();
+      }
+      
+      mappingsWriter.close();
+      mappingsWriter = null;
+      
+      return new PopulateMetadataTable(tableInfo);
+    } catch (IOException ioe) {
+      log.warn(ioe.getMessage(), ioe);
+      throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER,
+          "Error writing mapping file " + path + " " + ioe.getMessage());
+    } finally {
+      if (mappingsWriter != null)
+        try{
+          mappingsWriter.close();
+        }catch(IOException ioe){
+          log.warn("Failed to close "+path, ioe);
+        }
+    }
+  }
+  
+  @Override
+  public void undo(long tid, Master env) throws Exception {
+    env.getFileSystem().delete(new Path(tableInfo.importDir), true);
+  }
+}
+
+class CreateImportDir extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private ImportedTableInfo tableInfo;
+  
+  CreateImportDir(ImportedTableInfo ti) {
+    this.tableInfo = ti;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+
+    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    
+    Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableInfo.tableId);
+    
+    Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
+
+    tableInfo.importDir = newBulkDir.toString();
+    
+    return new MapImportFileNames(tableInfo);
+  }
+}
+
+class ImportPopulateZookeeper extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private ImportedTableInfo tableInfo;
+  
+  ImportPopulateZookeeper(ImportedTableInfo ti) {
+    this.tableInfo = ti;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.IMPORT);
+  }
+  
+  private Map<String,String> getExportedProps(FileSystem fs) throws Exception {
+    
+    Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
+    
+    try {
+      return TableOperationsImpl.getExportedProps(fs, path);
+    } catch (IOException ioe) {
+      throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER,
+          "Error reading table props from " + path + " " + ioe.getMessage());
+    }
+  }
+
+  @Override
+  public Repo<Master> call(long tid, Master env) throws Exception {
+    // reserve the table name in zookeeper or fail
+    
+    Utils.tableNameLock.lock();
+    try {
+      // write tableName & tableId to zookeeper
+      Instance instance = HdfsZooInstance.getInstance();
+      
+      Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE);
+      
+      TableManager.getInstance().addTable(tableInfo.tableId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE);
+      
+      Tables.clearCache(instance);
+    } finally {
+      Utils.tableNameLock.unlock();
+    }
+    
+    for (Entry<String,String> entry : getExportedProps(env.getFileSystem()).entrySet())
+      if (!TablePropUtil.setTableProperty(tableInfo.tableId, entry.getKey(), entry.getValue()))
{
+        throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER,
+            "Invalid table property " + entry.getKey());
+      }
+    
+    return new CreateImportDir(tableInfo);
+  }
+  
+  @Override
+  public void undo(long tid, Master env) throws Exception {
+    Instance instance = HdfsZooInstance.getInstance();
+    TableManager.getInstance().removeTable(tableInfo.tableId);
+    Utils.unreserveTable(tableInfo.tableId, tid, true);
+    Tables.clearCache(instance);
+  }
+}
+
+class ImportSetupPermissions extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+  
+  private ImportedTableInfo tableInfo;
+  
+  public ImportSetupPermissions(ImportedTableInfo ti) {
+    this.tableInfo = ti;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return 0;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master env) throws Exception {
+    // give all table permissions to the creator
+    Authenticator authenticator = ZKAuthenticator.getInstance();
+    for (TablePermission permission : TablePermission.values()) {
+      try {
+        authenticator.grantTablePermission(SecurityConstants.getSystemCredentials(), tableInfo.user,
tableInfo.tableId, permission);
+      } catch (AccumuloSecurityException e) {
+        Logger.getLogger(ImportSetupPermissions.class).error(e.getMessage(), e);
+        throw e.asThriftException();
+      }
+    }
+    
+    // setup permissions in zookeeper before table info in zookeeper
+    // this way concurrent users will not get a spurious permission denied
+    // error
+    return new ImportPopulateZookeeper(tableInfo);
+  }
+  
+  @Override
+  public void undo(long tid, Master env) throws Exception {
+    ZKAuthenticator.getInstance().deleteTable(SecurityConstants.getSystemCredentials(), tableInfo.tableId);
+  }
+}
+
+public class ImportTable extends MasterRepo {
+  private static final long serialVersionUID = 1L;
+  
+  private ImportedTableInfo tableInfo;
+  
+  public ImportTable(String user, String tableName, String exportDir) {
+    tableInfo = new ImportedTableInfo();
+    tableInfo.tableName = tableName;
+    tableInfo.user = user;
+    tableInfo.exportDir = exportDir;
+  }
+  
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return Utils.reserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master env) throws Exception {
+    checkVersions(env);
+
+    // first step is to reserve a table id.. if the machine fails during this step
+    // it is ok to retry... the only side effect is that a table id may not be used
+    // or skipped
+    
+    // assuming only the master process is creating tables
+    
+    Utils.idLock.lock();
+    try {
+      Instance instance = HdfsZooInstance.getInstance();
+      tableInfo.tableId = Utils.getNextTableId(tableInfo.tableName, instance);
+      return new ImportSetupPermissions(tableInfo);
+    } finally {
+      Utils.idLock.unlock();
+    }
+  }
+
+  public void checkVersions(Master env) throws ThriftTableOperationException {
+    Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
+    
+    ZipInputStream zis = null;
+
+    try {
+      zis = new ZipInputStream(env.getFileSystem().open(path));
+      
+      Integer exportVersion = null;
+      Integer dataVersion = null;
+
+      ZipEntry zipEntry;
+      while ((zipEntry = zis.getNextEntry()) != null) {
+        if (zipEntry.getName().equals(Constants.EXPORT_INFO_FILE)) {
+          BufferedReader in = new BufferedReader(new InputStreamReader(zis));
+          String line = null;
+          while ((line = in.readLine()) != null) {
+            String sa[] = line.split(":", 2);
+            if (sa[0].equals(ExportTable.EXPORT_VERSION_PROP)) {
+              exportVersion = Integer.parseInt(sa[1]);
+            } else if (sa[0].equals(ExportTable.DATA_VERSION_PROP)) {
+              dataVersion = Integer.parseInt(sa[1]);
+            }
+          }
+          
+          break;
+        }
+      }
+      
+      zis.close();
+      zis = null;
+      
+      if (exportVersion == null || exportVersion > ExportTable.VERSION)
+        throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER,
+            "Incompatible export version " + exportVersion);
+      
+      if (dataVersion == null || dataVersion > Constants.DATA_VERSION)
+        throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER,
+            "Incompatible data version " + exportVersion);
+
+    } catch (IOException ioe) {
+      log.warn(ioe.getMessage(), ioe);
+      throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER,
+          "Failed to read export metadata " + ioe.getMessage());
+    } finally {
+      if (zis != null)
+        try {
+          zis.close();
+        } catch (IOException ioe) {
+          log.warn(ioe.getMessage(), ioe);
+        }
+    }
+  }
+  
+  @Override
+  public void undo(long tid, Master env) throws Exception {
+    Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid);
+  }
+}

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ExportIndex.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ExportIndex.java?rev=1370925&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ExportIndex.java
(added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/ExportIndex.java
Wed Aug  8 20:02:58 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.test.randomwalk.shard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.test.randomwalk.State;
+import org.apache.accumulo.server.test.randomwalk.Test;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+/**
+ * 
+ */
+public class ExportIndex extends Test {
+  
+  @Override
+  public void visit(State state, Properties props) throws Exception {
+    
+    String indexTableName = (String) state.get("indexTableName");
+    String tmpIndexTableName = indexTableName + "_tmp";
+    
+    String exportDir = "/tmp/shard_export/" + indexTableName;
+    String copyDir = "/tmp/shard_export/" + tmpIndexTableName;
+    
+    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+    
+    fs.delete(new Path("/tmp/shard_export/" + indexTableName), true);
+    fs.delete(new Path("/tmp/shard_export/" + tmpIndexTableName), true);
+    
+    // disable spits, so that splits can be compared later w/o worrying one table splitting
and the other not
+    state.getConnector().tableOperations().setProperty(indexTableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
"20G");
+
+    long t1 = System.currentTimeMillis();
+    
+    state.getConnector().tableOperations().flush(indexTableName, null, null, true);
+    state.getConnector().tableOperations().offline(indexTableName);
+    
+    long t2 = System.currentTimeMillis();
+
+    state.getConnector().tableOperations().exportTable(indexTableName, exportDir);
+    
+    long t3 = System.currentTimeMillis();
+
+    // copy files
+    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(exportDir,
"distcp.txt"))));
+    String file = null;
+    while ((file = reader.readLine()) != null) {
+      Path src = new Path(file);
+      Path dest = new Path(new Path(copyDir), src.getName());
+      FileUtil.copy(fs, src, fs, dest, false, true, CachedConfiguration.getInstance());
+    }
+
+    reader.close();
+    
+    long t4 = System.currentTimeMillis();
+    
+    state.getConnector().tableOperations().online(indexTableName);
+    state.getConnector().tableOperations().importTable(tmpIndexTableName, copyDir);
+    
+    long t5 = System.currentTimeMillis();
+
+    fs.delete(new Path(exportDir), true);
+    fs.delete(new Path(copyDir), true);
+    
+    HashSet<Text> splits1 = new HashSet<Text>(state.getConnector().tableOperations().getSplits(indexTableName));
+    HashSet<Text> splits2 = new HashSet<Text>(state.getConnector().tableOperations().getSplits(tmpIndexTableName));
+    
+    if (!splits1.equals(splits2))
+      throw new Exception("Splits not equals " + indexTableName + " " + tmpIndexTableName);
+    
+    HashMap<String,String> props1 = new HashMap<String,String>();
+    for (Entry<String,String> entry : state.getConnector().tableOperations().getProperties(indexTableName))
+      props1.put(entry.getKey(), entry.getValue());
+    
+    HashMap<String,String> props2 = new HashMap<String,String>();
+    for (Entry<String,String> entry : state.getConnector().tableOperations().getProperties(tmpIndexTableName))
+      props2.put(entry.getKey(), entry.getValue());
+    
+    if (!props1.equals(props2))
+      throw new Exception("Props not equals " + indexTableName + " " + tmpIndexTableName);
+
+    // unset the split threshold
+    state.getConnector().tableOperations().removeProperty(indexTableName, Property.TABLE_SPLIT_THRESHOLD.getKey());
+    state.getConnector().tableOperations().removeProperty(tmpIndexTableName, Property.TABLE_SPLIT_THRESHOLD.getKey());
+
+    log.debug("Imported " + tmpIndexTableName + " from " + indexTableName + " flush: " +
(t2 - t1) + "ms export: " + (t3 - t2) + "ms copy:" + (t4 - t3)
+        + "ms import:" + (t5 - t4) + "ms");
+    
+  }
+  
+}

Modified: accumulo/trunk/test/system/randomwalk/conf/modules/Shard.xml
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/randomwalk/conf/modules/Shard.xml?rev=1370925&r1=1370924&r2=1370925&view=diff
==============================================================================
--- accumulo/trunk/test/system/randomwalk/conf/modules/Shard.xml (original)
+++ accumulo/trunk/test/system/randomwalk/conf/modules/Shard.xml Wed Aug  8 20:02:58 2012
@@ -45,6 +45,10 @@
   <edge id="Verify" weight="1"/>
 </node>
 
+<node id="shard.ExportIndex">
+  <edge id="Verify" weight="1"/>
+</node>
+
 <node id="Verify" src="shard.VerifyIndex">
   <edge id="dummy.ToAll" weight="1"/>
 </node>
@@ -103,6 +107,7 @@
   <edge id="shard.Reindex" weight="3"/>
   <edge id="shard.Flush" weight="3"/>
   <edge id="shard.CloneIndex" weight="3"/>
+  <edge id="shard.ExportIndex" weight="3"/>
   <edge id="shard.Grep" weight="20"/>
   <edge id="shard.Split" weight="40"/>
   <edge id="shard.Merge" weight="20"/>
@@ -111,7 +116,7 @@
   <edge id="shard.DeleteSomeDocs" weight="20"/>
   <edge id="shard.BulkInsert" weight="3"/>
   <edge id="shard.Delete" weight="486"/>
-  <edge id="shard.Insert" weight="4693"/>
+  <edge id="shard.Insert" weight="4690"/>
   <edge id="shard.Search" weight="4691"/>
 </node>
 



Mime
View raw message