pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Apache Wiki <wikidi...@apache.org>
Subject [Pig Wiki] Update of "PigAbstractionFrontEnd" by AntonioMagnaghi
Date Tue, 20 Nov 2007 23:04:55 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by AntonioMagnaghi:
http://wiki.apache.org/pig/PigAbstractionFrontEnd

New page:
= Data-Storage Based Pig Front-End =

This is a sample code fragment where `PigContext.java` has been adapted to use the Data Storage
API defined above.

{{{
-
-    //  configuration for connecting to hadoop
-    transient private JobConf conf = null;        
     
@@ -79,16 +81,21 @@
+    // configuration information for file system(s)
+    transient private DataStorageProperties fileSystemConf;
+    
     //main file system that jobs and shell commands access
-    transient private FileSystem dfs;                         
+    transient private DataStorage dfs;     
}}}

{{{
@@ -195,21 +199,32 @@
                                }
                        }
                
-                    
-                   lfs = FileSystem.getNamed("local", conf);
-              
-                   mLogger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
-                   dfs = FileSystem.get(conf);
+                   HadoopDataStorageConfiguration conf = 
                           new HadoopDataStorageConfiguration();             
+                   fileSystemConf = conf;
+                       
+                   lfs = new HadoopFileSystem(new URI(...), conf);
+                   
+                   mLogger.info("Connecting to hadoop file system at: " + 
                                 fileSystemConf.getValue("fs.default.name"));
+                   
+                   dfs = new HadoopFileSystem (new URI(...), conf);
+                       
+                   mLogger.info("Connecting to map-reduce job tracker at: " + 
                                  conf.getValue("mapred.job.tracker"));
+
-                   mLogger.info("Connecting to map-reduce job tracker at: " + 
                                  conf.get("mapred.job.tracker"));
-                   jobTracker = (JobSubmissionProtocol) 
                                  RPC.getProxy(JobSubmissionProtocol.class,
-                                              JobSubmissionProtocol.versionID, 
                                               JobTracker.getAddress(conf), 
                                               conf);
-                   jobClient = new JobClient(conf);
+                   HadoopExecutionEngineConfiguration execConf = 
                             new HadoopExecutionEngineConfiguration();
+                   backEndConf = execConf;
+                   jobClient = new HadoopExecutionEngine(execConf);
                
           }else{
-               conf = new JobConf();
-               lfs = FileSystem.getNamed("local", conf);
-               dfs = lfs;  // for local execution, the "dfs" is the local file system
+               HadoopDataStorageConfiguration conf = new HadoopDataStorageConfiguration();
            
+               fileSystemConf = conf;
+               
+               lfs = new HadoopFileSystem(new URI(...), 
                                           new HadoopDataStorageConfiguration(conf));
+               
+               dfs = lfs;  // for local execution, the "dfs" is the local file system
           }
        }catch (IOException e){
}}}

These are sample code fragments from `PigServer.java`. Operations that previously utilized
the Hadoop file system directly, now have been adapted to use the Data Storage API defined
above.

{{{
@@ -485,37 +565,99 @@
      * @return
      * @throws IOException
      */
-    public long fileSize(String filename) throws IOException {
-        FileSystem dfs = pigContext.getDfs();
-        Path p = new Path(filename);
-        long len = dfs.getLength(p);
-        long replication = dfs.getDefaultReplication(); 
-        return len * replication;
+    public long fileSize(String name) throws IOException {
+       try {
+               DataStorage dfs = pigContext.getDfs();
+               DataStorageElementDescriptor elem = dfs.asElement(name);
+               DataStorageProperties elemStats = elem.getStatistics();
+               
+               long len = new Long(elemStats.
                                    getValue("length.bytes").toString());
+               long replication = new Long(elemStats.
                                       getValue("replication").toString());
+               
+               return len * replication;
+       }
+       catch (DataStorageException e)
+       {
+               return 0;
+       }
     }
}}}

{{{
-    public boolean deleteFile(String filename) throws IOException {
-        return pigContext.getDfs().delete(new Path(filename));
+    public boolean deleteFile(String name) throws IOException {
+       try {
+               DataStorage ds = pigContext.getDfs();
+               DataStorageElementDescriptor elem = ds.asElement(name);
+               
+               return elem.delete();
+           }
+       catch (DataStorageException e)
+       {
+               throw new IOException(e);
+       }
     }
}}}

Mime
View raw message