pirk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PIRK-21) Apache Spark Streaming Implementation
Date Tue, 23 Aug 2016 14:37:20 GMT

    [ https://issues.apache.org/jira/browse/PIRK-21?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15432915#comment-15432915
] 

ASF GitHub Bot commented on PIRK-21:
------------------------------------

Github user ellisonanne commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/76#discussion_r75879106
  
    --- Diff: src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
---
    @@ -0,0 +1,467 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.pirk.responder.wideskies.spark.streaming;
    +
    +import java.math.BigInteger;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Queue;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.io.MapWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
    +import org.apache.pirk.inputformat.hadoop.InputFormatConst;
    +import org.apache.pirk.query.wideskies.Query;
    +import org.apache.pirk.query.wideskies.QueryInfo;
    +import org.apache.pirk.responder.wideskies.spark.Accumulators;
    +import org.apache.pirk.responder.wideskies.spark.BroadcastVars;
    +import org.apache.pirk.responder.wideskies.spark.EncColMultGroupedMapper;
    +import org.apache.pirk.responder.wideskies.spark.EncColMultReducer;
    +import org.apache.pirk.responder.wideskies.spark.EncRowCalc;
    +import org.apache.pirk.responder.wideskies.spark.FilterData;
    +import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData;
    +import org.apache.pirk.schema.data.DataSchema;
    +import org.apache.pirk.schema.data.DataSchemaLoader;
    +import org.apache.pirk.schema.data.DataSchemaRegistry;
    +import org.apache.pirk.schema.query.QuerySchema;
    +import org.apache.pirk.schema.query.QuerySchemaLoader;
    +import org.apache.pirk.schema.query.QuerySchemaRegistry;
    +import org.apache.pirk.serialization.HadoopFileSystemStore;
    +import org.apache.pirk.utils.PIRException;
    +import org.apache.pirk.utils.SystemConfiguration;
    +import org.apache.spark.SparkConf;
    +import org.apache.spark.api.java.JavaPairRDD;
    +import org.apache.spark.api.java.JavaRDD;
    +import org.apache.spark.api.java.JavaSparkContext;
    +import org.apache.spark.api.java.function.Function;
    +import org.apache.spark.api.java.function.VoidFunction;
    +import org.apache.spark.streaming.Durations;
    +import org.apache.spark.streaming.api.java.JavaDStream;
    +import org.apache.spark.streaming.api.java.JavaPairDStream;
    +import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    +import org.apache.spark.streaming.api.java.JavaStreamingContext;
    +import org.elasticsearch.hadoop.mr.EsInputFormat;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Master class for the PIR query spark streaming application
    + * <p>
    + * NOTE:
    + * <p>
    + * - NOT using Elasticsearch in practice - proved to be some speed issues with ES and
Spark that appear to be ES-Spark specific - leave code in anticipation
    + * that the ES-Spark issues resolve...
    + * <p>
    + * - Even if rdd.count() calls are embedded in logger.debug statements, they are computed
by Spark. Thus, they are commented out in the code below - uncomment
    + * for rdd.count() debug
    + * 
    + */
    +public class ComputeStreamingResponse
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(ComputeStreamingResponse.class);
    +
    +  private String dataInputFormat = null;
    +  private String inputData = null;
    +  private String outputFile = null;
    +  private String outputDirExp = null;
    +
    +  private String queryInput = null;
    +  QuerySchema qSchema = null;
    +
    +  private String esQuery = "none";
    +  private String esResource = "none";
    +
    +  private FileSystem fs = null;
    +  private HadoopFileSystemStore storage = null;
    +  private JavaStreamingContext jssc = null;
    +
    +  boolean useQueueStream = false;
    +
    +  private long batchSeconds = 0;
    +  private long windowLength = 0;
    +
    +  private Accumulators accum = null;
    +  private BroadcastVars bVars = null;
    +
    +  private QueryInfo queryInfo = null;
    +  Query query = null;
    +
    +  private int numDataPartitions = 0;
    +  private int numColMultPartitions = 0;
    +
    +  private boolean colMultReduceByKey = false;
    +
    +  public ComputeStreamingResponse(FileSystem fileSys) throws Exception
    +  {
    +    fs = fileSys;
    +    storage = new HadoopFileSystemStore(fs);
    +
    +    dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat");
    +    if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat))
    +    {
    +      throw new IllegalArgumentException("inputFormat = " + dataInputFormat + " is of
an unknown form");
    +    }
    +    logger.info("inputFormat = " + dataInputFormat);
    +    if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
    +    {
    +      inputData = SystemConfiguration.getProperty("pir.inputData", "none");
    +      if (inputData.equals("none"))
    +      {
    +        throw new IllegalArgumentException("For inputFormat = " + dataInputFormat + "
an inputFile must be specified");
    +      }
    +      logger.info("inputFile = " + inputData);
    +    }
    +    else if (dataInputFormat.equals(InputFormatConst.ES))
    +    {
    +      esQuery = SystemConfiguration.getProperty("pir.esQuery", "none");
    +      esResource = SystemConfiguration.getProperty("pir.esResource", "none");
    +      if (esQuery.equals("none"))
    +      {
    +        throw new IllegalArgumentException("esQuery must be specified");
    +      }
    +      if (esResource.equals("none"))
    +      {
    +        throw new IllegalArgumentException("esResource must be specified");
    +      }
    +      logger.info("esQuery = " + esQuery + " esResource = " + esResource);
    +    }
    +    outputFile = SystemConfiguration.getProperty("pir.outputFile");
    +    outputDirExp = outputFile + "_exp";
    +
    +    queryInput = SystemConfiguration.getProperty("pir.queryInput");
    +    String stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
    +
    +    logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile
= " + stopListFile + " esQuery = " + esQuery
    +        + " esResource = " + esResource);
    +
    +    // Pull the batchSeconds and windowLength parameters
    +    batchSeconds = Long.parseLong(SystemConfiguration.getProperty("pir.sparkstreaming.batchSeconds",
"30"));
    --- End diff --
    
    'batchSeconds' is an input to the Durations class's static seconds method, which requires
a long (http://spark.apache.org/docs/latest/api/java/) 
    
    But, yes, I do think that a long is a bit of overkill ;)


> Apache Spark Streaming Implementation
> -------------------------------------
>
>                 Key: PIRK-21
>                 URL: https://issues.apache.org/jira/browse/PIRK-21
>             Project: PIRK
>          Issue Type: Improvement
>          Components: Responder
>            Reporter: Ellison Anne Williams
>            Assignee: Ellison Anne Williams
>
> Provide a Spark streaming implementation for Pirk. 
> Although there is discussion and a forthcoming JIRA issue for Pirk integration with Apache
Beam, we can, at the very least, use this implementation to benchmark straight Spark Streaming
vs Beam + Spark Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message