incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [5/6] git commit: Made some changes to the getSplits method. Use a BlurClient instead of the BlurControllerServer. Also added some stubs to get the connection string, cluster, and table.
Date Fri, 09 Nov 2012 03:17:41 GMT
Made some changes to the getSplits method.  Use a BlurClient instead of the BlurControllerServer.
 Also added some stubs to get the connection string, cluster, and table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b0893e13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b0893e13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b0893e13

Branch: refs/heads/0.2-dev
Commit: b0893e13f4cab8812de49e2c217bdb4362b8f513
Parents: bed1530
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Nov 3 22:14:32 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Nov 3 22:14:32 2012 -0400

----------------------------------------------------------------------
 .../java/org/apache/blur/mr/BlurInputFormat.java   |   58 ++++++++-------
 1 files changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0893e13/src/blur-mapred/src/main/java/org/apache/blur/mr/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mr/BlurInputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mr/BlurInputFormat.java
index 7dc0190..bab693c 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mr/BlurInputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mr/BlurInputFormat.java
@@ -26,7 +26,9 @@ import java.util.Map;
 import org.apache.blur.mapreduce.BlurMutate;
 import org.apache.blur.mapreduce.BlurRecord;
 import org.apache.blur.mapreduce.BlurReducer;
+import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.BlurControllerServer;
+import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.QuerySession;
 import org.apache.hadoop.conf.Configuration;
@@ -40,32 +42,35 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 
-
-public class BlurInputFormat extends InputFormat<Text,BlurRecord>{
-  private static Map<String,QuerySession> querySessionStore = new HashMap<String,QuerySession>();
+public class BlurInputFormat extends InputFormat<Text, BlurRecord> {
+  
+  private static Map<String, QuerySession> querySessionStore = new HashMap<String, QuerySession>();
 
   @Override
-  public RecordReader<Text, BlurRecord> createRecordReader(InputSplit arg0,
-      TaskAttemptContext arg1) throws IOException, InterruptedException {
+  public RecordReader<Text, BlurRecord> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException,
-      InterruptedException {
+  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+    
+    String connectionStr = configuration.get("blur.controller.connections");
+    String cluster = configuration.get("blur.cluster.name");
+    String table = configuration.get("blur.table.name");
+    
     QuerySession querySession = getReadQuerySession(context);
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    BlurControllerServer controllerServer = new BlurControllerServer();
-    // TODO Replace with API call to get Cluster name or implement in another way
-    String cluster = "test";
+    Iface client = BlurClient.getClient(connectionStr);
+    
     try {
-      List<String> shardServerNames = controllerServer.shardServerList(cluster);
-      if (shardServerNames == null || shardServerNames.isEmpty()){
+      List<String> shardServerNames = client.shardServerList(cluster);
+      if (shardServerNames == null || shardServerNames.isEmpty()) {
         throw new RuntimeException("Query not executing against any server");
       }
-      for (String shardServerName : shardServerNames){
-        splits.add(new BlurInputSplit(shardServerName,new BlurQuerySession(querySession),0,Integer.MAX_VALUE));
+      for (String shardServerName : shardServerNames) {
+        splits.add(new BlurInputSplit(shardServerName, new BlurQuerySession(querySession), 0, Integer.MAX_VALUE));
       }
       return splits;
     } catch (BlurException e) {
@@ -74,37 +79,34 @@ public class BlurInputFormat extends InputFormat<Text,BlurRecord>{
       throw new RuntimeException(e.getMessage());
     }
   }
-  
-  BlurControllerServer getBlurServerInstance() {
-    return new BlurControllerServer();
-  }
 
-  QuerySession getReadQuerySession(JobContext context){
+  QuerySession getReadQuerySession(JobContext context) {
     String queryId = context.getConfiguration().get("blur.query.id");
     QuerySession querySession = querySessionStore.get(queryId);
-    if(querySession == null){
+    if (querySession == null) {
       throw new RuntimeException("Query Object is Invalid or Null");
     }
     return querySession;
   }
-  public static Job configureJob(Configuration conf, QuerySession querySession) throws IOException{
-    if(querySession == null){
+
+  public static Job configureJob(Configuration conf, QuerySession querySession) throws IOException {
+    if (querySession == null) {
       throw new IllegalArgumentException("Invalid read Session");
     }
     querySessionStore.put(querySession.getQueryId(), querySession);
-    conf.set("blur.query.id",querySession.getQueryId());
-    Job job = new Job(conf, "Blur Index Reader For query id : "+querySession.getQueryId());
+    conf.set("blur.query.id", querySession.getQueryId());
+    Job job = new Job(conf, "Blur Index Reader For query id : " + querySession.getQueryId());
     job.setReducerClass(BlurReducer.class);
     job.setOutputKeyClass(BytesWritable.class);
     job.setOutputValueClass(BlurMutate.class);
     return job;
   }
-  
-  public QuerySession getQuerySessionById(String querySessionId){
+
+  public QuerySession getQuerySessionById(String querySessionId) {
     return querySessionStore.get(querySessionId);
   }
-  
-  public QuerySession removeQuerySessionFromSessionStoreById(String querySessionId){
+
+  public QuerySession removeQuerySessionFromSessionStoreById(String querySessionId) {
     return querySessionStore.remove(querySessionId);
   }
 }


Mime
View raw message