mrql-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fega...@apache.org
Subject git commit: MRQL-51: Support for Spark 1.1.0
Date Tue, 23 Sep 2014 20:26:05 GMT
Repository: incubator-mrql
Updated Branches:
  refs/heads/master 08845d6cf -> 6c6790ac0


MRQL-51: Support for Spark 1.1.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/6c6790ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/6c6790ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/6c6790ac

Branch: refs/heads/master
Commit: 6c6790ac0353d044154db5ab028a218989a6a3cb
Parents: 08845d6
Author: fegaras <fegaras@cse.uta.edu>
Authored: Tue Sep 23 15:25:29 2014 -0500
Committer: fegaras <fegaras@cse.uta.edu>
Committed: Tue Sep 23 15:25:29 2014 -0500

----------------------------------------------------------------------
 bin/mrql                                        | 13 ++--
 bin/mrql.bsp                                    | 13 ++--
 bin/mrql.flink                                  | 13 ++--
 bin/mrql.spark                                  | 28 ++-----
 bsp/src/main/java/org/apache/mrql/BSPPlan.java  |  7 +-
 conf/mrql-env.sh                                | 17 +++--
 .../main/java/org/apache/mrql/DataSource.java   |  4 +-
 .../java/org/apache/mrql/MapReduceAlgebra.java  |  6 +-
 .../test/java/org/apache/mrql/QueryTest.java    |  5 +-
 .../java/org/apache/mrql/FlinkEvaluator.gen     | 15 ++--
 .../java/org/apache/mrql/MapReducePlan.java     |  6 +-
 pom.xml                                         |  2 +-
 queries/dblp-pagerank.mrql                      |  5 +-
 .../java/org/apache/mrql/SparkEvaluator.gen     | 79 ++++++++++++++------
 14 files changed, 127 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/bin/mrql
----------------------------------------------------------------------
diff --git a/bin/mrql b/bin/mrql
index 319eb41..17c5d27 100755
--- a/bin/mrql
+++ b/bin/mrql
@@ -31,21 +31,22 @@ MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
 GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
 CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
 MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-mr-*.jar`
-FULL_JAR="$MRQL_HOME/lib/mrql-all-mr.jar"
+FULL_JAR="/tmp/${USER}_mrql_mr.jar"
+CLASS_DIR="/tmp/${USER}_mrql_classes"
 
 export JAVA_HOME MAPRED_JOB_TRACKER FS_DEFAULT_NAME
 
-if (! [ -a $FULL_JAR ]); then
-   rm -rf "$MRQL_HOME/tmp/classes"
-   mkdir -p "$MRQL_HOME/tmp/classes"
-   pushd $MRQL_HOME/tmp/classes > /dev/null
+if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
+   rm -rf $CLASS_DIR
+   mkdir -p $CLASS_DIR
+   pushd $CLASS_DIR > /dev/null
    $JAVA_HOME/bin/jar xf $CUP_JAR
    $JAVA_HOME/bin/jar xf $JLINE_JAR
    $JAVA_HOME/bin/jar xf $GEN_JAR
    $JAVA_HOME/bin/jar xf $CORE_JAR
    $JAVA_HOME/bin/jar xf $MRQL_JAR
    cd ..
-   $JAVA_HOME/bin/jar cf $FULL_JAR -C classes/ .
+   $JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
    popd > /dev/null
 fi
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/bin/mrql.bsp
----------------------------------------------------------------------
diff --git a/bin/mrql.bsp b/bin/mrql.bsp
index 20a65ac..bcc23d8 100755
--- a/bin/mrql.bsp
+++ b/bin/mrql.bsp
@@ -31,21 +31,22 @@ MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
 GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
 CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
 MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-bsp-*.jar`
-FULL_JAR="$MRQL_HOME/lib/mrql-all-bsp.jar"
+FULL_JAR="/tmp/${USER}_mrql_bsp.jar"
+CLASS_DIR="/tmp/${USER}_mrql_classes"
 
 export JAVA_HOME FS_DEFAULT_NAME BSP_MASTER_ADDRESS HAMA_ZOOKEEPER_QUORUM
 
-if (! [ -a $FULL_JAR ]); then
-   rm -rf "$MRQL_HOME/tmp/classes"
-   mkdir -p "$MRQL_HOME/tmp/classes"
-   pushd $MRQL_HOME/tmp/classes > /dev/null
+if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
+   rm -rf $CLASS_DIR
+   mkdir -p $CLASS_DIR
+   pushd $CLASS_DIR > /dev/null
    $JAVA_HOME/bin/jar xf $CUP_JAR
    $JAVA_HOME/bin/jar xf $JLINE_JAR
    $JAVA_HOME/bin/jar xf $GEN_JAR
    $JAVA_HOME/bin/jar xf $CORE_JAR
    $JAVA_HOME/bin/jar xf $MRQL_JAR
    cd ..
-   $JAVA_HOME/bin/jar cf $FULL_JAR -C classes/ .
+   $JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
    popd > /dev/null
 fi
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/bin/mrql.flink
----------------------------------------------------------------------
diff --git a/bin/mrql.flink b/bin/mrql.flink
index a14cf89..cd3eab4 100755
--- a/bin/mrql.flink
+++ b/bin/mrql.flink
@@ -31,21 +31,22 @@ MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
 GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
 CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
 MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-flink-*.jar`
-FULL_JAR="$MRQL_HOME/lib/mrql-all-flink.jar"
+FULL_JAR="/tmp/${USER}_mrql_flink.jar"
+CLASS_DIR="/tmp/${USER}_mrql_classes"
 
 export FLINK_HOME FLINK_JARS FLINK_MASTER
 
-if (! [ -a $FULL_JAR ]); then
-   rm -rf "$MRQL_HOME/tmp/classes"
-   mkdir -p "$MRQL_HOME/tmp/classes"
-   pushd $MRQL_HOME/tmp/classes > /dev/null
+if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
+   rm -rf $CLASS_DIR
+   mkdir -p $CLASS_DIR
+   pushd $CLASS_DIR > /dev/null
    $JAVA_HOME/bin/jar xf $CUP_JAR
    $JAVA_HOME/bin/jar xf $JLINE_JAR
    $JAVA_HOME/bin/jar xf $GEN_JAR
    $JAVA_HOME/bin/jar xf $CORE_JAR
    $JAVA_HOME/bin/jar xf $MRQL_JAR
    cd ..
-   $JAVA_HOME/bin/jar cf $FULL_JAR -C classes/ .
+   $JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
    popd > /dev/null
 fi
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/bin/mrql.spark
----------------------------------------------------------------------
diff --git a/bin/mrql.spark b/bin/mrql.spark
index 60151f0..08e93b4 100755
--- a/bin/mrql.spark
+++ b/bin/mrql.spark
@@ -28,32 +28,20 @@ MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
 
 . "$MRQL_HOME/conf/mrql-env.sh"
 
-if [[ !(-f ${SPARK_JAR}) ]]; then
+if [[ !(-f ${SPARK_JARS}) ]]; then
    echo "*** Cannot find the Spark jar file. Need to edit mrql-env.sh"; exit -1
 fi
 
 GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
 CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
 MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-spark-*.jar`
-FULL_JAR="$MRQL_HOME/lib/mrql-all-spark.jar"
 
-export SPARK_HOME SPARK_JAR FS_DEFAULT_NAME SPARK_MASTER SPARK_WORKER_INSTANCES SPARK_WORKER_CORES SPARK_WORKER_MEMORY SPARK_MASTER_MEMORY
-export SPARK_YARN_APP_JAR=$FULL_JAR
+export SPARK_HOME FS_DEFAULT_NAME SPARK_MASTER SPARK_WORKER_INSTANCES SPARK_WORKER_CORES SPARK_WORKER_MEMORY SPARK_MASTER_MEMORY
 
-if (! [ -a $FULL_JAR ]); then
-   rm -rf "$MRQL_HOME/tmp/classes"
-   mkdir -p "$MRQL_HOME/tmp/classes"
-   pushd $MRQL_HOME/tmp/classes > /dev/null
-   $JAVA_HOME/bin/jar xf $CUP_JAR
-   $JAVA_HOME/bin/jar xf $JLINE_JAR
-   $JAVA_HOME/bin/jar xf $GEN_JAR
-   $JAVA_HOME/bin/jar xf $CORE_JAR
-   $JAVA_HOME/bin/jar xf $MRQL_JAR
-   cd ..
-   $JAVA_HOME/bin/jar cf $FULL_JAR -C classes/ .
-   popd > /dev/null
-fi
-
-SPARK_CLASSPATH="$CUP_JAR:$JLINE_JAR:$GEN_JAR:$CORE_JAR:$MRQL_JAR:$SPARK_JAR:$HADOOP_JARS"
+SPARK_CLASSPATH="$CUP_JAR:$JLINE_JAR:$GEN_JAR:$CORE_JAR:$MRQL_JAR:$SPARK_JARS:$HADOOP_JARS"
 
-$JAVA_HOME/bin/java -classpath $SPARK_CLASSPATH org.apache.mrql.Main -spark $*
+if [ "$1" == "-local" ] || [ "$1" == "-dist" ]; then
+   $SPARK_HOME/bin/spark-submit --class org.apache.mrql.Main --jars $CUP_JAR,$JLINE_JAR,$GEN_JAR,$CORE_JAR $MRQL_JAR -spark $*
+else
+   $JAVA_HOME/bin/java -classpath $SPARK_CLASSPATH org.apache.mrql.Main -spark $*
+fi

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/bsp/src/main/java/org/apache/mrql/BSPPlan.java
----------------------------------------------------------------------
diff --git a/bsp/src/main/java/org/apache/mrql/BSPPlan.java b/bsp/src/main/java/org/apache/mrql/BSPPlan.java
index 942d69d..8f118c2 100644
--- a/bsp/src/main/java/org/apache/mrql/BSPPlan.java
+++ b/bsp/src/main/java/org/apache/mrql/BSPPlan.java
@@ -412,9 +412,10 @@ final public class BSPPlan extends Plan {
                 split_size = (long)Math.ceil((double)split_size*1.01);
         } while (tasks > Config.nodes);
         job.setNumBspTask(tasks);
-        System.err.println("Using "+tasks+" BSP tasks (out of a max "+Config.nodes+")."
-                           +" Each task will handle about "+Math.min(total_size/Config.nodes,split_size)
-                           +" bytes of input data.");
+        if (Config.trace)
+            System.err.println("Using "+tasks+" BSP tasks (out of a max "+Config.nodes+")."
+                               +" Each task will handle about "+Math.min(total_size/Config.nodes,split_size)
+                               +" bytes of input data.");
         job.set("bsp.min.split.size",Long.toString(split_size));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/conf/mrql-env.sh
----------------------------------------------------------------------
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index fb467e0..fb29c94 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -64,11 +64,11 @@ BSP_MASTER_ADDRESS=localhost:40000
 HAMA_ZOOKEEPER_QUORUM=localhost
 
 
-# Optional: Spark configuration. Supports versions 1.0.0 and 1.0.2 only
+# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, and 1.1.0
 # (Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0)
-# Use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
+# You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
 # Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark-1.0.2-bin-hadoop2
+SPARK_HOME=${HOME}/spark-1.1.0-bin-hadoop2.3
 # URI of the Spark master node:
 #   to run Spark on Standalone Mode, set it to spark://`hostname`:7077
 #   to run Spark on a YARN cluster, set it to "yarn-client"
@@ -108,11 +108,9 @@ done
 
 # YARN-enabled assembly jar
 if [[ -d ${SPARK_HOME}/assembly/target ]]; then
-   # old Spark (0.x)
-   SPARK_JAR=`ls ${SPARK_HOME}/assembly/target/scala-*/*.jar`
+   SPARK_JARS=`ls ${SPARK_HOME}/assembly/target/scala-*/*.jar`
 else if [[ -d ${SPARK_HOME}/lib ]]; then
-   # new Spark (1.x)
-   SPARK_JAR=`ls ${SPARK_HOME}/lib/spark-assembly-*.jar`
+   SPARK_JARS=`ls ${SPARK_HOME}/lib/spark-assembly-*.jar`
 fi
 fi
 
@@ -134,3 +132,8 @@ fi
 if [[ !(-f ${JLINE_JAR}) ]]; then
    echo "*** Cannot find the JLine jar file. Need to edit mrql-env.sh"; exit -1
 fi
+
+export MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
+if [[ !(-f `echo ${MRQL_HOME}/lib/mrql-core-*.jar`) ]]; then
+   echo "*** Need to compile MRQL first using 'mvn clean install'"; exit -1
+fi

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/core/src/main/java/org/apache/mrql/DataSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/DataSource.java b/core/src/main/java/org/apache/mrql/DataSource.java
index cc156ce..04d2bbf 100644
--- a/core/src/main/java/org/apache/mrql/DataSource.java
+++ b/core/src/main/java/org/apache/mrql/DataSource.java
@@ -166,9 +166,9 @@ public class DataSource {
         int count = num;
         try {
             ArrayList<MRData> res = new ArrayList<MRData>();
-            Iterator<MRData> it = inputFormat.newInstance().materialize(new Path(path)).iterator();
+            Iterator<MRData> it = inputFormat.newInstance().collect(new DataSet(this,0,0),false).iterator();
             for ( int i = num; (num < 0 || i > 0) && it.hasNext(); i-- )
-                if (Config.hadoop_mode && Config.bsp_mode)
+                if (Config.hadoop_mode && Config.bsp_mode && !to_be_merged)
                     res.add(((Tuple)it.next()).get(1));  // strip tag in BSP mode
                 else res.add(it.next());
             return res;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/core/src/main/java/org/apache/mrql/MapReduceAlgebra.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/MapReduceAlgebra.java b/core/src/main/java/org/apache/mrql/MapReduceAlgebra.java
index d8935e1..dd5f5bb 100644
--- a/core/src/main/java/org/apache/mrql/MapReduceAlgebra.java
+++ b/core/src/main/java/org/apache/mrql/MapReduceAlgebra.java
@@ -583,7 +583,8 @@ final public class MapReduceAlgebra {
                     };
                     s.add(t.first());
                 };
-                System.err.println("*** Repeat #"+i+": "+c+" true results");
+                if (!Config.testing)
+                    System.err.println("*** Repeat #"+i+": "+c+" true results");
             } else throw new Error("Wrong repeat");
         } while (cont && i < max_num);
         return s;
@@ -614,7 +615,8 @@ final public class MapReduceAlgebra {
                 n = s.size();
             } else if (d instanceof MR_dataset) {
                 DataSet ds = ((MR_dataset)d).dataset();
-                System.err.println("*** Repeat #"+i+": "+(ds.records-n)+" new records");
+                if (!Config.testing)
+                    System.err.println("*** Repeat #"+i+": "+(ds.records-n)+" new records");
                 old = n;
                 n = ds.records;
                 s = Plan.collect(ds);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/core/src/test/java/org/apache/mrql/QueryTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/mrql/QueryTest.java b/core/src/test/java/org/apache/mrql/QueryTest.java
index a675c9d..3186d89 100644
--- a/core/src/test/java/org/apache/mrql/QueryTest.java
+++ b/core/src/test/java/org/apache/mrql/QueryTest.java
@@ -53,8 +53,8 @@ public abstract class QueryTest extends TestCase {
 			evaluator = createEvaluator();
 		Translator.global_reset();
                 for ( Enumeration en = LogManager.getCurrentLoggers(); en.hasMoreElements(); )
-                    ((Logger)en.nextElement()).setLevel(Level.WARN);
-                LogManager.getRootLogger().setLevel(Level.WARN);
+                    ((Logger)en.nextElement()).setLevel(Level.ERROR);
+                LogManager.getRootLogger().setLevel(Level.ERROR);
 	}
 
 	public void tearDown() throws IOException {
@@ -155,6 +155,7 @@ public abstract class QueryTest extends TestCase {
 	}
 
     private int queryAndCompare ( File query, File resultDir ) throws Exception {
+        System.err.println("Testing "+query);
         Translator.global_reset();
         String qname = query.getName();
         qname = qname.substring(0,qname.length()-5);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index dbd79b6..e5c92a6 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -69,7 +69,7 @@ final public class FlinkEvaluator extends Evaluator {
             // curently, the compiler doesn't work in local mode
             Config.compile_functional_arguments = false;
             Config.write(Plan.conf);
-        } else {
+        } else if (Config.distributed_mode) {
             String master_node = System.getenv("FLINK_MASTER");
             if (master_node == null)
                 throw new Error("Need to run the Flink application master first: $FLINK_HOME/bin/yarn-session.sh");
@@ -94,7 +94,8 @@ final public class FlinkEvaluator extends Evaluator {
                                         Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
                 else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
                                         Config.nodes,flink_jar.toURI().getPath());
-            } else flink_env.setDefaultLocalParallelism(Config.nodes);
+            } else if (Config.distributed_mode)
+                flink_env.setDefaultLocalParallelism(Config.nodes);
         } catch (Exception ex) {
             throw new Error("Cannot initialize the Flink evaluator: "+ex);
         }
@@ -149,9 +150,11 @@ final public class FlinkEvaluator extends Evaluator {
             ftp.close();
             if (s.length != 2 )
                 return null;
-            if (!s[0].equals("2"))
-                throw new Error("The binary file has been created in java mode and cannot be read in hadoop mode");
-            return Tree.parse(s[1]);
+            if (s[0].equals("2"))
+                throw new Error("This is a Hadoop Sequence and cannot be read as a Flink binary file: "+file);
+            else if (!s[0].equals("3"))
+                throw new Error("The binary file has been created in java mode and cannot be read in hadoop mode: "+file);
+            else return Tree.parse(s[1]);
         } catch (Exception e) {
             return null;
         }
@@ -163,7 +166,7 @@ final public class FlinkEvaluator extends Evaluator {
         Path path = new Path(absolute_path(file));
         FileSystem fs = path.getFileSystem();
         PrintStream ftp = new PrintStream(fs.create(path.suffix(".type"),true));
-        ftp.print("2@"+type.toString()+"\n");
+        ftp.print("3@"+type.toString()+"\n");
         ftp.close();
         if (data instanceof MR_dataset)
             data = Plan.collect(((MR_dataset)data).dataset());

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/mapreduce/src/main/java/org/apache/mrql/MapReducePlan.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/mrql/MapReducePlan.java b/mapreduce/src/main/java/org/apache/mrql/MapReducePlan.java
index 75f0568..5aa557b 100644
--- a/mapreduce/src/main/java/org/apache/mrql/MapReducePlan.java
+++ b/mapreduce/src/main/java/org/apache/mrql/MapReducePlan.java
@@ -96,7 +96,8 @@ public class MapReducePlan extends Plan {
         do {
             s.dataset = ((MR_dataset)loop.eval(s)).dataset;
             i++;
-            System.err.println("Repeat #"+i+": "+s.dataset.counter+" true results");
+            if (!Config.testing)
+                System.err.println("Repeat #"+i+": "+s.dataset.counter+" true results");
         } while (s.dataset.counter != 0 && i < max_num);
         return s.dataset;
     }
@@ -116,7 +117,8 @@ public class MapReducePlan extends Plan {
         do {
             s.dataset = ((MR_dataset)loop.eval(s)).dataset;
             i++;
-            System.err.println("Repeat #"+i+": "+(s.dataset.records-n)+" new records");
+            if (!Config.testing)
+                System.err.println("Repeat #"+i+": "+(s.dataset.records-n)+" new records");
             old = n;
             n = s.dataset.records;
         } while (old < n && i < max_num);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0b2c73b..a8fdadc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
     <hadoop.version>1.2.1</hadoop.version>
     <yarn.version>2.2.0</yarn.version>
     <hama.version>0.6.3</hama.version>
-    <spark.version>1.0.2</spark.version>
+    <spark.version>1.1.0</spark.version>
     <scala.version>2.10</scala.version>
     <flink.version>0.6-incubating</flink.version>
     <skipTests>true</skipTests>

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/queries/dblp-pagerank.mrql
----------------------------------------------------------------------
diff --git a/queries/dblp-pagerank.mrql b/queries/dblp-pagerank.mrql
index cd6ae85..810b5cc 100644
--- a/queries/dblp-pagerank.mrql
+++ b/queries/dblp-pagerank.mrql
@@ -16,7 +16,10 @@
  * limitations under the License.
  */
 
-DBLP = source(xml,args[0],{"article","incollection","book","inproceedings"});
+// PageRank on the DBLP bibliography.
+// You can download dblp.xml.gz from http://dblp.uni-trier.de/xml/
+
+DBLP = source(xml,args[0],{"article","inproceedings"});
 
 graph = select (key,select text(x) from x in c)
           from a in DBLP,

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/6c6790ac/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index ae33b33..ad9ed2f 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import scala.Tuple2;
+import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.Partition;
 import org.apache.spark.Accumulator;
@@ -63,16 +64,20 @@ final public class SparkEvaluator extends Evaluator implements Serializable
{
     /** initialize the Spark evaluator */
     final public void init ( Configuration conf ) {
         Config.spark_mode = true;
-        if (Config.hadoop_mode && Config.local_mode)
-            spark_context = new JavaSparkContext("local["+Config.nodes+"]",
-                                                 "Apache MRQL Local Spark Evaluator");
-        else if (Config.hadoop_mode) {
-            HashMap<String,String> env = new HashMap<String,String>();
-            spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
-                                                 "Apache MRQL Spark Evaluator",
-                                                 System.getenv("SPARK_HOME"),
-                                                 new String[]{spark_jar,core_jar,gen_jar,scanner_jar},
-                                                 env);
+        SparkConf spark_conf = new SparkConf();
+        spark_conf.setAppName("MRQL");
+        if (Config.hadoop_mode && Config.local_mode) {
+            spark_conf.setMaster("local["+Config.nodes+"]");
+            spark_context = new JavaSparkContext(spark_conf);
+            Plan.conf = spark_context.hadoopConfiguration();
+            FileSystem.setDefaultUri(Plan.conf,"file:///");
+        } else if (Config.hadoop_mode) {
+            spark_conf.setMaster(System.getenv("SPARK_MASTER"));
+            spark_conf.setSparkHome(System.getenv("SPARK_HOME"));
+            spark_conf.setJars(new String[]{spark_jar,core_jar,gen_jar,scanner_jar});
+            spark_conf.set("spark.logConf","false");
+            spark_conf.set("spark.eventLog.enabled","false");
+            spark_context = new JavaSparkContext(spark_conf);
             Plan.conf = spark_context.hadoopConfiguration();
             FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
         };
@@ -85,9 +90,11 @@ final public class SparkEvaluator extends Evaluator implements Serializable
{
 
     /** shutdown the Spark evaluator */
     final public void shutdown ( Configuration conf ) {
-        spark_context.stop();
-        spark_context = null;
-        System.clearProperty("spark.driver.port");
+        if (Config.local_mode) {
+            spark_context.stop();
+            spark_context = null;
+            System.clearProperty("spark.driver.port");
+        }
     }
 
     final public void initialize_query () {
@@ -290,17 +297,43 @@ final public class SparkEvaluator extends Evaluator implements Serializable
{
             });
     }
 
-    // changed since Spark 0.8.1
-    final static TaskContext context = new TaskContext(0,0,(long)0,Config.local_mode,null);
+    // Return the RDD elements at the given position
+    private static Iterator<MRData> partition ( final JavaRDD<MRData> rdd, final int position ) {
+        /* Doesn't work (needs the right TaskContext)
+        TaskContext context = new TaskContext(0,0,(long)0,Config.local_mode,null);
+        List<Partition> ps = rdd.splits();
+        return rdd.iterator(ps.get(position),context);
+        */
+        return rdd.mapPartitionsWithIndex(new Function2<Integer,Iterator<MRData>,Iterator<MRData>>() {
+                public Iterator<MRData> call ( Integer partition, Iterator<MRData> values ) {
+                    if (partition == position)
+                        return values;
+                    else return new ArrayList<MRData>().iterator();
+                }
+            },true).collect().iterator();
+    }
+
+    final static int MAX_CACHE_SIZE = 1000;
 
     /** Convert a Spark RDD into a lazy bag
      * @param rdd the Spark RDD
      * @return a lazy bag that contains all RDD elements
      */
-    public static Bag bag ( final JavaRDD<MRData> rdd ) {
+    public static Bag bag ( final JavaRDD<MRData> rdd ) throws IOException {
         final JavaRDD<MRData> rd = rdd.cache();
-        rd.count();  // force the evaluation of all pending Spark operations
-        final List<Partition> ps = rd.splits();
+        if (rd.count() <= MAX_CACHE_SIZE) {  // small RDD
+            final Iterator<MRData> i = rd.collect().iterator();
+            return new Bag(new BagIterator() {
+                    public MRData next () {
+                        return i.next();
+                    }
+                    public boolean hasNext () {
+                        return i.hasNext();
+                    }
+                });
+        };
+        // return the RDD elements lazily, one partition at a time
+        final int splits = rd.splits().size();
         return new Bag(new BagIterator() {
                 Iterator<MRData> i = null;
                 int c = 0;
@@ -311,9 +344,9 @@ final public class SparkEvaluator extends Evaluator implements Serializable
{
                     do {
                         if (i != null && i.hasNext()) 
                             return true;
-                        if (c >= ps.size())
+                        if (c >= splits)
                             return false;
-                        i = rdd.iterator(ps.get(c++),context);
+                        i = partition(rd,c++);
                     } while (true);
                 }
             });
@@ -711,7 +744,8 @@ final public class SparkEvaluator extends Evaluator implements Serializable
{
                                        });
                     i++;
                     cont = true_results > 0 && i < max_num;
-                    System.err.println("Repeat #"+i+": "+true_results+" true results");
+                    if (!Config.testing)
+                        System.err.println("Repeat #"+i+": "+true_results+" true results");
                 } while (cont);
                 return res;
             case Closure(lambda(`v,`b),`s,`m):
@@ -726,7 +760,8 @@ final public class SparkEvaluator extends Evaluator implements Serializable
{
                     old = n;
                     n = res.count();
                     i++;
-                    System.err.println("Repeat #"+i+": "+(old-n)+" new records");
+                    if (!Config.testing)
+                        System.err.println("Repeat #"+i+": "+(old-n)+" new records");
                 } while (old < n && i < max_num);
                 return res;
             case Generator(`min,`max,`size):


Mime
View raw message